Revision 2515, 13.8 KB checked in by pjkersha, 13 years ago (diff)
  • Working version of WS-Security interface with proxy certificates - chain

of trust containing proxy cert and user cert is passed in a base 64 encoded
DER in a 'X509PKIPathv1' type BinarySecurityToken.

ndg.security.server/ndg/security/server/AttAuthority/server-config.tac:

  • fix to soap_getX509Cert() - return base 64 encoded DER instead of PEM

format

ndg.security.server/ndg/security/server/AttAuthority/__init__.py,
ndg.security.server/ndg/security/server/ca/__init__.py,
ndg.security.server/ndg/security/server/SessionMgr/__init__.py,
ndg.security.client/ndg/security/client/SimpleCAClient.py:

  • added repr and get methods to better emulate dict behaviour

ndg.security.test/ndg/security/test/AttAuthority/AttAuthorityClientTest.py,
ndg.security.test/ndg/security/test/AttAuthority/attAuthorityClientTest.cfg:

  • modified to enable correct passing of proxy certificates with WS-Security
  • all unit tests work with these changes

ndg.security.common/ndg/security/common/X509.py:

  • fix to X509Cert.toString method - added 'return'
  • fix to X500DN comparison operators - use eq and ne deleted cmp
  • various fixes to X509Stack particular iter and verifyCertChain.
  • get method now behaves like dict parent class

ndg.security.common/ndg/security/common/AttCert.py:

  • fixed bug in holderDN attribute - now correctly set to call getHolderDN

NOT getHolder!

ndg.security.common/ndg/security/common/AttAuthority/__init__.py:

  • added setSignatureHandler flag to init

ndg.security.common/ndg/security/common/wsSecurity.py:

  • working version to handle proxy certificates correctly - uses

'X509PKIPathv1' type BinarySecurityToken.

1#!/usr/bin/env python
2"""NDG Attribute Authority client unit tests
3
4NERC Data Grid Project
5
6@author P J Kershaw 05/05/05, major update 16/01/07
7
8@copyright (C) 2007 CCLRC & NERC
9
10@license This software may be distributed under the terms of the Q Public
11License, version 1.0 or later.
12"""
13
__revision__ = '$Id$'  # Subversion $Id$ keyword placeholder
15
16import unittest
17import os, sys, getpass, re
18from ConfigParser import SafeConfigParser
19
20from ndg.security.common.AttAuthority import AttAuthorityClient
21from ndg.security.common.AttCert import AttCertRead
22from ndg.security.common.X509 import X509CertParse
23
24
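class AttAuthorityClientTestCase(unittest.TestCase):
    """Unit tests driving AttAuthorityClient against a live Attribute
    Authority Web Service.

    Connection details, key material and output paths are read from
    ./attAuthorityClientTest.cfg: a 'setUp' section describes the default
    client built for every test and each test reads its own section
    (named after the test method) for anything test specific.
    """
    # Client private key password.  Cached on the class (not the instance)
    # once obtained so that the interactive prompt, when needed, is shown
    # once per run rather than once per test method
    clntPriKeyPwd = None

    # Matches one PEM encoded certificate; findall() on a file holding a
    # chain returns each certificate in file order
    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"

    # Config file location, relative to the current working directory
    cfgFilePath = "./attAuthorityClientTest.cfg"

    @staticmethod
    def _readCertFile(certFilePath):
        '''Return the text of a certificate file, always closing the file.

        @param certFilePath: path to a PEM file
        @return: file contents as a string
        @raise IOError: with the file name and OS error in the message
        '''
        try:
            certFile = open(certFilePath)
        except IOError as ioErr:
            raise IOError("Error reading certificate file \"%s\": %s" %
                          (ioErr.filename, ioErr.strerror))
        try:
            return certFile.read()
        finally:
            certFile.close()

    @staticmethod
    def _readAttCert(acFilePath):
        '''Parse an Attribute Certificate file via AttCertRead.

        @param acFilePath: path to the Attribute Certificate file
        @raise IOError: with the file name and OS error in the message
        '''
        try:
            return AttCertRead(acFilePath)
        except IOError as ioErr:
            raise IOError("Error reading attribute certificate file \"%s\": %s" %
                          (ioErr.filename, ioErr.strerror))

    @staticmethod
    def _parseBool(value):
        '''Convert a config file flag to a bool, accepting the spellings
        ConfigParser.getboolean accepts.

        Replaces eval() of the raw string: eval runs whatever the config
        file contains and is the wrong tool for a yes/no setting.

        @param value: string read from the config file
        @return: True or False
        @raise ValueError: if value is not a recognised boolean spelling
        '''
        normalised = str(value).strip().lower()
        if normalised in ('1', 'yes', 'true', 'on'):
            return True
        if normalised in ('0', 'no', 'false', 'off'):
            return False
        raise ValueError("Expecting a boolean setting, got \"%s\"" % value)

    def _getPriKeyPwd(self, section):
        '''Client private key password for a config section.

        Taken from the 'clntprikeypwd' option when present - a config file
        holding it should be readable by the test user only - otherwise
        prompted for interactively.  Ctrl-C at the prompt ends the run.

        @param section: config section name
        @return: password string
        '''
        clntPriKeyPwd = self.cfg[section].get('clntprikeypwd')
        if clntPriKeyPwd is not None:
            return clntPriKeyPwd
        try:
            return getpass.getpass(
                        prompt="\nsetUp - client private key password: ")
        except KeyboardInterrupt:
            sys.exit(0)

    def _getCaCertFilePathList(self, section):
        '''CA certificate file paths (whitespace separated in the
        'cacertfilepathlist' option) used to validate the certificate the
        server signs its response with; [] when the option is not set.'''
        return self.cfg[section].get('cacertfilepathlist', '').split()

    def _readUserCert(self, section):
        '''Text of the user certificate named by 'issuingclntcertfilepath'
        in the config section, or None when that option is not set.'''
        userCertFilePath = self.cfg[section].get('issuingclntcertfilepath')
        if userCertFilePath is None:
            # No issuing cert set
            return None
        return self._readCertFile(userCertFilePath)

    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
        '''Read proxy cert and user cert from a single PEM file and put in
        a list ready for input into SignatureHandler

        @param proxyCertFilePath: PEM file holding proxy cert then user cert
        @return: list of parsed certs, user cert first, proxy cert last
        @raise ValueError: if the file holds no PEM certificate at all
        '''
        proxyCertFileTxt = self._readCertFile(proxyCertFilePath)

        pemPatRE = re.compile(self.__class__.pemPat, re.S)
        x509CertList = pemPatRE.findall(proxyCertFileTxt)
        if not x509CertList:
            raise ValueError("No PEM certificate found in \"%s\"" %
                             proxyCertFilePath)

        signingCertChain = [X509CertParse(x509Cert)
                            for x509Cert in x509CertList]

        # Expecting proxy cert first - move this to the end.  This will
        # be the cert used to verify the message signature
        signingCertChain.reverse()

        return signingCertChain

    def _makeClient(self, section, clntPriKeyPwd):
        '''Instantiate a WS client from a config section laid out like
        'setUp' ('uri', 'setsignaturehandler', 'clntcertfilepath', ...).

        @param section: config section name
        @param clntPriKeyPwd: password for the client private key
        @return: AttAuthorityClient
        '''
        cfg = self.cfg[section]

        # Check certificate types proxy or standard.  With no proxy cert
        # configured the chain is passed as None rather than left unbound
        # (which raised NameError at the call below); presumably the client
        # then signs with clntcertfilepath alone - confirm against
        # AttAuthorityClient
        proxyCertFilePath = cfg.get('proxycertfilepath')
        if proxyCertFilePath:
            signingCertChain = \
                        self._getCertChainFromProxyCertFile(proxyCertFilePath)
        else:
            signingCertChain = None

        # NOTE(review): an empty caCertFilePathList leaves the client
        # nothing to chain the server's signing cert to - check it refuses
        # the response in that case rather than skipping verification
        return AttAuthorityClient(uri=cfg['uri'],
            setSignatureHandler=self._parseBool(cfg['setsignaturehandler']),
            reqBinSecTokValType=cfg.get('reqbinsectokvaltype'),
            signingCertFilePath=cfg.get('clntcertfilepath'),
            signingCertChain=signingCertChain,
            signingPriKeyFilePath=cfg.get('clntprikeyfilepath'),
            signingPriKeyPwd=clntPriKeyPwd,
            caCertFilePathList=self._getCaCertFilePathList(section),
            tracefile=sys.stderr)

    def setUp(self):
        '''Load the config file into self.cfg (one dict per section) and
        build self.clnt, the client the tests exercise.

        @raise IOError: if the config file cannot be read
        '''
        configParser = SafeConfigParser()
        # read() silently skips files it cannot open; without the config
        # every later lookup fails with a bare KeyError, so report it here
        if not configParser.read(self.cfgFilePath):
            raise IOError("Unable to read config file \"%s\"" %
                          self.cfgFilePath)

        self.cfg = dict((section, dict(configParser.items(section)))
                        for section in configParser.sections())

        if self.clntPriKeyPwd is None:
            # Store on the class: unittest creates a new instance for each
            # test method, so an instance attribute would prompt every time
            AttAuthorityClientTestCase.clntPriKeyPwd = \
                                                self._getPriKeyPwd('setUp')

        # Instantiate WS proxy
        self.clnt = self._makeClient('setUp', self.clntPriKeyPwd)

    def test1GetX509Cert(self):
        '''test1GetX509Cert: retrieve Attribute Authority's X.509 cert.'''
        resp = self.clnt.getX509Cert()
        print("Attribute Authority X.509 cert.:\n" + resp)

    def test2GetHostInfo(self):
        """test2GetHostInfo: retrieve info for AA host"""
        hostInfo = self.clnt.getHostInfo()
        print("Host Info:\n %s" % hostInfo)

    def test3GetTrustedHostInfo(self):
        """test3GetTrustedHostInfo: retrieve trusted host info matching a
        given role"""
        trustedHostInfo = self.clnt.getTrustedHostInfo(
                                 self.cfg['test3GetTrustedHostInfo']['role'])
        print("Trusted Host Info:\n %s" % trustedHostInfo)

    def test4GetTrustedHostInfoWithNoRole(self):
        """test4GetTrustedHostInfoWithNoRole: retrieve trusted host info
        irrespective of role"""
        trustedHostInfo = self.clnt.getTrustedHostInfo()
        print("Trusted Host Info:\n %s" % trustedHostInfo)

    def test5GetAttCert(self):
        """test5GetAttCert: Request attribute certificate from NDG Attribute
        Authority Web Service."""
        section = 'test5GetAttCert'

        # Read user Certificate into a string ready for passing via WS
        userCertTxt = self._readUserCert(section)

        # Make attribute certificate request
        attCert = self.clnt.getAttCert(userCert=userCertTxt)
        print("Attribute Certificate: \n\n:" + str(attCert))

        attCert.filePath = self.cfg[section]['attcertfilepath']
        attCert.write()

    def test6GetAttCertWithUserIdSet(self):
        """test6GetAttCertWithUserIdSet: Request attribute certificate from
        NDG Attribute Authority Web Service setting a specific user Id
        independent of the signer of the SOAP request."""
        section = 'test6GetAttCertWithUserIdSet'

        # Read user Certificate into a string ready for passing via WS
        userCertTxt = self._readUserCert(section)

        # Make attribute certificate request
        userId = self.cfg[section]['userid']
        attCert = self.clnt.getAttCert(userId=userId, userCert=userCertTxt)
        print("Attribute Certificate: \n\n:" + str(attCert))

        # NOTE(review): written to test5's output path, overwriting that
        # test's result - probably meant to be an 'attcertfilepath' option
        # in this test's own section; left as is until the cfg has one
        attCert.filePath = self.cfg['test5GetAttCert']['attcertfilepath']
        attCert.write()

    def test7GetMappedAttCert(self):
        """test7GetMappedAttCert: Request mapped attribute certificate from
        NDG Attribute Authority Web Service."""
        section = 'test7GetMappedAttCert'

        # Read user Certificate into a string ready for passing via WS
        userCertTxt = self._readUserCert(section)

        # Similarly for Attribute Certificate
        userAttCert = self._readAttCert(
                                self.cfg[section]['userattcertfilepath'])

        # Make client to site B Attribute Authority
        clnt = self._makeClient(section, self._getPriKeyPwd(section))

        # Make attribute certificate request
        attCert = clnt.getAttCert(userCert=userCertTxt,
                                  userAttCert=userAttCert)
        print("Attribute Certificate: \n\n:" + str(attCert))

        attCert.filePath = self.cfg[section]['mappedattcertfilepath']
        attCert.write()

    def test8GetMappedAttCertStressTest(self):
        """test8GetMappedAttCertStressTest: Request mapped attribute
        certificates from NDG Attribute Authority Web Service, one per
        Attribute Certificate listed in 'userattcertfilepathlist'.

        A failed request does not fail the test: it is recorded in a
        <test name>-<AC file name>.msg file in the current directory.
        """
        section = 'test8GetMappedAttCertStressTest'

        # Read user Certificate into a string ready for passing via WS
        userCertTxt = self._readUserCert(section)

        # Make client to site B Attribute Authority
        clnt = self._makeClient(section, self._getPriKeyPwd(section))

        acFilePathList = self.cfg[section]['userattcertfilepathlist'].split()

        for acFilePath in acFilePathList:
            userAttCert = self._readAttCert(acFilePath)

            # Make attribute certificate request; the result is not kept,
            # only whether the call succeeded matters here
            try:
                clnt.getAttCert(userCert=userCertTxt, userAttCert=userAttCert)
            except Exception as e:
                outFilePfx = '%s-%s' % (section, os.path.basename(acFilePath))
                msgFile = open(outFilePfx + ".msg", 'w')
                try:
                    msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
                finally:
                    msgFile.close()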
342#_____________________________________________________________________________       
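class AttAuthorityClientTestSuite(unittest.TestSuite):
    '''Suite running the AttAuthorityClientTestCase tests in numbered
    order, one TestCase instance per test method.'''
    def __init__(self):
        # The original assigned the result of map() to a local named map;
        # that makes the name local for the whole method, so the call
        # raised UnboundLocalError before the suite was ever built
        testNames = (
            "test1GetX509Cert",
            "test2GetHostInfo",
            "test3GetTrustedHostInfo",
            "test4GetTrustedHostInfoWithNoRole",
            "test5GetAttCert",
            "test6GetAttCertWithUserIdSet",
            "test7GetMappedAttCert",
            "test8GetMappedAttCertStressTest",
        )
        tests = [AttAuthorityClientTestCase(name) for name in testNames]
        unittest.TestSuite.__init__(self, tests)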
357                                       
if __name__ == "__main__":
    # Run the TestCase subclasses in this module directly
    unittest.main()