source: TI12-security/trunk/python/MyProxyClient/test/test_myproxyclient.py @ 4637

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/MyProxyClient/test/test_myproxyclient.py@4637
Revision 4637, 9.9 KB checked in by pjkersha, 11 years ago (diff)

#941: completed unit tests. TODO:

  • test egg deployment
  • finalising licensing.
  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG MyProxy client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "02/07/07"
8__copyright__ = "(C) 2007 STFC & NERC"
9__license__ = \
10"""This software may be distributed under the terms of the Q Public
11License, version 1.0 or later."""
12__contact__ = "Philip.Kershaw@stfc.ac.uk"
13__revision__ = '$Id$'
14
15import unittest
16import os
17import sys
18import getpass
19import traceback
20
21from myproxy import CaseSensitiveConfigParser, MyProxyClient
22
23xpdVars = os.path.expandvars
24jnPath = os.path.join
25mkPath = lambda file: jnPath(os.environ['MYPROXYCLIENT_UNITTEST_DIR'], file)
26
27class _MyProxyClientTestCase(unittest.TestCase):
28    '''Base implements environment settings common to all test case classes'''
29    if 'NDGSEC_INT_DEBUG' in os.environ:
30        import pdb
31        pdb.set_trace()
32   
33    if 'MYPROXYCLIENT_UNITTEST_DIR' not in os.environ:
34        os.environ['MYPROXYCLIENT_UNITTEST_DIR'] = \
35            os.path.abspath(os.path.dirname(__file__))
36
37
38class MyProxyClientLiveTestCase(_MyProxyClientTestCase):
39    '''Tests require a connection to a real MyProxy service running on a host
40    '''
41   
42    def setUp(self):
43       
44        super(MyProxyClientLiveTestCase, self).setUp()
45       
46        configParser = CaseSensitiveConfigParser()
47        configFilePath = jnPath(os.environ['MYPROXYCLIENT_UNITTEST_DIR'],
48                                "myProxyClientTest.cfg")
49        configParser.read(configFilePath)
50       
51        self.cfg = {}
52        for section in configParser.sections():
53            self.cfg[section] = dict(configParser.items(section))
54           
55        self.clnt = MyProxyClient(
56                        cfgFilePath=xpdVars(self.cfg['setUp']['cfgFilePath']))
57       
58
59    def test1Store(self):
60        '''test1Store: upload X509 cert and private key to repository'''
61        thisSection = self.cfg['test1Store']
62       
63        passphrase = thisSection.get('passphrase')
64        if passphrase is None:
65            passphrase = getpass.getpass(prompt="\ntest1Store credential "
66                                                "pass-phrase: ")
67           
68        ownerPassphrase = thisSection.get('ownerPassphrase')
69        if ownerPassphrase is None:
70            ownerPassphrase = getpass.getpass(prompt="\ntest1Store credential "
71                                                     " owner pass-phrase: ")
72
73        certFile = xpdVars(thisSection['signingCertFilePath'])
74        keyFile = xpdVars(thisSection['signingPriKeyFilePath'])
75        ownerCertFile = xpdVars(thisSection['ownerCertFile'])
76        ownerKeyFile = xpdVars(thisSection['ownerKeyFile'])
77           
78        self.clnt.store(thisSection['username'],
79                        passphrase,
80                        certFile,
81                        keyFile,
82                        ownerCertFile=ownerCertFile,
83                        ownerKeyFile=ownerKeyFile,
84                        ownerPassphrase=ownerPassphrase,
85                        force=False)
86        print("Store creds for user %s" % thisSection['username'])
87   
88   
89    def test2GetDelegation(self):
90        '''test2GetDelegation: retrieve proxy cert./private key'''
91        thisSection = self.cfg['test2GetDelegation']
92       
93        passphrase = thisSection.get('passphrase')
94        if passphrase is None:
95            passphrase = getpass.getpass(prompt="\ntest2GetDelegation "
96                                                "passphrase: ")
97         
98        proxyCertFile = xpdVars(thisSection['proxyCertFileOut'])
99        proxyKeyFile = xpdVars(thisSection['proxyKeyFileOut'])
100
101        creds = self.clnt.getDelegation(thisSection['username'], 
102                                        passphrase)
103        print "proxy credentials:" 
104        print ''.join(creds)
105        open(proxyCertFile, 'w').write(creds[0]+''.join(creds[2:]))           
106        open(proxyKeyFile, 'w').write(creds[1])
107
108
109    def test3Info(self):
110        '''test3Info: Retrieve information about a given credential'''
111        thisSection = self.cfg['test3Info']
112       
113        # ownerPassphrase can be omitted from the congif file in which case
114        # the get call below would return None
115        ownerPassphrase = thisSection.get('ownerPassphrase')
116        if ownerPassphrase is None:
117            ownerPassphrase = getpass.getpass(prompt="\ntest3Info owner "
118                                              "credentials passphrase: ")
119
120        credExists, errorTxt, fields = self.clnt.info(
121                                     thisSection['username'],
122                                     xpdVars(thisSection['ownerCertFile']),
123                                     xpdVars(thisSection['ownerKeyFile']),
124                                     ownerPassphrase=ownerPassphrase)
125        print "test3Info... "
126        print "credExists: %s" % credExists
127        print "errorTxt: " + errorTxt
128        print "fields: %s" % fields
129
130
131    def test4ChangePassphrase(self):       
132        """test4ChangePassphrase: change pass-phrase protecting a given
133        credential"""
134        thisSection = self.cfg['test4ChangePassphrase']
135       
136        passphrase = thisSection.get('passphrase')
137        if passphrase is None:
138            passphrase = getpass.getpass(prompt="test4ChangePassphrase - "
139                                                "passphrase: ")
140       
141        newPassphrase = thisSection.get('newPassphrase')
142        if newPassphrase is None:
143            newPassphrase = getpass.getpass(prompt="test4ChangePassphrase "
144                                                   "- new passphrase: ")
145
146            confirmNewPassphrase = getpass.getpass(prompt=\
147                                                   "test4ChangePassphrase "
148                                                   "- confirm new "
149                                                   "passphrase: ")
150
151            if newPassphrase != confirmNewPassphrase:
152                self.fail("New and confirmed new password don't match")
153               
154        ownerPassphrase = thisSection.get('ownerPassphrase') or passphrase
155
156        self.clnt.changePassphrase(thisSection['username'],
157                                   passphrase,
158                                   newPassphrase, 
159                                   xpdVars(thisSection['ownerCertFile']),
160                                   xpdVars(thisSection['ownerKeyFile']),
161                                   ownerPassphrase=ownerPassphrase)
162        print("Changed pass-phrase")
163
164
165    def test5Destroy(self):
166        '''test5Destroy: destroy credentials for a given user'''
167        thisSection = self.cfg['test5Destroy']
168       
169        ownerPassphrase = thisSection.get('ownerPassphrase')
170        if ownerPassphrase is None:
171            ownerPassphrase = getpass.getpass(prompt="\ntest5Destroy "
172                                              "credential owner passphrase: ")
173
174        self.clnt.destroy(thisSection['username'], 
175                          ownerCertFile=xpdVars(thisSection['ownerCertFile']),
176                          ownerKeyFile=xpdVars(thisSection['ownerKeyFile']),
177                          ownerPassphrase=ownerPassphrase)
178        print("Destroy creds for user %s" % thisSection['username'])
179       
180       
181from myproxy import OpenSSLConfigError
182
183class MyProxyClientInterfaceTestCase(_MyProxyClientTestCase):
184    '''Test interface for correct getting/setting of attributes'''
185   
186    def test01EnvironmentVarsSet(self):
187
188        try:
189            environBackup = os.environ.copy()
190           
191            os.environ['MYPROXY_SERVER'] = 'localhost.domain'
192            os.environ['MYPROXY_SERVER_DN'] = '/O=NDG/OU=Raphael/CN=raphael'
193            os.environ['MYPROXY_SERVER_PORT'] = '20000'
194            client = MyProxyClient(serverCNPrefix='',
195                                   openSSLConfFilePath=mkPath('openssl.conf'),
196                                   proxyCertMaxLifetime=60000,
197                                   proxyCertLifetime=30000,           
198                                   caCertFilePath=mkPath('ndg-test-ca.crt'))
199           
200            self.assert_(client.port == 20000)
201            self.assert_(client.hostname == 'localhost.domain')
202            self.assert_(client.serverDN == '/O=NDG/OU=Raphael/CN=raphael')
203            self.assert_(client.proxyCertMaxLifetime == 60000)
204            self.assert_(client.proxyCertLifetime == 30000)
205            self.assert_(client.openSSLConfFilePath == mkPath('openssl.conf'))
206            self.assert_(client.caCertFilePath == mkPath('ndg-test-ca.crt'))
207        finally:
208            os.environ = environBackup
209           
210
211    def test02SetProperties(self):
212       
213        client = MyProxyClient()
214        try:
215            client.port = None
216            self.fail("Expecting AttributeError raised from port set to "
217                      "invalid type")
218        except AttributeError:
219            pass
220
221        client.port = 8000
222        client.hostname = '127.0.0.1'
223        client.serverDN = '/O=NDG/OU=BADC/CN=raphael'
224        client.proxyCertMaxLifetime = 80000
225        client.proxyCertLifetime = 70000
226       
227        try:
228            client.openSSLConfFilePath = mkPath('ssl.cnf')
229            self.fail("Expecting OpenSSLConfigError raised for invalid file "
230                      "'ssl.cnf'")
231        except OpenSSLConfigError:
232            pass
233       
234        client.caCertFilePath = mkPath('ca.pem') 
235        client.caCertDir = mkPath('/etc/grid-security/certificates') 
236       
237        self.assert_(client.port == 8000)
238        self.assert_(client.hostname == '127.0.0.1')
239        self.assert_(client.serverDN == '/O=NDG/OU=BADC/CN=raphael')
240        self.assert_(client.proxyCertMaxLifetime == 80000)
241        self.assert_(client.proxyCertLifetime == 70000)
242        self.assert_(client.openSSLConfFilePath == mkPath('ssl.cnf'))
243        self.assert_(client.caCertFilePath == mkPath('ca.pem')) 
244        self.assert_(
245                client.caCertDir == mkPath('/etc/grid-security/certificates')) 
246                                       
247if __name__ == "__main__":
248    unittest.main()
Note: See TracBrowser for help on using the repository browser.