source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/attAuthority/AttAuthorityClientTest.py @ 4171

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/attAuthority/AttAuthorityClientTest.py@4171
Revision 4171, 17.7 KB checked in by pjkersha, 12 years ago (diff)

Fixes to SOAP and WS-Security middleware, added SOAP fault handling for exceptions

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority SOAP client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "05/05/05, major update 16/01/07"
8__copyright__ = "(C) 2007 STFC & NERC"
9__license__ = \
10"""This software may be distributed under the terms of the Q Public
11License, version 1.0 or later."""
12__contact__ = "P.J.Kershaw@rl.ac.uk"
13__revision__ = '$Id$'
14
15import unittest
16import os, sys, getpass, re
17from ConfigParser import SafeConfigParser
18import logging
19logging.basicConfig()
20
21from ndg.security.common.AttAuthority import AttAuthorityClient
22from ndg.security.common.AttCert import AttCertRead
23from ndg.security.common.X509 import X509CertParse, X509CertRead
24from ndg.security.common.wssecurity.dom import SignatureHandler as SigHdlr
25
26from os.path import expandvars as xpdVars
27from os.path import join as jnPath
28mkPath = lambda file: jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'], file)
29
30
31class AttAuthorityClientTestCase(unittest.TestCase):
32    clntPriKeyPwd = None
33    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
34
35    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
36        '''Read proxy cert and user cert from a single PEM file and put in
37        a list ready for input into SignatureHandler'''               
38        proxyCertFileTxt = open(proxyCertFilePath).read()
39       
40        pemPatRE = re.compile(self.__class__.pemPat, re.S)
41        x509CertList = pemPatRE.findall(proxyCertFileTxt)
42       
43        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
44                            x509CertList]
45   
46        # Expecting proxy cert first - move this to the end.  This will
47        # be the cert used to verify the message signature
48        signingCertChain.reverse()
49       
50        return signingCertChain
51
52
53    def setUp(self):
54
55        if 'NDGSEC_INT_DEBUG' in os.environ:
56            import pdb
57            pdb.set_trace()
58       
59        if 'NDGSEC_AACLNT_UNITTEST_DIR' not in os.environ:
60            os.environ['NDGSEC_AACLNT_UNITTEST_DIR'] = \
61                os.path.abspath(os.path.dirname(__file__))
62
63        configParser = SafeConfigParser()
64        configFilePath = jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'],
65                                'attAuthorityClientTest.cfg')
66        configParser.read(configFilePath)
67       
68        self.cfg = {}
69        for section in configParser.sections():
70            self.cfg[section] = dict(configParser.items(section))
71
72        tracefile = sys.stderr
73
74        if self.clntPriKeyPwd is None:
75            try:
76                if self.cfg['setUp'].get('clntprikeypwd') is None:
77                    self.clntPriKeyPwd = getpass.getpass(\
78                            prompt="\nsetUp - client private key password: ")
79                else:
80                    self.clntPriKeyPwd=self.cfg['setUp'].get('clntprikeypwd')
81            except KeyboardInterrupt:
82                sys.exit(0)
83
84        # List of CA certificates for use in validation of certs used in
85        # signature for server reponse
86        try:
87            caCertFilePathList = [xpdVars(file) for file in \
88                            self.cfg['setUp']['cacertfilepathlist'].split()]
89        except KeyError:
90            caCertFilePathList = []
91         
92        try:
93            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
94                         self.cfg['setUp']['sslcacertfilepathlist'].split()]
95        except KeyError:
96            sslCACertList = []
97           
98        clntCertFilePath = xpdVars(self.cfg['setUp'].get('clntcertfilepath'))         
99        clntPriKeyFilePath=xpdVars(self.cfg['setUp'].get('clntprikeyfilepath'))
100        reqBinSecTokValType = self.cfg['setUp'].get('reqbinsectokvaltype')
101
102        # Check certificate types proxy or standard
103        if reqBinSecTokValType == SigHdlr.binSecTokValType["X509PKIPathv1"]:
104            signingCertChain = \
105                        self._getCertChainFromProxyCertFile(clntCertFilePath)
106            signingCertFilePath = None
107        else:
108            signingCertChain = None
109            signingCertFilePath = clntCertFilePath
110
111        # Inclusive namespace prefixes for Exclusive C14N
112        try:
113            refC14nInclNS = self.cfg['setUp']['wssrefinclns'].split()           
114        except KeyError:
115            refC14nInclNS = []
116
117        try:
118            signedInfoC14nInclNS = self.cfg['setUp']['wsssignedinfoinclns'].split()         
119        except KeyError:
120            signedInfoC14nInclNS = []
121               
122        setSignatureHandler = eval(self.cfg['setUp']['setsignaturehandler'])
123
124        # Instantiate WS proxy
125        self.clnt = AttAuthorityClient(uri=self.cfg['setUp']['uri'],
126                        sslPeerCertCN=self.cfg['setUp'].get('sslpeercertcn'),
127                        sslCACertList=sslCACertList,
128                        setSignatureHandler=setSignatureHandler,
129                        reqBinSecTokValType=reqBinSecTokValType,
130                        signingCertFilePath=signingCertFilePath,
131                        signingCertChain=signingCertChain,
132                        signingPriKeyFilePath=clntPriKeyFilePath,
133                        signingPriKeyPwd=self.clntPriKeyPwd,
134                        caCertFilePathList=caCertFilePathList,
135                        refC14nInclNS=refC14nInclNS,
136                        signedInfoC14nInclNS=signedInfoC14nInclNS,
137                        tracefile=sys.stderr)
138           
139   
140    def test1GetX509Cert(self):
141        '''test1GetX509Cert: retrieve Attribute Authority's X.509 cert.'''
142        resp = self.clnt.getX509Cert()
143        print "Attribute Authority X.509 cert.:\n" + resp
144
145    def test2GetHostInfo(self):
146        """test2GetHostInfo: retrieve info for AA host"""
147        hostInfo = self.clnt.getHostInfo()
148        print "Host Info:\n %s" % hostInfo
149       
150
151    def test3GetTrustedHostInfo(self):
152        """test3GetTrustedHostInfo: retrieve trusted host info matching a
153        given role"""
154        trustedHostInfo = self.clnt.getTrustedHostInfo(\
155                                 self.cfg['test3GetTrustedHostInfo']['role'])
156        for hostname, hostInfo in trustedHostInfo.items():
157            assert hostname, "Hostname not set"
158            for k, v in hostInfo.items():
159                assert k, "hostInfo value key unset"
160
161        print "Trusted Host Info:\n %s" % trustedHostInfo
162
163
164    def test4GetTrustedHostInfoWithNoRole(self):
165        """test4GetTrustedHostInfoWithNoRole: retrieve trusted host info
166        irrespective of role"""
167        trustedHostInfo = self.clnt.getTrustedHostInfo()
168        for hostname, hostInfo in trustedHostInfo.items():
169            assert hostname, "Hostname not set"
170            for k, v in hostInfo.items():
171                assert k, "hostInfo value key unset"
172                assert v, ("%s value not set" % k)
173                   
174        print "Trusted Host Info:\n %s" % trustedHostInfo
175       
176
177    def test4aGetAllHostsInfo(self):
178        """test4aGetAllHostsInfo: retrieve info for all hosts"""
179        allHostInfo = self.clnt.getAllHostsInfo()
180        for hostname, hostInfo in allHostInfo.items():
181            assert hostname, "Hostname not set"
182            for k, v in hostInfo.items():
183                assert k, "hostInfo value key unset"
184                   
185        print "All Hosts Info:\n %s" % allHostInfo
186
187
188    def test5GetAttCert(self):       
189        """test5GetAttCert: Request attribute certificate from NDG Attribute
190        Authority Web Service."""
191   
192        # Read user Certificate into a string ready for passing via WS
193        try:
194            userCertFilePath = \
195            xpdVars(self.cfg['test5GetAttCert'].get('issuingclntcertfilepath'))
196            userCertTxt = open(userCertFilePath, 'r').read()
197       
198        except TypeError:
199            # No issuing cert set
200            userCertTxt = None
201               
202        except IOError, ioErr:
203            raise "Error reading certificate file \"%s\": %s" % \
204                                    (ioErr.filename, ioErr.strerror)
205
206        # Make attribute certificate request
207        attCert = self.clnt.getAttCert(userCert=userCertTxt)
208       
209        print "Attribute Certificate: \n\n:" + str(attCert)
210       
211        attCert.filePath = \
212                        xpdVars(self.cfg['test5GetAttCert']['attcertfilepath'])
213        attCert.write()
214       
215       
216    def test6GetAttCertWithUserIdSet(self):       
217        """test6GetAttCertWithUserIdSet: Request attribute certificate from
218        NDG Attribute Authority Web Service setting a specific user Id
219        independent of the signer of the SOAP request."""
220   
221        # Read user Certificate into a string ready for passing via WS
222        try:
223            userCertFilePath = xpdVars(\
224    self.cfg['test6GetAttCertWithUserIdSet'].get('issuingclntcertfilepath'))
225            userCertTxt = open(userCertFilePath, 'r').read()
226       
227        except TypeError:
228            # No issuing cert set
229            userCertTxt = None
230               
231        except IOError, ioErr:
232            raise "Error reading certificate file \"%s\": %s" % \
233                                    (ioErr.filename, ioErr.strerror)
234
235        # Make attribute certificate request
236        userId = self.cfg['test6GetAttCertWithUserIdSet']['userid']
237        attCert = self.clnt.getAttCert(userId=userId,
238                                       userCert=userCertTxt)
239       
240        print "Attribute Certificate: \n\n:" + str(attCert)
241       
242        attCert.filePath = \
243                        xpdVars(self.cfg['test5GetAttCert']['attcertfilepath'])
244        attCert.write()
245
246
247    def test7GetMappedAttCert(self):       
248        """test7GetMappedAttCert: Request mapped attribute certificate from
249        NDG Attribute Authority Web Service."""
250   
251        # Read user Certificate into a string ready for passing via WS
252        try:
253            userCertFilePath = xpdVars(\
254            self.cfg['test7GetMappedAttCert'].get('issuingclntcertfilepath'))
255            userCertTxt = open(userCertFilePath, 'r').read()
256       
257        except TypeError:
258            # No issuing cert set
259            userCertTxt = None
260               
261        except IOError, ioErr:
262            raise "Error reading certificate file \"%s\": %s" % \
263                                    (ioErr.filename, ioErr.strerror)
264   
265   
266        # Simlarly for Attribute Certificate
267        try:
268            userAttCert = AttCertRead(xpdVars(\
269                self.cfg['test7GetMappedAttCert']['userattcertfilepath']))
270           
271        except IOError, ioErr:
272            raise "Error reading attribute certificate file \"%s\": %s" %\
273                                    (ioErr.filename, ioErr.strerror)
274
275        try:
276            if self.cfg['test7GetMappedAttCert'].get('clntprikeypwd') is None:
277                clntPriKeyPwd = getpass.getpass(\
278                            prompt="\nsetUp - client private key password: ")
279            else:
280                clntPriKeyPwd = \
281                        self.cfg['test7GetMappedAttCert'].get('clntprikeypwd')
282        except KeyboardInterrupt:
283            sys.exit(0)
284
285        # List of CA certificates for use in validation of certs used in
286        # signature for server reponse
287        try:
288            caCertFilePathList = [xpdVars(file) for file in \
289            self.cfg['test7GetMappedAttCert']['cacertfilepathlist'].split()]
290        except:
291            caCertFilePathList = []
292           
293           
294        clntCertFilePath = xpdVars(\
295                self.cfg['test7GetMappedAttCert'].get('clntcertfilepath'))
296        clntPriKeyFilePath = xpdVars(\
297                self.cfg['test7GetMappedAttCert'].get('clntprikeyfilepath'))
298               
299        reqBinSecTokValType = \
300                self.cfg['test7GetMappedAttCert'].get('reqbinsectokvaltype')
301
302        # Check certificate types proxy or standard
303        if reqBinSecTokValType == SigHdlr.binSecTokValType["X509PKIPathv1"]:
304            signingCertChain = \
305                        self._getCertChainFromProxyCertFile(clntCertFilePath)
306            signingCertFilePath = None
307        else:
308            signingCertChain = None
309            signingCertFilePath = clntCertFilePath
310
311        setSignatureHandler = \
312                eval(self.cfg['test7GetMappedAttCert']['setsignaturehandler'])
313       
314        # Make client to site B Attribute Authority
315        clnt = AttAuthorityClient(\
316                                uri=self.cfg['test7GetMappedAttCert']['uri'], 
317                                setSignatureHandler=setSignatureHandler,
318                                reqBinSecTokValType=reqBinSecTokValType,
319                                signingCertFilePath=signingCertFilePath,
320                                signingCertChain=signingCertChain,
321                                signingPriKeyFilePath=clntPriKeyFilePath,
322                                signingPriKeyPwd=clntPriKeyPwd,
323                                caCertFilePathList=caCertFilePathList,
324                                tracefile=sys.stderr)
325   
326        # Make attribute certificate request
327        attCert = clnt.getAttCert(userCert=userCertTxt,
328                                  userAttCert=userAttCert)
329        print "Attribute Certificate: \n\n:" + str(attCert)
330       
331        attCert.filePath = xpdVars(\
332                    self.cfg['test7GetMappedAttCert']['mappedattcertfilepath'])
333        attCert.write()
334       
335       
336    def test8GetMappedAttCertStressTest(self):       
337        """test8GetMappedAttCertStressTest: Request mapped attribute
338        certificate from NDG Attribute Authority Web Service."""
339   
340        # Read user Certificate into a string ready for passing via WS
341        try:
342            userCertFilePath = xpdVars(\
343    self.cfg['test8GetMappedAttCertStressTest'].get('issuingclntcertfilepath'))
344            userCertTxt = open(userCertFilePath, 'r').read()
345       
346        except TypeError:
347            # No issuing cert set
348            userCertTxt = None
349               
350        except IOError, ioErr:
351            raise "Error reading certificate file \"%s\": %s" % \
352                                    (ioErr.filename, ioErr.strerror)
353
354        try:
355            clntPriKeyPwd = \
356            self.cfg['test8GetMappedAttCertStressTest'].get('clntprikeypwd')
357            if clntPriKeyPwd is None:
358                clntPriKeyPwd = getpass.getpass(\
359                            prompt="\nsetUp - client private key password: ")
360        except KeyboardInterrupt:
361            sys.exit(0)
362
363        # List of CA certificates for use in validation of certs used in
364        # signature for server reponse
365        try:
366            caCertFilePathList = [xpdVars(file) for file in \
367    self.cfg['test8GetMappedAttCertStressTest']['cacertfilepathlist'].split()]
368        except:
369            caCertFilePathList = []
370
371
372        clntCertFilePath = xpdVars(\
373        self.cfg['test8GetMappedAttCertStressTest'].get('clntcertfilepath'))           
374
375        clntPriKeyFilePath = xpdVars(\
376        self.cfg['test8GetMappedAttCertStressTest'].get('clntprikeyfilepath'))
377
378        reqBinSecTokValType = \
379        self.cfg['test8GetMappedAttCertStressTest'].get('reqbinsectokvaltype')
380       
381        # Check certificate types proxy or standard
382        if reqBinSecTokValType == SigHdlr.binSecTokValType["X509PKIPathv1"]:
383            signingCertChain = \
384                        self._getCertChainFromProxyCertFile(clntCertFilePath)
385            signingCertFilePath = None
386        else:
387            signingCertChain = None
388            signingCertFilePath = clntCertFilePath
389
390        setSignatureHandler = \
391    eval(self.cfg['test8GetMappedAttCertStressTest']['setsignaturehandler'])
392       
393        # Make client to site B Attribute Authority
394        clnt = AttAuthorityClient(\
395                        uri=self.cfg['test8GetMappedAttCertStressTest']['uri'], 
396                        setSignatureHandler=setSignatureHandler,
397                        reqBinSecTokValType=reqBinSecTokValType,
398                        signingCertChain=signingCertChain,
399                        signingCertFilePath=clntCertFilePath,
400                        signingPriKeyFilePath=clntPriKeyFilePath,
401                        signingPriKeyPwd=clntPriKeyPwd,
402                        caCertFilePathList=caCertFilePathList,
403                        tracefile=sys.stderr)
404
405        acFilePathList = [xpdVars(file) for file in \
406self.cfg['test8GetMappedAttCertStressTest']['userattcertfilepathlist'].split()]
407
408        for acFilePath in acFilePathList:
409            try:
410                userAttCert = AttCertRead(acFilePath)
411               
412            except IOError, ioErr:
413                raise "Error reading attribute certificate file \"%s\": %s" %\
414                                        (ioErr.filename, ioErr.strerror)
415       
416            # Make attribute certificate request
417            try:
418                attCert = clnt.getAttCert(userCert=userCertTxt,
419                                          userAttCert=userAttCert)
420            except Exception, e:
421                outFilePfx = 'test8GetMappedAttCertStressTest-%s' % \
422                        os.path.basename(acFilePath)   
423                msgFile = open(outFilePfx+".msg", 'w')
424                msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
425             
426             
427#_____________________________________________________________________________       
428class AttAuthorityClientTestSuite(unittest.TestSuite):
429    def __init__(self):
430        map = map(AttAuthorityClientTestCase,
431                  (
432                    "test1GetX509Cert",
433                    "test2GetHostInfo",
434                    "test3GetTrustedHostInfo",
435                    "test4GetTrustedHostInfoWithNoRole",
436                    "test5GetAttCert",
437                    "test6GetAttCertWithUserIdSet",
438                    "test7GetMappedAttCert",
439                    "test8GetMappedAttCertStressTest",
440                  ))
441        unittest.TestSuite.__init__(self, map)
442                                       
443if __name__ == "__main__":
444    unittest.main()
Note: See TracBrowser for help on using the repository browser.