source: TI12-security/trunk/python/MyProxyClient/test/test_myproxyclient.py @ 4838

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/MyProxyClient/test/test_myproxyclient.py@4838
Revision 4838, 9.8 KB checked in by pjkersha, 11 years ago (diff)
  • Added LICENSE file to all egg top-level directories - contains the STFC copyright statement and BSD license.
  • ndg.security.server.wsgi.authn - WSGI authentication module contains HTTP Basic Authentication middleware class
  • MyProxyClient package - modified license from LGPL to BSD
  • ndg.security.server.wsgi.pep, ndg.security.server.wsgi.ssl: implementation for protection of pyDAP deployment with SSL Client AuthN.
  • ndg.security.common.X509: added Parse class method to X500DN class.
  • Property svn:executable set to *
  • Property svn:keywords set to Id
1#!/usr/bin/env python
2"""MyProxy Client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "02/07/07"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = """BSD"""
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12
13import unittest
14import os
15import sys
16import getpass
17import traceback
18
19from myproxy.client import CaseSensitiveConfigParser, MyProxyClient
20
# Short aliases used throughout the tests.
xpdVars = os.path.expandvars
jnPath = os.path.join


def mkPath(file):
    """Return *file* resolved against the unit test directory.

    The base directory is read from the MYPROXYCLIENT_UNITTEST_DIR
    environment variable on every call, so that variable decides where the
    fixtures named by the tests are looked up.  An absolute *file* comes back
    unchanged, because os.path.join discards the components that precede an
    absolute one.
    """
    return jnPath(os.environ['MYPROXYCLIENT_UNITTEST_DIR'], file)
24
class _MyProxyClientTestCase(unittest.TestCase):
    '''Common environment set-up shared by every test case class.

    The statements below run once, while the class body is evaluated (that
    is, when the module is imported), not once per test.
    '''
    # Developer hook: defining NDGSEC_INT_DEBUG drops into the debugger as
    # soon as this class body is executed.  pdb waits on stdin, so leave the
    # variable unset for unattended runs.
    if 'NDGSEC_INT_DEBUG' in os.environ:
        import pdb
        pdb.set_trace()

    # Default the fixture directory to the one holding this file unless the
    # caller has already pointed MYPROXYCLIENT_UNITTEST_DIR elsewhere.
    os.environ.setdefault('MYPROXYCLIENT_UNITTEST_DIR',
                          os.path.abspath(os.path.dirname(__file__)))
34
35
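class MyProxyClientLiveTestCase(_MyProxyClientTestCase):
    '''Tests require a connection to a real MyProxy service running on a host.

    The test methods are numbered because unittest runs them in alphabetical
    order: the credential is stored first and destroyed last.

    Each test reads its settings from the section of myProxyClientTest.cfg
    named after the test method.  Pass-phrases may be placed in that file for
    unattended runs; they are then held in clear text, so keep the file out of
    version control and readable by its owner only.  When a pass-phrase is
    omitted the test prompts for it with getpass instead.
    '''

    # rw------- ; written with int() so the literal parses on old and new
    # Python versions alike.
    _OWNER_ONLY = int('600', 8)

    def setUp(self):
        '''Load myProxyClientTest.cfg into self.cfg and create the client'''
        super(MyProxyClientLiveTestCase, self).setUp()

        configParser = CaseSensitiveConfigParser()
        # NOTE(review): if CaseSensitiveConfigParser follows the standard
        # library ConfigParser, read() silently skips a missing file and the
        # failure only shows up as a KeyError on 'setUp' below - confirm.
        configParser.read(mkPath("myProxyClientTest.cfg"))

        # One plain dict per section, keyed by section name.
        self.cfg = {}
        for section in configParser.sections():
            self.cfg[section] = dict(configParser.items(section))

        self.clnt = MyProxyClient(
                        cfgFilePath=xpdVars(self.cfg['setUp']['cfgFilePath']))

    def _writeOwnerOnly(self, filePath, content):
        '''Write content to filePath with owner-only read/write permission.

        Used for private key material, which must not be created with the
        process umask's default (typically group/world readable) mode.
        '''
        fd = os.open(filePath, os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                     self._OWNER_ONLY)
        keyFile = os.fdopen(fd, 'w')
        try:
            # The mode given to os.open only applies to a newly created
            # file; tighten a pre-existing one too where the platform allows.
            if hasattr(os, 'fchmod'):
                os.fchmod(fd, self._OWNER_ONLY)
            keyFile.write(content)
        finally:
            keyFile.close()

    def test1Store(self):
        '''test1Store: upload X509 cert and private key to repository'''
        thisSection = self.cfg['test1Store']

        passphrase = thisSection.get('passphrase')
        if passphrase is None:
            passphrase = getpass.getpass(prompt="\ntest1Store credential "
                                                "pass-phrase: ")

        ownerPassphrase = thisSection.get('ownerPassphrase')
        if ownerPassphrase is None:
            ownerPassphrase = getpass.getpass(prompt="\ntest1Store credential "
                                                     " owner pass-phrase: ")

        certFile = xpdVars(thisSection['signingCertFilePath'])
        keyFile = xpdVars(thisSection['signingPriKeyFilePath'])
        ownerCertFile = xpdVars(thisSection['ownerCertFile'])
        ownerKeyFile = xpdVars(thisSection['ownerKeyFile'])

        self.clnt.store(thisSection['username'],
                        passphrase,
                        certFile,
                        keyFile,
                        ownerCertFile=ownerCertFile,
                        ownerKeyFile=ownerKeyFile,
                        ownerPassphrase=ownerPassphrase,
                        force=False)
        print("Store creds for user %s" % thisSection['username'])

    def test2GetDelegation(self):
        '''test2GetDelegation: retrieve proxy cert./private key'''
        thisSection = self.cfg['test2GetDelegation']

        passphrase = thisSection.get('passphrase')
        if passphrase is None:
            passphrase = getpass.getpass(prompt="\ntest2GetDelegation "
                                                "passphrase: ")

        proxyCertFile = xpdVars(thisSection['proxyCertFileOut'])
        proxyKeyFile = xpdVars(thisSection['proxyKeyFileOut'])

        creds = self.clnt.getDelegation(thisSection['username'],
                                        passphrase)

        # Going by the output file names, creds[1] is the private key and
        # creds[0] plus creds[2:] are the certificates.
        certChain = creds[0] + ''.join(creds[2:])

        # Echo the certificates only: the private key must not end up on the
        # console or in captured test logs.
        print("proxy credentials:")
        print(certChain)

        certOut = open(proxyCertFile, 'w')
        try:
            certOut.write(certChain)
        finally:
            certOut.close()

        self._writeOwnerOnly(proxyKeyFile, creds[1])

    def test3Info(self):
        '''test3Info: Retrieve information about a given credential'''
        thisSection = self.cfg['test3Info']

        # ownerPassphrase can be omitted from the config file, in which case
        # the get call below returns None and the user is prompted instead
        ownerPassphrase = thisSection.get('ownerPassphrase')
        if ownerPassphrase is None:
            ownerPassphrase = getpass.getpass(prompt="\ntest3Info owner "
                                              "credentials passphrase: ")

        credExists, errorTxt, fields = self.clnt.info(
                                     thisSection['username'],
                                     xpdVars(thisSection['ownerCertFile']),
                                     xpdVars(thisSection['ownerKeyFile']),
                                     ownerPassphrase=ownerPassphrase)
        print("test3Info... ")
        print("credExists: %s" % credExists)
        print("errorTxt: " + errorTxt)
        print("fields: %s" % fields)

    def test4ChangePassphrase(self):
        """test4ChangePassphrase: change pass-phrase protecting a given
        credential"""
        thisSection = self.cfg['test4ChangePassphrase']

        passphrase = thisSection.get('passphrase')
        if passphrase is None:
            passphrase = getpass.getpass(prompt="test4ChangePassphrase - "
                                                "passphrase: ")

        newPassphrase = thisSection.get('newPassphrase')
        if newPassphrase is None:
            # Interactive entry: ask twice so a typing slip does not lock the
            # credential behind an unknown pass-phrase.
            newPassphrase = getpass.getpass(prompt="test4ChangePassphrase "
                                                   "- new passphrase: ")

            confirmNewPassphrase = getpass.getpass(prompt=\
                                                   "test4ChangePassphrase "
                                                   "- confirm new "
                                                   "passphrase: ")

            if newPassphrase != confirmNewPassphrase:
                self.fail("New and confirmed new password don't match")

        # Without an explicit owner pass-phrase, fall back to the current
        # credential pass-phrase.
        ownerPassphrase = thisSection.get('ownerPassphrase') or passphrase

        self.clnt.changePassphrase(thisSection['username'],
                                   passphrase,
                                   newPassphrase,
                                   xpdVars(thisSection['ownerCertFile']),
                                   xpdVars(thisSection['ownerKeyFile']),
                                   ownerPassphrase=ownerPassphrase)
        print("Changed pass-phrase")

    def test5Destroy(self):
        '''test5Destroy: destroy credentials for a given user'''
        thisSection = self.cfg['test5Destroy']

        ownerPassphrase = thisSection.get('ownerPassphrase')
        if ownerPassphrase is None:
            ownerPassphrase = getpass.getpass(prompt="\ntest5Destroy "
                                              "credential owner passphrase: ")

        self.clnt.destroy(thisSection['username'],
                          ownerCertFile=xpdVars(thisSection['ownerCertFile']),
                          ownerKeyFile=xpdVars(thisSection['ownerKeyFile']),
                          ownerPassphrase=ownerPassphrase)
        print("Destroy creds for user %s" % thisSection['username'])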
177       
178       
179from myproxy.utils.openssl import OpenSSLConfigError
180
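class MyProxyClientInterfaceTestCase(_MyProxyClientTestCase):
    '''Test interface for correct getting/setting of attributes'''

    def test01EnvironmentVarsSet(self):
        '''Client attributes match the MYPROXY_SERVER* environment variables
        and the constructor keywords'''
        # Snapshot taken before the try block so the finally clause can never
        # refer to an unbound name.
        environBackup = os.environ.copy()
        try:
            os.environ['MYPROXY_SERVER'] = 'localhost.domain'
            os.environ['MYPROXY_SERVER_DN'] = '/O=NDG/OU=Raphael/CN=raphael'
            os.environ['MYPROXY_SERVER_PORT'] = '20000'
            client = MyProxyClient(serverCNPrefix='',
                                   openSSLConfFilePath=mkPath('openssl.conf'),
                                   proxyCertMaxLifetime=60000,
                                   proxyCertLifetime=30000,
                                   caCertFilePath=mkPath('ndg-test-ca.crt'))

            self.assertEqual(client.port, 20000)
            self.assertEqual(client.hostname, 'localhost.domain')
            self.assertEqual(client.serverDN, '/O=NDG/OU=Raphael/CN=raphael')
            self.assertEqual(client.proxyCertMaxLifetime, 60000)
            self.assertEqual(client.proxyCertLifetime, 30000)
            self.assertEqual(client.openSSLConfFilePath,
                             mkPath('openssl.conf'))
            self.assertEqual(client.caCertFilePath, mkPath('ndg-test-ca.crt'))
        finally:
            # Restore in place.  Rebinding os.environ to the copy would swap
            # the live mapping for a plain dict: the MYPROXY_SERVER* values
            # set above would stay in the real process environment and later
            # os.environ writes would no longer reach it.
            os.environ.clear()
            os.environ.update(environBackup)

    def test02SetProperties(self):
        '''Property setters reject bad values and keep good ones'''
        client = MyProxyClient()

        # A port of the wrong type must be refused.
        self.assertRaises(AttributeError, setattr, client, 'port', None)

        client.port = 8000
        client.hostname = '127.0.0.1'
        client.serverDN = '/O=NDG/OU=BADC/CN=raphael'
        client.proxyCertMaxLifetime = 80000
        client.proxyCertLifetime = 70000

        # 'ssl.cnf' is expected to be rejected as an invalid OpenSSL config.
        # NOTE(review): the path is still asserted further down, so the
        # setter presumably stores it before parsing - confirm against
        # myproxy.client.
        self.assertRaises(OpenSSLConfigError, setattr, client,
                          'openSSLConfFilePath', mkPath('ssl.cnf'))

        client.caCertFilePath = mkPath('ca.pem')
        # Absolute path: mkPath returns it unchanged (os.path.join drops the
        # test directory prefix).
        client.caCertDir = mkPath('/etc/grid-security/certificates')

        self.assertEqual(client.port, 8000)
        self.assertEqual(client.hostname, '127.0.0.1')
        self.assertEqual(client.serverDN, '/O=NDG/OU=BADC/CN=raphael')
        self.assertEqual(client.proxyCertMaxLifetime, 80000)
        self.assertEqual(client.proxyCertLifetime, 70000)
        self.assertEqual(client.openSSLConfFilePath, mkPath('ssl.cnf'))
        self.assertEqual(client.caCertFilePath, mkPath('ca.pem'))
        self.assertEqual(client.caCertDir,
                         mkPath('/etc/grid-security/certificates'))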
244                                       
if __name__ == "__main__":
    # Script entry point: runs every TestCase defined in this module,
    # including the live tests that talk to a MyProxy server.
    unittest.main()