source: TI12-security/trunk/MyProxyClient/myproxy/test/test_myproxyclient.py @ 6829

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/MyProxyClient/myproxy/test/test_myproxyclient.py@6829
Revision 6829, 10.6 KB checked in by pjkersha, 10 years ago (diff)

Working getTrustRoots method but SEGV errors with private key conversion to PEM format. Will make a branch to revisit pyOpenSSL and dump M2Crypto.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""MyProxy Client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "02/07/07"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = """BSD- See LICENSE file in top-level directory"""
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import logging
13logging.basicConfig(level=logging.DEBUG)
14
15import unittest
16import os
17import sys
18import getpass
19import traceback
20from os import path
21
22from M2Crypto import X509
23
24from myproxy.client import CaseSensitiveConfigParser, MyProxyClient
25
26mkPath = lambda file: path.join(os.environ['MYPROXYCLIENT_UNITTEST_DIR'], file)
27
28class _MyProxyClientTestCase(unittest.TestCase):
29    '''Base implements environment settings common to all test case classes'''
30    if 'NDGSEC_INT_DEBUG' in os.environ:
31        import pdb
32        pdb.set_trace()
33   
34    if 'MYPROXYCLIENT_UNITTEST_DIR' not in os.environ:
35        os.environ['MYPROXYCLIENT_UNITTEST_DIR'] = \
36                                        path.abspath(path.dirname(__file__))
37
38
39class MyProxyClientLiveTestCase(_MyProxyClientTestCase):
40    '''Tests require a connection to a real MyProxy service running on a host
41    '''
42    CONFIG_FILENAME = "myProxyClientTest.cfg"
43   
44    def setUp(self):
45       
46        super(MyProxyClientLiveTestCase, self).setUp()
47       
48        configParser = CaseSensitiveConfigParser()
49        configFilePath = path.join(os.environ['MYPROXYCLIENT_UNITTEST_DIR'],
50                                   MyProxyClientLiveTestCase.CONFIG_FILENAME)
51        configParser.read(configFilePath)
52       
53        self.cfg = {}
54        for section in configParser.sections():
55            self.cfg[section] = dict(configParser.items(section))
56       
57        configFilePath = path.expandvars(self.cfg['setUp']['cfgFilePath'])
58        self.clnt = MyProxyClient(cfgFilePath=configFilePath)
59       
60    def test01Store(self):
61        # upload X509 cert and private key to repository
62        thisSection = self.cfg['test01Store']
63       
64        passphrase = thisSection.get('passphrase')
65        if passphrase is None:
66            passphrase = getpass.getpass(prompt="\ntest1Store credential "
67                                                "pass-phrase: ")
68           
69        ownerPassphrase = thisSection.get('ownerPassphrase')
70        if ownerPassphrase is None:
71            ownerPassphrase = getpass.getpass(prompt="\ntest1Store credential "
72                                                     " owner pass-phrase: ")
73
74        certFile = path.expandvars(thisSection['signingCertFilePath'])
75        keyFile = path.expandvars(thisSection['signingPriKeyFilePath'])
76        ownerCertFile = path.expandvars(thisSection['ownerCertFile'])
77        ownerKeyFile = path.expandvars(thisSection['ownerKeyFile'])
78           
79        self.clnt.store(thisSection['username'],
80                        passphrase,
81                        certFile,
82                        keyFile,
83                        ownerCertFile=ownerCertFile,
84                        ownerKeyFile=ownerKeyFile,
85                        ownerPassphrase=ownerPassphrase,
86                        force=False)
87        print("Store creds for user %s" % thisSection['username'])
88   
89    def test02GetDelegation(self):
90        # retrieve proxy cert./private key
91        thisSection = self.cfg['test02GetDelegation']
92       
93        passphrase = thisSection.get('passphrase')
94        if passphrase is None:
95            passphrase = getpass.getpass(prompt="\ntest2GetDelegation "
96                                                "passphrase: ")
97         
98        proxyCertFile = path.expandvars(thisSection['proxyCertFileOut'])
99        proxyKeyFile = path.expandvars(thisSection['proxyKeyFileOut'])
100
101        creds = self.clnt.getDelegation(thisSection['username'], passphrase)
102        print "proxy credentials:" 
103        print ''.join(creds)
104        open(proxyCertFile, 'w').write(creds[0]+''.join(creds[2:]))           
105        open(proxyKeyFile, 'w').write(creds[1])
106
107    def test03Info(self):
108        # Retrieve information about a given credential
109        thisSection = self.cfg['test03Info']
110       
111        # ownerPassphrase can be omitted from the congif file in which case
112        # the get call below would return None
113        ownerPassphrase = thisSection.get('ownerPassphrase')
114        if ownerPassphrase is None:
115            ownerPassphrase = getpass.getpass(prompt="\ntest3Info owner "
116                                              "credentials passphrase: ")
117
118        credExists, errorTxt, fields = self.clnt.info(
119                                     thisSection['username'],
120                                     path.expandvars(thisSection['ownerCertFile']),
121                                     path.expandvars(thisSection['ownerKeyFile']),
122                                     ownerPassphrase=ownerPassphrase)
123        print "test3Info... "
124        print "credExists: %s" % credExists
125        print "errorTxt: " + errorTxt
126        print "fields: %s" % fields
127
128    def test04ChangePassphrase(self):       
129        # change pass-phrase protecting a given credential
130        thisSection = self.cfg['test04ChangePassphrase']
131       
132        passphrase = thisSection.get('passphrase')
133        if passphrase is None:
134            passphrase = getpass.getpass(prompt="test4ChangePassphrase - "
135                                                "passphrase: ")
136       
137        newPassphrase = thisSection.get('newPassphrase')
138        if newPassphrase is None:
139            newPassphrase = getpass.getpass(prompt="test4ChangePassphrase "
140                                                   "- new passphrase: ")
141
142            confirmNewPassphrase = getpass.getpass(prompt=\
143                                                   "test4ChangePassphrase "
144                                                   "- confirm new "
145                                                   "passphrase: ")
146
147            if newPassphrase != confirmNewPassphrase:
148                self.fail("New and confirmed new password don't match")
149               
150        ownerPassphrase = thisSection.get('ownerPassphrase') or passphrase
151
152        self.clnt.changePassphrase(thisSection['username'],
153                               passphrase,
154                               newPassphrase, 
155                               path.expandvars(thisSection['ownerCertFile']),
156                               path.expandvars(thisSection['ownerKeyFile']),
157                               ownerPassphrase=ownerPassphrase)
158        print("Changed pass-phrase")
159
160    def test05Destroy(self):
161        # destroy credentials for a given user
162        thisSection = self.cfg['test05Destroy']
163       
164        ownerPassphrase = thisSection.get('ownerPassphrase')
165        if ownerPassphrase is None:
166            ownerPassphrase = getpass.getpass(prompt="\ntest5Destroy "
167                                              "credential owner passphrase: ")
168
169        self.clnt.destroy(thisSection['username'], 
170                  ownerCertFile=path.expandvars(thisSection['ownerCertFile']),
171                  ownerKeyFile=path.expandvars(thisSection['ownerKeyFile']),
172                  ownerPassphrase=ownerPassphrase)
173        print("Destroy creds for user %s" % thisSection['username'])
174       
175    def test06GetTrustRoots(self):
176        # Test get trust roots command
177        trustRoots = self.clnt.getTrustRoots()
178        self.assert_(trustRoots)
179        self.assert_(isinstance(trustRoots, dict))
180        self.assert_(len(trustRoots) > 0)
181        for fileName, fileContents in trustRoots.items():
182            if fileName.endswith('.0'):
183                # test parsing certificate
184                cert = X509.load_cert_string(fileContents, X509.FORMAT_PEM)
185                self.assert_(cert)
186                self.assert_(isinstance(cert, X509.X509))
187                subj = cert.get_subject()
188                self.assert_(subj)
189                print("Trust root certificate retrieved with DN=%s" % subj)
190               
191       
192from myproxy.utils.openssl import OpenSSLConfigError
193
194class MyProxyClientInterfaceTestCase(_MyProxyClientTestCase):
195    '''Test interface for correct getting/setting of attributes'''
196   
197    def test01EnvironmentVarsSet(self):
198
199        try:
200            environBackup = os.environ.copy()
201           
202            os.environ['MYPROXY_SERVER'] = 'localhost.domain'
203            os.environ['MYPROXY_SERVER_DN'] = '/O=NDG/OU=Raphael/CN=raphael'
204            os.environ['MYPROXY_SERVER_PORT'] = '20000'
205            client = MyProxyClient(serverCNPrefix='',
206                                   openSSLConfFilePath=mkPath('openssl.conf'),
207                                   proxyCertMaxLifetime=60000,
208                                   proxyCertLifetime=30000,           
209                                   caCertFilePath=mkPath('ndg-test-ca.crt'))
210           
211            self.assert_(client.port == 20000)
212            self.assert_(client.hostname == 'localhost.domain')
213            self.assert_(client.serverDN == '/O=NDG/OU=Raphael/CN=raphael')
214            self.assert_(client.proxyCertMaxLifetime == 60000)
215            self.assert_(client.proxyCertLifetime == 30000)
216            self.assert_(client.openSSLConfFilePath == mkPath('openssl.conf'))
217            self.assert_(client.caCertFilePath == mkPath('ndg-test-ca.crt'))
218        finally:
219            os.environ = environBackup
220           
221
222    def test02SetProperties(self):
223       
224        client = MyProxyClient()
225        try:
226            client.port = None
227            self.fail("Expecting AttributeError raised from port set to "
228                      "invalid type")
229        except AttributeError:
230            pass
231
232        client.port = 8000
233        client.hostname = '127.0.0.1'
234        client.serverDN = '/O=NDG/OU=BADC/CN=raphael'
235        client.proxyCertMaxLifetime = 80000
236        client.proxyCertLifetime = 70000
237       
238        try:
239            client.openSSLConfFilePath = mkPath('ssl.cnf')
240            self.fail("Expecting OpenSSLConfigError raised for invalid file "
241                      "'ssl.cnf'")
242        except OpenSSLConfigError:
243            pass
244       
245        client.caCertFilePath = mkPath('ca.pem') 
246        client.caCertDir = mkPath('/etc/grid-security/certificates') 
247       
248        self.assert_(client.port == 8000)
249        self.assert_(client.hostname == '127.0.0.1')
250        self.assert_(client.serverDN == '/O=NDG/OU=BADC/CN=raphael')
251        self.assert_(client.proxyCertMaxLifetime == 80000)
252        self.assert_(client.proxyCertLifetime == 70000)
253        self.assert_(client.openSSLConfFilePath == mkPath('ssl.cnf'))
254        self.assert_(client.caCertFilePath == mkPath('ca.pem')) 
255        self.assert_(
256                client.caCertDir == mkPath('/etc/grid-security/certificates')) 
257                                       
258if __name__ == "__main__":
259    unittest.main()
Note: See TracBrowser for help on using the repository browser.