source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/attAuthority/AttAuthorityClientTest.py @ 4129

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/attAuthority/AttAuthorityClientTest.py@4129
Revision 4129, 17.7 KB checked in by cbyrom, 11 years ago (diff)

General refactoring and updating of code, including:

Removal of refC14nKw and singnedInfoC14nKw keywords in wsssecurity session manager config
(the refC14nInclNS and signedInfoC14nInclNS keywords are sufficient);
Creation of new DOM signature handler class, dom.py, based on the wsSecurity
class;
Abstraction of common code between dom.py and etree.py into new parent
class, BaseSignatureHandler?.py.
Fixing and extending use of properties in the SignatureHandler? code.
Fixing a few bugs with the original SignatureHandler? code.
Updating of test cases to new code/code structure.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority SOAP client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "05/05/05, major update 16/01/07"
8__copyright__ = "(C) 2007 STFC & NERC"
9__license__ = \
10"""This software may be distributed under the terms of the Q Public
11License, version 1.0 or later."""
12__contact__ = "P.J.Kershaw@rl.ac.uk"
13__revision__ = '$Id$'
14
15import unittest
16import os, sys, getpass, re
17from ConfigParser import SafeConfigParser
18
19from ndg.security.common.AttAuthority import AttAuthorityClient
20from ndg.security.common.AttCert import AttCertRead
21from ndg.security.common.X509 import X509CertParse, X509CertRead
22from ndg.security.common.wssecurity.dom import SignatureHandler as SigHdlr
23
24from os.path import expandvars as xpdVars
25from os.path import join as jnPath
26mkPath = lambda file: jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'], file)
27
28
29class AttAuthorityClientTestCase(unittest.TestCase):
30    clntPriKeyPwd = None
31    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
32
33    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
34        '''Read proxy cert and user cert from a single PEM file and put in
35        a list ready for input into SignatureHandler'''               
36        proxyCertFileTxt = open(proxyCertFilePath).read()
37       
38        pemPatRE = re.compile(self.__class__.pemPat, re.S)
39        x509CertList = pemPatRE.findall(proxyCertFileTxt)
40       
41        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
42                            x509CertList]
43   
44        # Expecting proxy cert first - move this to the end.  This will
45        # be the cert used to verify the message signature
46        signingCertChain.reverse()
47       
48        return signingCertChain
49
50
51    def setUp(self):
52
53        if 'NDGSEC_INT_DEBUG' in os.environ:
54            import pdb
55            pdb.set_trace()
56       
57        if 'NDGSEC_AACLNT_UNITTEST_DIR' not in os.environ:
58            os.environ['NDGSEC_AACLNT_UNITTEST_DIR'] = \
59                os.path.abspath(os.path.dirname(__file__))
60
61        configParser = SafeConfigParser()
62        configFilePath = jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'],
63                                'attAuthorityClientTest.cfg')
64        configParser.read(configFilePath)
65       
66        self.cfg = {}
67        for section in configParser.sections():
68            self.cfg[section] = dict(configParser.items(section))
69
70        tracefile = sys.stderr
71
72        if self.clntPriKeyPwd is None:
73            try:
74                if self.cfg['setUp'].get('clntprikeypwd') is None:
75                    self.clntPriKeyPwd = getpass.getpass(\
76                            prompt="\nsetUp - client private key password: ")
77                else:
78                    self.clntPriKeyPwd=self.cfg['setUp'].get('clntprikeypwd')
79            except KeyboardInterrupt:
80                sys.exit(0)
81
82        # List of CA certificates for use in validation of certs used in
83        # signature for server reponse
84        try:
85            caCertFilePathList = [xpdVars(file) for file in \
86                            self.cfg['setUp']['cacertfilepathlist'].split()]
87        except KeyError:
88            caCertFilePathList = []
89         
90        try:
91            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
92                         self.cfg['setUp']['sslcacertfilepathlist'].split()]
93        except KeyError:
94            sslCACertList = []
95           
96        clntCertFilePath = xpdVars(self.cfg['setUp'].get('clntcertfilepath'))         
97        clntPriKeyFilePath=xpdVars(self.cfg['setUp'].get('clntprikeyfilepath'))
98        reqBinSecTokValType = self.cfg['setUp'].get('reqbinsectokvaltype')
99
100        # Check certificate types proxy or standard
101        if reqBinSecTokValType == SigHdlr.binSecTokValType["X509PKIPathv1"]:
102            signingCertChain = \
103                        self._getCertChainFromProxyCertFile(clntCertFilePath)
104            signingCertFilePath = None
105        else:
106            signingCertChain = None
107            signingCertFilePath = clntCertFilePath
108
109        # Inclusive namespace prefixes for Exclusive C14N
110        try:
111            refC14nInclNS = self.cfg['setUp']['wssrefinclns'].split()           
112        except KeyError:
113            refC14nInclNS = []
114
115        try:
116            signedInfoC14nInclNS = self.cfg['setUp']['wsssignedinfoinclns'].split()         
117        except KeyError:
118            signedInfoC14nInclNS = []
119               
120        setSignatureHandler = eval(self.cfg['setUp']['setsignaturehandler'])
121
122        # Instantiate WS proxy
123        self.clnt = AttAuthorityClient(uri=self.cfg['setUp']['uri'],
124                        sslPeerCertCN=self.cfg['setUp'].get('sslpeercertcn'),
125                        sslCACertList=sslCACertList,
126                        setSignatureHandler=setSignatureHandler,
127                        reqBinSecTokValType=reqBinSecTokValType,
128                        signingCertFilePath=signingCertFilePath,
129                        signingCertChain=signingCertChain,
130                        signingPriKeyFilePath=clntPriKeyFilePath,
131                        signingPriKeyPwd=self.clntPriKeyPwd,
132                        caCertFilePathList=caCertFilePathList,
133                        refC14nInclNS=refC14nInclNS,
134                        signedInfoC14nInclNS=signedInfoC14nInclNS,
135                        tracefile=sys.stderr)
136           
137   
138    def test1GetX509Cert(self):
139        '''test1GetX509Cert: retrieve Attribute Authority's X.509 cert.'''
140        resp = self.clnt.getX509Cert()
141        print "Attribute Authority X.509 cert.:\n" + resp
142
143    def test2GetHostInfo(self):
144        """test2GetHostInfo: retrieve info for AA host"""
145        hostInfo = self.clnt.getHostInfo()
146        print "Host Info:\n %s" % hostInfo
147       
148
149    def test3GetTrustedHostInfo(self):
150        """test3GetTrustedHostInfo: retrieve trusted host info matching a
151        given role"""
152        trustedHostInfo = self.clnt.getTrustedHostInfo(\
153                                 self.cfg['test3GetTrustedHostInfo']['role'])
154        for hostname, hostInfo in trustedHostInfo.items():
155            assert hostname, "Hostname not set"
156            for k, v in hostInfo.items():
157                assert k, "hostInfo value key unset"
158
159        print "Trusted Host Info:\n %s" % trustedHostInfo
160
161
162    def test4GetTrustedHostInfoWithNoRole(self):
163        """test4GetTrustedHostInfoWithNoRole: retrieve trusted host info
164        irrespective of role"""
165        trustedHostInfo = self.clnt.getTrustedHostInfo()
166        for hostname, hostInfo in trustedHostInfo.items():
167            assert hostname, "Hostname not set"
168            for k, v in hostInfo.items():
169                assert k, "hostInfo value key unset"
170                assert v, ("%s value not set" % k)
171                   
172        print "Trusted Host Info:\n %s" % trustedHostInfo
173       
174
175    def test4aGetAllHostsInfo(self):
176        """test4aGetAllHostsInfo: retrieve info for all hosts"""
177        allHostInfo = self.clnt.getAllHostsInfo()
178        for hostname, hostInfo in allHostInfo.items():
179            assert hostname, "Hostname not set"
180            for k, v in hostInfo.items():
181                assert k, "hostInfo value key unset"
182                   
183        print "All Hosts Info:\n %s" % allHostInfo
184
185
186    def test5GetAttCert(self):       
187        """test5GetAttCert: Request attribute certificate from NDG Attribute
188        Authority Web Service."""
189   
190        # Read user Certificate into a string ready for passing via WS
191        try:
192            userCertFilePath = \
193            xpdVars(self.cfg['test5GetAttCert'].get('issuingclntcertfilepath'))
194            userCertTxt = open(userCertFilePath, 'r').read()
195       
196        except TypeError:
197            # No issuing cert set
198            userCertTxt = None
199               
200        except IOError, ioErr:
201            raise "Error reading certificate file \"%s\": %s" % \
202                                    (ioErr.filename, ioErr.strerror)
203
204        # Make attribute certificate request
205        attCert = self.clnt.getAttCert(userCert=userCertTxt)
206       
207        print "Attribute Certificate: \n\n:" + str(attCert)
208       
209        attCert.filePath = \
210                        xpdVars(self.cfg['test5GetAttCert']['attcertfilepath'])
211        attCert.write()
212       
213       
214    def test6GetAttCertWithUserIdSet(self):       
215        """test6GetAttCertWithUserIdSet: Request attribute certificate from
216        NDG Attribute Authority Web Service setting a specific user Id
217        independent of the signer of the SOAP request."""
218   
219        # Read user Certificate into a string ready for passing via WS
220        try:
221            userCertFilePath = xpdVars(\
222    self.cfg['test6GetAttCertWithUserIdSet'].get('issuingclntcertfilepath'))
223            userCertTxt = open(userCertFilePath, 'r').read()
224       
225        except TypeError:
226            # No issuing cert set
227            userCertTxt = None
228               
229        except IOError, ioErr:
230            raise "Error reading certificate file \"%s\": %s" % \
231                                    (ioErr.filename, ioErr.strerror)
232
233        # Make attribute certificate request
234        userId = self.cfg['test6GetAttCertWithUserIdSet']['userid']
235        attCert = self.clnt.getAttCert(userId=userId,
236                                       userCert=userCertTxt)
237       
238        print "Attribute Certificate: \n\n:" + str(attCert)
239       
240        attCert.filePath = \
241                        xpdVars(self.cfg['test5GetAttCert']['attcertfilepath'])
242        attCert.write()
243
244
245    def test7GetMappedAttCert(self):       
246        """test7GetMappedAttCert: Request mapped attribute certificate from
247        NDG Attribute Authority Web Service."""
248   
249        # Read user Certificate into a string ready for passing via WS
250        try:
251            userCertFilePath = xpdVars(\
252            self.cfg['test7GetMappedAttCert'].get('issuingclntcertfilepath'))
253            userCertTxt = open(userCertFilePath, 'r').read()
254       
255        except TypeError:
256            # No issuing cert set
257            userCertTxt = None
258               
259        except IOError, ioErr:
260            raise "Error reading certificate file \"%s\": %s" % \
261                                    (ioErr.filename, ioErr.strerror)
262   
263   
264        # Simlarly for Attribute Certificate
265        try:
266            userAttCert = AttCertRead(xpdVars(\
267                self.cfg['test7GetMappedAttCert']['userattcertfilepath']))
268           
269        except IOError, ioErr:
270            raise "Error reading attribute certificate file \"%s\": %s" %\
271                                    (ioErr.filename, ioErr.strerror)
272
273        try:
274            if self.cfg['test7GetMappedAttCert'].get('clntprikeypwd') is None:
275                clntPriKeyPwd = getpass.getpass(\
276                            prompt="\nsetUp - client private key password: ")
277            else:
278                clntPriKeyPwd = \
279                        self.cfg['test7GetMappedAttCert'].get('clntprikeypwd')
280        except KeyboardInterrupt:
281            sys.exit(0)
282
283        # List of CA certificates for use in validation of certs used in
284        # signature for server reponse
285        try:
286            caCertFilePathList = [xpdVars(file) for file in \
287            self.cfg['test7GetMappedAttCert']['cacertfilepathlist'].split()]
288        except:
289            caCertFilePathList = []
290           
291           
292        clntCertFilePath = xpdVars(\
293                self.cfg['test7GetMappedAttCert'].get('clntcertfilepath'))
294        clntPriKeyFilePath = xpdVars(\
295                self.cfg['test7GetMappedAttCert'].get('clntprikeyfilepath'))
296               
297        reqBinSecTokValType = \
298                self.cfg['test7GetMappedAttCert'].get('reqbinsectokvaltype')
299
300        # Check certificate types proxy or standard
301        if reqBinSecTokValType == SigHdlr.binSecTokValType["X509PKIPathv1"]:
302            signingCertChain = \
303                        self._getCertChainFromProxyCertFile(clntCertFilePath)
304            signingCertFilePath = None
305        else:
306            signingCertChain = None
307            signingCertFilePath = clntCertFilePath
308
309        setSignatureHandler = \
310                eval(self.cfg['test7GetMappedAttCert']['setsignaturehandler'])
311       
312        # Make client to site B Attribute Authority
313        clnt = AttAuthorityClient(\
314                                uri=self.cfg['test7GetMappedAttCert']['uri'], 
315                                setSignatureHandler=setSignatureHandler,
316                                reqBinSecTokValType=reqBinSecTokValType,
317                                signingCertFilePath=signingCertFilePath,
318                                signingCertChain=signingCertChain,
319                                signingPriKeyFilePath=clntPriKeyFilePath,
320                                signingPriKeyPwd=clntPriKeyPwd,
321                                caCertFilePathList=caCertFilePathList,
322                                tracefile=sys.stderr)
323   
324        # Make attribute certificate request
325        attCert = clnt.getAttCert(userCert=userCertTxt,
326                                  userAttCert=userAttCert)
327        print "Attribute Certificate: \n\n:" + str(attCert)
328       
329        attCert.filePath = xpdVars(\
330                    self.cfg['test7GetMappedAttCert']['mappedattcertfilepath'])
331        attCert.write()
332       
333       
334    def test8GetMappedAttCertStressTest(self):       
335        """test8GetMappedAttCertStressTest: Request mapped attribute
336        certificate from NDG Attribute Authority Web Service."""
337   
338        # Read user Certificate into a string ready for passing via WS
339        try:
340            userCertFilePath = xpdVars(\
341    self.cfg['test8GetMappedAttCertStressTest'].get('issuingclntcertfilepath'))
342            userCertTxt = open(userCertFilePath, 'r').read()
343       
344        except TypeError:
345            # No issuing cert set
346            userCertTxt = None
347               
348        except IOError, ioErr:
349            raise "Error reading certificate file \"%s\": %s" % \
350                                    (ioErr.filename, ioErr.strerror)
351
352        try:
353            clntPriKeyPwd = \
354            self.cfg['test8GetMappedAttCertStressTest'].get('clntprikeypwd')
355            if clntPriKeyPwd is None:
356                clntPriKeyPwd = getpass.getpass(\
357                            prompt="\nsetUp - client private key password: ")
358        except KeyboardInterrupt:
359            sys.exit(0)
360
361        # List of CA certificates for use in validation of certs used in
362        # signature for server reponse
363        try:
364            caCertFilePathList = [xpdVars(file) for file in \
365    self.cfg['test8GetMappedAttCertStressTest']['cacertfilepathlist'].split()]
366        except:
367            caCertFilePathList = []
368
369
370        clntCertFilePath = xpdVars(\
371        self.cfg['test8GetMappedAttCertStressTest'].get('clntcertfilepath'))           
372
373        clntPriKeyFilePath = xpdVars(\
374        self.cfg['test8GetMappedAttCertStressTest'].get('clntprikeyfilepath'))
375
376        reqBinSecTokValType = \
377        self.cfg['test8GetMappedAttCertStressTest'].get('reqbinsectokvaltype')
378       
379        # Check certificate types proxy or standard
380        if reqBinSecTokValType == SigHdlr.binSecTokValType["X509PKIPathv1"]:
381            signingCertChain = \
382                        self._getCertChainFromProxyCertFile(clntCertFilePath)
383            signingCertFilePath = None
384        else:
385            signingCertChain = None
386            signingCertFilePath = clntCertFilePath
387
388        setSignatureHandler = \
389    eval(self.cfg['test8GetMappedAttCertStressTest']['setsignaturehandler'])
390       
391        # Make client to site B Attribute Authority
392        clnt = AttAuthorityClient(\
393                        uri=self.cfg['test8GetMappedAttCertStressTest']['uri'], 
394                        setSignatureHandler=setSignatureHandler,
395                        reqBinSecTokValType=reqBinSecTokValType,
396                        signingCertChain=signingCertChain,
397                        signingCertFilePath=clntCertFilePath,
398                        signingPriKeyFilePath=clntPriKeyFilePath,
399                        signingPriKeyPwd=clntPriKeyPwd,
400                        caCertFilePathList=caCertFilePathList,
401                        tracefile=sys.stderr)
402
403        acFilePathList = [xpdVars(file) for file in \
404self.cfg['test8GetMappedAttCertStressTest']['userattcertfilepathlist'].split()]
405
406        for acFilePath in acFilePathList:
407            try:
408                userAttCert = AttCertRead(acFilePath)
409               
410            except IOError, ioErr:
411                raise "Error reading attribute certificate file \"%s\": %s" %\
412                                        (ioErr.filename, ioErr.strerror)
413       
414            # Make attribute certificate request
415            try:
416                attCert = clnt.getAttCert(userCert=userCertTxt,
417                                          userAttCert=userAttCert)
418            except Exception, e:
419                outFilePfx = 'test8GetMappedAttCertStressTest-%s' % \
420                        os.path.basename(acFilePath)   
421                msgFile = open(outFilePfx+".msg", 'w')
422                msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
423             
424             
425#_____________________________________________________________________________       
426class AttAuthorityClientTestSuite(unittest.TestSuite):
427    def __init__(self):
428        map = map(AttAuthorityClientTestCase,
429                  (
430                    "test1GetX509Cert",
431                    "test2GetHostInfo",
432                    "test3GetTrustedHostInfo",
433                    "test4GetTrustedHostInfoWithNoRole",
434                    "test5GetAttCert",
435                    "test6GetAttCertWithUserIdSet",
436                    "test7GetMappedAttCert",
437                    "test8GetMappedAttCertStressTest",
438                  ))
439        unittest.TestSuite.__init__(self, map)
440                                       
441if __name__ == "__main__":
442    unittest.main()
Note: See TracBrowser for help on using the repository browser.