source: TI12-security/trunk/python/MyProxyClient/test/test_myproxyclient.py @ 4644

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/MyProxyClient/test/test_myproxyclient.py@4644
Revision 4644, 9.9 KB checked in by pjkersha, 11 years ago (diff)

#941: MyProxyClient egg: new package structure myproxy.client.MyProxyClient? + myproxy.client.utils

  • Property svn:executable set to *
  • Property svn:keywords set to Id
RevLine 
[1858]1#!/usr/bin/env python
2"""NDG MyProxy client unit tests
3
4NERC Data Grid Project
5"""
[2909]6__author__ = "P J Kershaw"
7__date__ = "02/07/07"
8__copyright__ = "(C) 2007 STFC & NERC"
9__license__ = \
10"""This software may be distributed under the terms of the Q Public
11License, version 1.0 or later."""
[4404]12__contact__ = "Philip.Kershaw@stfc.ac.uk"
[2270]13__revision__ = '$Id$'
[1967]14
[1858]15import unittest
16import os
17import sys
[1861]18import getpass
19import traceback
[1858]20
[4644]21from myproxy.client import CaseSensitiveConfigParser, MyProxyClient
[1858]22
[3176]23xpdVars = os.path.expandvars
24jnPath = os.path.join
[4624]25mkPath = lambda file: jnPath(os.environ['MYPROXYCLIENT_UNITTEST_DIR'], file)
[3176]26
[4637]27class _MyProxyClientTestCase(unittest.TestCase):
28    '''Base implements environment settings common to all test case classes'''
29    if 'NDGSEC_INT_DEBUG' in os.environ:
30        import pdb
31        pdb.set_trace()
[1858]32   
[4637]33    if 'MYPROXYCLIENT_UNITTEST_DIR' not in os.environ:
34        os.environ['MYPROXYCLIENT_UNITTEST_DIR'] = \
35            os.path.abspath(os.path.dirname(__file__))
36
37
38class MyProxyClientLiveTestCase(_MyProxyClientTestCase):
39    '''Tests require a connection to a real MyProxy service running on a host
40    '''
41   
[1858]42    def setUp(self):
43       
[4637]44        super(MyProxyClientLiveTestCase, self).setUp()
[3176]45       
[4624]46        configParser = CaseSensitiveConfigParser()
47        configFilePath = jnPath(os.environ['MYPROXYCLIENT_UNITTEST_DIR'],
[3176]48                                "myProxyClientTest.cfg")
49        configParser.read(configFilePath)
[1858]50       
51        self.cfg = {}
52        for section in configParser.sections():
53            self.cfg[section] = dict(configParser.items(section))
54           
[4624]55        self.clnt = MyProxyClient(
56                        cfgFilePath=xpdVars(self.cfg['setUp']['cfgFilePath']))
[1861]57       
58
59    def test1Store(self):
60        '''test1Store: upload X509 cert and private key to repository'''
[4624]61        thisSection = self.cfg['test1Store']
62       
63        passphrase = thisSection.get('passphrase')
[2893]64        if passphrase is None:
[4624]65            passphrase = getpass.getpass(prompt="\ntest1Store credential "
66                                                "pass-phrase: ")
[1945]67           
[4624]68        ownerPassphrase = thisSection.get('ownerPassphrase')
[2893]69        if ownerPassphrase is None:
[4624]70            ownerPassphrase = getpass.getpass(prompt="\ntest1Store credential "
71                                                     " owner pass-phrase: ")
[3176]72
[4624]73        certFile = xpdVars(thisSection['signingCertFilePath'])
74        keyFile = xpdVars(thisSection['signingPriKeyFilePath'])
75        ownerCertFile = xpdVars(thisSection['ownerCertFile'])
76        ownerKeyFile = xpdVars(thisSection['ownerKeyFile'])
[1861]77           
[4624]78        self.clnt.store(thisSection['username'],
79                        passphrase,
80                        certFile,
81                        keyFile,
82                        ownerCertFile=ownerCertFile,
83                        ownerKeyFile=ownerKeyFile,
84                        ownerPassphrase=ownerPassphrase,
85                        force=False)
[4637]86        print("Store creds for user %s" % thisSection['username'])
[1858]87   
88   
[1861]89    def test2GetDelegation(self):
90        '''test2GetDelegation: retrieve proxy cert./private key'''
[4624]91        thisSection = self.cfg['test2GetDelegation']
92       
93        passphrase = thisSection.get('passphrase')
[2893]94        if passphrase is None:
[4624]95            passphrase = getpass.getpass(prompt="\ntest2GetDelegation "
[4637]96                                                "passphrase: ")
[1861]97         
[4624]98        proxyCertFile = xpdVars(thisSection['proxyCertFileOut'])
[4637]99        proxyKeyFile = xpdVars(thisSection['proxyKeyFileOut'])
[3176]100
[4624]101        creds = self.clnt.getDelegation(thisSection['username'], 
102                                        passphrase)
103        print "proxy credentials:" 
104        print ''.join(creds)
105        open(proxyCertFile, 'w').write(creds[0]+''.join(creds[2:]))           
106        open(proxyKeyFile, 'w').write(creds[1])
[1858]107
108
[1861]109    def test3Info(self):
110        '''test3Info: Retrieve information about a given credential'''
[4637]111        thisSection = self.cfg['test3Info']
[1945]112       
[4624]113        # ownerPassphrase can be omitted from the congif file in which case
[1945]114        # the get call below would return None
[4624]115        ownerPassphrase = thisSection.get('ownerPassphrase')
[2893]116        if ownerPassphrase is None:
[4624]117            ownerPassphrase = getpass.getpass(prompt="\ntest3Info owner "
[4637]118                                              "credentials passphrase: ")
[1881]119
[4624]120        credExists, errorTxt, fields = self.clnt.info(
121                                     thisSection['username'],
122                                     xpdVars(thisSection['ownerCertFile']),
123                                     xpdVars(thisSection['ownerKeyFile']),
124                                     ownerPassphrase=ownerPassphrase)
125        print "test3Info... "
126        print "credExists: %s" % credExists
127        print "errorTxt: " + errorTxt
128        print "fields: %s" % fields
[1858]129
130
[1861]131    def test4ChangePassphrase(self):       
132        """test4ChangePassphrase: change pass-phrase protecting a given
[1858]133        credential"""
[4624]134        thisSection = self.cfg['test4ChangePassphrase']
135       
136        passphrase = thisSection.get('passphrase')
137        if passphrase is None:
138            passphrase = getpass.getpass(prompt="test4ChangePassphrase - "
[4637]139                                                "passphrase: ")
[4624]140       
[4637]141        newPassphrase = thisSection.get('newPassphrase')
[4624]142        if newPassphrase is None:
143            newPassphrase = getpass.getpass(prompt="test4ChangePassphrase "
[4637]144                                                   "- new passphrase: ")
[1861]145
[4624]146            confirmNewPassphrase = getpass.getpass(prompt=\
147                                                   "test4ChangePassphrase "
148                                                   "- confirm new "
[4637]149                                                   "passphrase: ")
[1861]150
[4624]151            if newPassphrase != confirmNewPassphrase:
152                self.fail("New and confirmed new password don't match")
153               
154        ownerPassphrase = thisSection.get('ownerPassphrase') or passphrase
155
156        self.clnt.changePassphrase(thisSection['username'],
157                                   passphrase,
158                                   newPassphrase, 
159                                   xpdVars(thisSection['ownerCertFile']),
160                                   xpdVars(thisSection['ownerKeyFile']),
161                                   ownerPassphrase=ownerPassphrase)
[4637]162        print("Changed pass-phrase")
[4624]163
164
[1861]165    def test5Destroy(self):
166        '''test5Destroy: destroy credentials for a given user'''
[4637]167        thisSection = self.cfg['test5Destroy']
168       
169        ownerPassphrase = thisSection.get('ownerPassphrase')
[2893]170        if ownerPassphrase is None:
[4624]171            ownerPassphrase = getpass.getpass(prompt="\ntest5Destroy "
[4637]172                                              "credential owner passphrase: ")
[1861]173
[4637]174        self.clnt.destroy(thisSection['username'], 
175                          ownerCertFile=xpdVars(thisSection['ownerCertFile']),
176                          ownerKeyFile=xpdVars(thisSection['ownerKeyFile']),
177                          ownerPassphrase=ownerPassphrase)
178        print("Destroy creds for user %s" % thisSection['username'])
179       
180       
181from myproxy import OpenSSLConfigError
182
183class MyProxyClientInterfaceTestCase(_MyProxyClientTestCase):
184    '''Test interface for correct getting/setting of attributes'''
185   
186    def test01EnvironmentVarsSet(self):
187
[1861]188        try:
[4637]189            environBackup = os.environ.copy()
190           
191            os.environ['MYPROXY_SERVER'] = 'localhost.domain'
192            os.environ['MYPROXY_SERVER_DN'] = '/O=NDG/OU=Raphael/CN=raphael'
193            os.environ['MYPROXY_SERVER_PORT'] = '20000'
194            client = MyProxyClient(serverCNPrefix='',
195                                   openSSLConfFilePath=mkPath('openssl.conf'),
196                                   proxyCertMaxLifetime=60000,
197                                   proxyCertLifetime=30000,           
198                                   caCertFilePath=mkPath('ndg-test-ca.crt'))
199           
200            self.assert_(client.port == 20000)
201            self.assert_(client.hostname == 'localhost.domain')
202            self.assert_(client.serverDN == '/O=NDG/OU=Raphael/CN=raphael')
203            self.assert_(client.proxyCertMaxLifetime == 60000)
204            self.assert_(client.proxyCertLifetime == 30000)
205            self.assert_(client.openSSLConfFilePath == mkPath('openssl.conf'))
206            self.assert_(client.caCertFilePath == mkPath('ndg-test-ca.crt'))
207        finally:
208            os.environ = environBackup
209           
210
211    def test02SetProperties(self):
[1858]212       
[4637]213        client = MyProxyClient()
214        try:
215            client.port = None
216            self.fail("Expecting AttributeError raised from port set to "
217                      "invalid type")
218        except AttributeError:
219            pass
220
221        client.port = 8000
222        client.hostname = '127.0.0.1'
223        client.serverDN = '/O=NDG/OU=BADC/CN=raphael'
224        client.proxyCertMaxLifetime = 80000
225        client.proxyCertLifetime = 70000
226       
227        try:
228            client.openSSLConfFilePath = mkPath('ssl.cnf')
229            self.fail("Expecting OpenSSLConfigError raised for invalid file "
230                      "'ssl.cnf'")
231        except OpenSSLConfigError:
232            pass
233       
234        client.caCertFilePath = mkPath('ca.pem') 
235        client.caCertDir = mkPath('/etc/grid-security/certificates') 
236       
237        self.assert_(client.port == 8000)
238        self.assert_(client.hostname == '127.0.0.1')
239        self.assert_(client.serverDN == '/O=NDG/OU=BADC/CN=raphael')
240        self.assert_(client.proxyCertMaxLifetime == 80000)
241        self.assert_(client.proxyCertLifetime == 70000)
242        self.assert_(client.openSSLConfFilePath == mkPath('ssl.cnf'))
243        self.assert_(client.caCertFilePath == mkPath('ca.pem')) 
244        self.assert_(
245                client.caCertDir == mkPath('/etc/grid-security/certificates')) 
[1858]246                                       
247if __name__ == "__main__":
248    unittest.main()
Note: See TracBrowser for help on using the repository browser.