source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/AttAuthority/AttAuthorityClientTest.py @ 3040

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/AttAuthority/AttAuthorityClientTest.py@3040
Revision 3040, 15.2 KB checked in by pjkersha, 13 years ago (diff)

python/ndg.security.server/ndg/security/server/conf/attAuthority.tac: fixed bug in soap_getTrustedHostInfo - role list was not being copied to the output for serialisation.

python/ndg.security.test/ndg/security/test/AttAuthority/AttAuthorityClientTest.py: added assertion statements for testing output from test4GetTrustedHostInfoWithNoRole

python/ndg.security.test/ndg/security/test/AttAuthority/siteAMapConfig.xml,
python/ndg.security.test/ndg/security/test/AttAuthority/siteBMapConfig.xml: ensure every field is filled so that AA get host info calls may be correctly validated

python/ndg.security.test/ndg/security/test/sessionMgr/test.py: new Session Manager server side code unit tests - incomplete.

python/ndg.security.common/ndg/security/common/AttAuthority/__init__.py: cosmetic change

python/ndg.security.common/ndg/security/common/CredWallet.py: make log message for getAttCert clearer.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
#!/usr/bin/env python
"""NDG Attribute Authority client unit tests

NERC Data Grid Project
"""
# Module metadata
__author__ = "P J Kershaw"
__date__ = "05/05/05, major update 16/01/07"
__copyright__ = "(C) 2007 STFC & NERC"
__license__ = (
"""This software may be distributed under the terms of the Q Public
License, version 1.0 or later.""")
__contact__ = "P.J.Kershaw@rl.ac.uk"
# Subversion expands $Id$ on checkout (svn:keywords is set to Id)
__revision__ = '$Id$'

# Standard library
import unittest
import os
import sys
import getpass
import re
from ConfigParser import SafeConfigParser

# NDG security client classes under test
from ndg.security.common.AttAuthority import AttAuthorityClient
from ndg.security.common.AttCert import AttCertRead
from ndg.security.common.X509 import X509CertParse, X509CertRead


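class AttAuthorityClientTestCase(unittest.TestCase):
    """Unit tests for the NDG Attribute Authority web service client.

    setUp() loads ./attAuthorityClientTest.cfg (relative to the current
    working directory) into self.cfg as {section: {option: value}} and
    builds the client under test from the [setUp] section.  Tests that
    need options read them from the config section named after the test.
    """

    # Client private key password; None until read from config or prompted
    clntPriKeyPwd = None

    # One PEM certificate block.  Raw string so that the escaped '-' reaches
    # the regular expression engine unchanged.
    pemPat = r"-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"

    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
        '''Read proxy cert and user cert from a single PEM file and put in
        a list ready for input into SignatureHandler.

        Returns the parsed certificates in the reverse of their order in
        the file.'''
        proxyCertFile = open(proxyCertFilePath)
        try:
            proxyCertFileTxt = proxyCertFile.read()
        finally:
            # Close explicitly rather than relying on garbage collection
            proxyCertFile.close()

        pemPatRE = re.compile(self.__class__.pemPat, re.S)
        signingCertChain = [X509CertParse(x509Cert)
                            for x509Cert in pemPatRE.findall(proxyCertFileTxt)]

        # Expecting proxy cert first - move this to the end.  This will
        # be the cert used to verify the message signature
        signingCertChain.reverse()

        return signingCertChain

    def _getClntPriKeyPwd(self, section):
        '''Return the client private key password for config ``section``,
        prompting on the terminal when 'clntprikeypwd' is not set there.
        Exits the process if the prompt is interrupted with Ctrl-C.'''
        # NOTE(security): when 'clntprikeypwd' is set the password sits in
        # clear text in the .cfg file - keep that file readable by the test
        # user only and out of version control, or leave the option unset
        # and use the prompt.
        try:
            clntPriKeyPwd = self.cfg[section].get('clntprikeypwd')
            if clntPriKeyPwd is None:
                clntPriKeyPwd = getpass.getpass(\
                            prompt="\nsetUp - client private key password: ")
            return clntPriKeyPwd
        except KeyboardInterrupt:
            sys.exit(0)

    def _getCACertFilePathList(self, section):
        '''Return the whitespace separated 'cacertfilepathlist' option of
        ``section`` as a list: the CA certificates used to validate the
        certs in the signature of the server response.  Empty list when the
        option is not set.'''
        try:
            return self.cfg[section]['cacertfilepathlist'].split()
        except KeyError:
            # Only a missing section/option means "no CA certs"; any other
            # error should surface instead of being silently swallowed
            return []

    def _getSigningCertChain(self, section):
        '''Return the signing cert chain read from the 'proxycertfilepath'
        option of ``section``, or None when a standard (non-proxy)
        certificate is in use, i.e. the option is not set.'''
        proxyCertFilePath = self.cfg[section].get('proxycertfilepath')
        if not proxyCertFilePath:
            return None

        return self._getCertChainFromProxyCertFile(proxyCertFilePath)

    def _getSetSignatureHandler(self, section):
        '''Return the 'setsignaturehandler' option of ``section`` evaluated
        as a Python expression.'''
        # NOTE(security): eval() runs whatever expression the config file
        # holds, so anyone able to edit attAuthorityClientTest.cfg can
        # execute code in the test process.  The option looks like a
        # True/False switch - if so, SafeConfigParser.getboolean() or an
        # explicit string comparison is the safe way to read it; confirm
        # the values in use before replacing the eval.
        return eval(self.cfg[section]['setsignaturehandler'])

    def _readIssuingCert(self, section):
        '''Return the text of the certificate file named by the
        'issuingclntcertfilepath' option of ``section`` ready for passing
        via WS, or None when no issuing cert is set.'''
        userCertFilePath = self.cfg[section].get('issuingclntcertfilepath')
        if userCertFilePath is None:
            # No issuing cert set
            return None

        try:
            userCertFile = open(userCertFilePath, 'r')
            try:
                return userCertFile.read()
            finally:
                userCertFile.close()

        except IOError, ioErr:
            # Raise a real exception: a string exception would be rejected
            # with a TypeError by later Python 2 releases, hiding the cause
            raise IOError("Error reading certificate file \"%s\": %s" % \
                                    (ioErr.filename, ioErr.strerror))

    def _readAttCert(self, attCertFilePath):
        '''Return the attribute certificate read from ``attCertFilePath``.'''
        try:
            return AttCertRead(attCertFilePath)

        except IOError, ioErr:
            raise IOError(\
                    "Error reading attribute certificate file \"%s\": %s" %\
                                    (ioErr.filename, ioErr.strerror))

    def _makeMappedAttCertClient(self, section):
        '''Return an AttAuthorityClient configured from ``section``; used
        for the requests to the site B Attribute Authority.'''
        sectionCfg = self.cfg[section]

        clntPriKeyPwd = self._getClntPriKeyPwd(section)
        caCertFilePathList = self._getCACertFilePathList(section)
        reqBinSecTokValType = sectionCfg.get('reqbinsectokvaltype')

        # Check certificate types proxy or standard
        signingCertChain = self._getSigningCertChain(section)
        setSignatureHandler = self._getSetSignatureHandler(section)

        # NOTE(review): unlike setUp, no sslPeerCertCN / sslCACertList is
        # passed here, so peer checks for an https uri depend on the
        # defaults of AttAuthorityClient - confirm that is intended.
        return AttAuthorityClient(
            uri=sectionCfg['uri'],
            setSignatureHandler=setSignatureHandler,
            reqBinSecTokValType=reqBinSecTokValType,
            signingCertFilePath=sectionCfg.get('clntcertfilepath'),
            signingCertChain=signingCertChain,
            signingPriKeyFilePath=sectionCfg.get('clntprikeyfilepath'),
            signingPriKeyPwd=clntPriKeyPwd,
            caCertFilePathList=caCertFilePathList,
            tracefile=sys.stderr)

    def _checkHostsInfo(self, hostsInfo, checkValues=False):
        '''Assert that every hostname and every host info key in the
        {hostname: {key: value}} mapping ``hostsInfo`` is set; with
        ``checkValues`` also assert that every value is set.'''
        for hostname, hostInfo in hostsInfo.items():
            assert hostname, "Hostname not set"
            for k, v in hostInfo.items():
                assert k, "hostInfo value key unset"
                if checkValues:
                    assert v, ("%s value not set" % k)

    def setUp(self):
        '''Load the test configuration into self.cfg and create self.clnt,
        the client used by the tests, from the [setUp] section.'''
        configParser = SafeConfigParser()
        configParser.read("./attAuthorityClientTest.cfg")

        self.cfg = {}
        for section in configParser.sections():
            self.cfg[section] = dict(configParser.items(section))

        if self.clntPriKeyPwd is None:
            self.clntPriKeyPwd = self._getClntPriKeyPwd('setUp')

        caCertFilePathList = self._getCACertFilePathList('setUp')

        # CA certificates handed to the client as sslCACertList
        try:
            sslCACertList = [X509CertRead(caCertFilePath) \
                for caCertFilePath in \
                    self.cfg['setUp']['sslcacertfilepathlist'].split()]
        except KeyError:
            sslCACertList = []

        reqBinSecTokValType = self.cfg['setUp'].get('reqbinsectokvaltype')

        # Check certificate types proxy or standard
        signingCertChain = self._getSigningCertChain('setUp')
        setSignatureHandler = self._getSetSignatureHandler('setUp')

        # Instantiate WS proxy
        self.clnt = AttAuthorityClient(uri=self.cfg['setUp']['uri'],
            sslPeerCertCN=self.cfg['setUp'].get('sslpeercertcn'),
            sslCACertList=sslCACertList,
            setSignatureHandler=setSignatureHandler,
            reqBinSecTokValType=reqBinSecTokValType,
            signingCertFilePath=self.cfg['setUp'].get('clntcertfilepath'),
            signingCertChain=signingCertChain,
            signingPriKeyFilePath=self.cfg['setUp'].get('clntprikeyfilepath'),
            signingPriKeyPwd=self.clntPriKeyPwd,
            caCertFilePathList=caCertFilePathList,
            tracefile=sys.stderr)

    def test1GetX509Cert(self):
        '''test1GetX509Cert: retrieve Attribute Authority's X.509 cert.'''
        resp = self.clnt.getX509Cert()
        print "Attribute Authority X.509 cert.:\n" + resp

    def test2GetHostInfo(self):
        """test2GetHostInfo: retrieve info for AA host"""
        hostInfo = self.clnt.getHostInfo()
        print "Host Info:\n %s" % hostInfo

    def test3GetTrustedHostInfo(self):
        """test3GetTrustedHostInfo: retrieve trusted host info matching a
        given role"""
        trustedHostInfo = self.clnt.getTrustedHostInfo(\
                                 self.cfg['test3GetTrustedHostInfo']['role'])
        self._checkHostsInfo(trustedHostInfo)

        print "Trusted Host Info:\n %s" % trustedHostInfo

    def test4GetTrustedHostInfoWithNoRole(self):
        """test4GetTrustedHostInfoWithNoRole: retrieve trusted host info
        irrespective of role"""
        trustedHostInfo = self.clnt.getTrustedHostInfo()

        # Every value is checked here as well as every key
        self._checkHostsInfo(trustedHostInfo, checkValues=True)

        print "Trusted Host Info:\n %s" % trustedHostInfo

    def test4aGetAllHostsInfo(self):
        """test4aGetAllHostsInfo: retrieve info for all hosts"""
        allHostInfo = self.clnt.getAllHostsInfo()
        self._checkHostsInfo(allHostInfo)

        print "All Hosts Info:\n %s" % allHostInfo

    def test5GetAttCert(self):
        """test5GetAttCert: Request attribute certificate from NDG Attribute
        Authority Web Service."""

        # Read user Certificate into a string ready for passing via WS
        userCertTxt = self._readIssuingCert('test5GetAttCert')

        # Make attribute certificate request
        attCert = self.clnt.getAttCert(userCert=userCertTxt)

        print "Attribute Certificate: \n\n:" + str(attCert)

        attCert.filePath = self.cfg['test5GetAttCert']['attcertfilepath']
        attCert.write()

    def test6GetAttCertWithUserIdSet(self):
        """test6GetAttCertWithUserIdSet: Request attribute certificate from
        NDG Attribute Authority Web Service setting a specific user Id
        independent of the signer of the SOAP request."""

        # Read user Certificate into a string ready for passing via WS
        userCertTxt = self._readIssuingCert('test6GetAttCertWithUserIdSet')

        # Make attribute certificate request
        userId = self.cfg['test6GetAttCertWithUserIdSet']['userid']
        attCert = self.clnt.getAttCert(userId=userId,
                                       userCert=userCertTxt)

        print "Attribute Certificate: \n\n:" + str(attCert)

        # NOTE(review): the output path is taken from the test5GetAttCert
        # section, so this overwrites the file written by test5GetAttCert -
        # looks like a copy and paste slip; confirm before changing.
        attCert.filePath = self.cfg['test5GetAttCert']['attcertfilepath']
        attCert.write()

    def test7GetMappedAttCert(self):
        """test7GetMappedAttCert: Request mapped attribute certificate from
        NDG Attribute Authority Web Service."""
        section = 'test7GetMappedAttCert'

        # Read user Certificate into a string ready for passing via WS
        userCertTxt = self._readIssuingCert(section)

        # Similarly for Attribute Certificate
        userAttCert = self._readAttCert(self.cfg[section]['userattcertfilepath'])

        # Make client to site B Attribute Authority
        clnt = self._makeMappedAttCertClient(section)

        # Make attribute certificate request
        attCert = clnt.getAttCert(userCert=userCertTxt,
                                  userAttCert=userAttCert)
        print "Attribute Certificate: \n\n:" + str(attCert)

        attCert.filePath = self.cfg[section]['mappedattcertfilepath']
        attCert.write()

    def test8GetMappedAttCertStressTest(self):
        """test8GetMappedAttCertStressTest: Request a mapped attribute
        certificate from NDG Attribute Authority Web Service for each file
        in the 'userattcertfilepathlist' option."""
        section = 'test8GetMappedAttCertStressTest'

        # Read user Certificate into a string ready for passing via WS
        userCertTxt = self._readIssuingCert(section)

        # Make client to site B Attribute Authority
        clnt = self._makeMappedAttCertClient(section)

        acFilePathList = self.cfg[section]['userattcertfilepathlist'].split()

        for acFilePath in acFilePathList:
            userAttCert = self._readAttCert(acFilePath)

            # Make attribute certificate request
            try:
                clnt.getAttCert(userCert=userCertTxt,
                                userAttCert=userAttCert)
            except Exception, e:
                # A failed request is only recorded in a .msg file in the
                # current directory; the test carries on and still passes
                outFilePfx = 'test8GetMappedAttCertStressTest-%s' % \
                        os.path.basename(acFilePath)
                msgFile = open(outFilePfx+".msg", 'w')
                try:
                    msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
                finally:
                    # Close so the message is flushed to disk
                    msgFile.close()
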
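#_____________________________________________________________________________
class AttAuthorityClientTestSuite(unittest.TestSuite):
    """Suite running a fixed list of AttAuthorityClientTestCase tests."""

    def __init__(self):
        # NOTE(review): test4aGetAllHostsInfo is not listed here - confirm
        # whether it was left out of the suite on purpose.
        testNames = (
                    "test1GetX509Cert",
                    "test2GetHostInfo",
                    "test3GetTrustedHostInfo",
                    "test4GetTrustedHostInfoWithNoRole",
                    "test5GetAttCert",
                    "test6GetAttCertWithUserIdSet",
                    "test7GetMappedAttCert",
                    "test8GetMappedAttCertStressTest",
                  )

        # Do not call the local variable 'map': assigning to that name makes
        # it local to __init__, so calling the builtin map() on the same
        # line fails with UnboundLocalError and the suite cannot be built.
        tests = [AttAuthorityClientTestCase(testName)
                 for testName in testNames]
        unittest.TestSuite.__init__(self, tests)
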
# Run the tests in this module when it is executed as a script
if __name__ == "__main__":
    unittest.main()