source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/AttAuthority/AttAuthorityClientTest.py @ 3135

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/AttAuthority/AttAuthorityClientTest.py@3135
Revision 3135, 15.3 KB checked in by pjkersha, 13 years ago (diff)

Working Attribute Authority unit tests with WS-Security multiple CAs support. This will be needed for deployment of MyProxy with Simple CA at partner sites.

Added CA cert and certs and keys for a *TEST* CA for use with unit tests. This CA is NOT for production use.

python/ndg.security.server/setup.py: include .crt certs in conf/ package data

python/ndg.security.server/ndg/security/server/AttAuthority/__init__.py: added sslCACertDir param. It enables the M2Crypto SSL server side to pick up multiple CA certs from a directory.

python/ndg.security.server/ndg/security/server/conf/certs/ca/__init__.py: make new ca/ dir a package so that it's exported with egg package data.

python/ndg.security.server/ndg/security/server/conf/sessionMgr.tac,
python/ndg.security.server/ndg/security/server/conf/attAuthority.tac:

  • alter WS-Security SOAP handler init to accept multiple CA certs.
  • load multiple CA certs from sslCACertDir key of SessionMgr/AttAuthority instance

python/ndg.security.server/ndg/security/server/conf/attAuthorityProperties.xml,
python/ndg.security.test/ndg/security/test/AttAuthority/siteBAttAuthorityProperties.xml,
python/ndg.security.test/ndg/security/test/sessionMgrClient/sessionMgrProperties.xml

  • added new sslCACertDir elem
  • fixed caCertFile - only single elem required

python/ndg.security.test/setup.py: include TEST CA and certs and keys issued from it for use in unit tests. These are for test only.

python/ndg.security.test/ndg/security/test/AttAuthority/ca/ndg-test-ca.crt,
python/ndg.security.test/ndg/security/test/AttAuthority/siteA-aa.key,
python/ndg.security.test/ndg/security/test/AttAuthority/siteA-aa.crt: test CA certs and key.

python/ndg.security.test/ndg/security/test/AttAuthority/__init__.py: fix description

python/ndg.security.test/ndg/security/test/AttAuthority/AttAuthorityClientTest.py: ditto + added NDGSEC_INT_DEBUG env var option

python/ndg.security.test/ndg/security/test/AttAuthority/attAuthorityClientTest.cfg: fixed for new location of CA cert in ca/ sub-dir

python/ndg.security.test/ndg/security/test/sessionMgrClient/ca/__init__.py,
python/ndg.security.test/ndg/security/test/sessionMgr/ca/__init__.py,
python/ndg.security.test/ndg/security/test/AttAuthority/ca/__init__.py: ensure ca/ dir gets included in egg package data

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority SOAP client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "05/05/05, major update 16/01/07"
8__copyright__ = "(C) 2007 STFC & NERC"
9__license__ = \
10"""This software may be distributed under the terms of the Q Public
11License, version 1.0 or later."""
12__contact__ = "P.J.Kershaw@rl.ac.uk"
13__revision__ = '$Id$'
14
15import unittest
16import os, sys, getpass, re
17from ConfigParser import SafeConfigParser
18
19from ndg.security.common.AttAuthority import AttAuthorityClient
20from ndg.security.common.AttCert import AttCertRead
21from ndg.security.common.X509 import X509CertParse, X509CertRead
22
23
class AttAuthorityClientTestCase(unittest.TestCase):
    """SOAP client unit tests for the NDG Attribute Authority web service.

    Settings are read in setUp() from ./attAuthorityClientTest.cfg into
    self.cfg, a dict of dicts keyed by config section name.  The 'setUp'
    section configures the default client (self.clnt); the other sections
    are named after the test method that reads them.  Option names are
    looked up in lower case.
    """
    # Client private key password.  While this is None, setUp() takes it
    # from the 'clntprikeypwd' option or prompts for it.
    # NOTE(review): setUp() stores the result on the instance, so the value
    # is only shared between tests if it is pre-set on the class - confirm
    # whether per-test prompting is intended.
    clntPriKeyPwd = None

    # Matches one PEM encoded certificate block (used with re.S)
    pemPat = r"-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"

    @staticmethod
    def _readFile(filePath):
        '''Return the content of filePath as a string.  The file is closed
        again whether or not the read succeeds.'''
        fileObj = open(filePath, 'r')
        try:
            return fileObj.read()
        finally:
            fileObj.close()

    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
        '''Read proxy cert and user cert from a single PEM file and put in
        a list ready for input into SignatureHandler

        proxyCertFilePath: path to a PEM file containing one or more
        certificates.

        Returns a list of the X509CertParse() results for each certificate
        found, in the reverse of their order in the file.'''
        proxyCertFileTxt = self._readFile(proxyCertFilePath)

        pemPatRE = re.compile(self.__class__.pemPat, re.S)
        x509CertList = pemPatRE.findall(proxyCertFileTxt)

        signingCertChain = [X509CertParse(x509Cert)
                            for x509Cert in x509CertList]

        # Expecting proxy cert first - move this to the end.  This will
        # be the cert used to verify the message signature
        signingCertChain.reverse()

        return signingCertChain

    def _getPriKeyPwd(self, section):
        '''Return the client private key password for a config section.

        The 'clntprikeypwd' option of the section is used if set, otherwise
        the password is prompted for on the terminal.  Interrupting the
        prompt with Ctrl-C exits the process with status 0.'''
        priKeyPwd = self.cfg[section].get('clntprikeypwd')
        if priKeyPwd is not None:
            return priKeyPwd

        try:
            return getpass.getpass(
                            prompt="\nsetUp - client private key password: ")
        except KeyboardInterrupt:
            sys.exit(0)

    def _readUserCert(self, section):
        '''Read the optional issuing client certificate for a test.

        section: config section holding the 'issuingclntcertfilepath' option.

        Returns the content of the certificate file, or None if the option
        is not set.  Raises IOError if the file cannot be read.'''
        userCertFilePath = self.cfg[section].get('issuingclntcertfilepath')
        if userCertFilePath is None:
            # No issuing cert set
            return None

        try:
            return self._readFile(userCertFilePath)
        except IOError as ioErr:
            # A real exception is raised here: string exceptions are not
            # supported by Python 2.6 and later
            raise IOError("Error reading certificate file \"%s\": %s" %
                          (ioErr.filename, ioErr.strerror))

    def _readAttCert(self, attCertFilePath):
        '''Return AttCertRead(attCertFilePath), re-raising an IOError from
        the read with a message naming the file.'''
        try:
            return AttCertRead(attCertFilePath)
        except IOError as ioErr:
            raise IOError(
                    "Error reading attribute certificate file \"%s\": %s" %
                    (ioErr.filename, ioErr.strerror))

    def _makeClient(self, section, priKeyPwd, **clntKw):
        '''Create an AttAuthorityClient from the options of a config section.

        section: name of the config section to read.  'uri' and
        'setsignaturehandler' must be set; 'cacertfilepathlist',
        'reqbinsectokvaltype', 'proxycertfilepath', 'clntcertfilepath' and
        'clntprikeyfilepath' are optional.
        priKeyPwd: password for the client private key.
        clntKw: any additional keywords to pass to AttAuthorityClient.

        Returns the new client.  Raises KeyError if a required option is
        missing.'''
        opt = self.cfg[section]

        # List of CA certificates for use in validation of certs used in
        # signature for server response
        try:
            caCertFilePathList = opt['cacertfilepathlist'].split()
        except KeyError:
            caCertFilePathList = []

        # Check certificate types proxy or standard
        proxyCertFilePath = opt.get('proxycertfilepath')
        if proxyCertFilePath:
            signingCertChain = \
                        self._getCertChainFromProxyCertFile(proxyCertFilePath)
        else:
            signingCertChain = None

        # NOTE(review): eval() executes the option text as a Python
        # expression.  Acceptable only while the config file is trusted;
        # consider parsing an explicit True/False value instead.
        setSignatureHandler = eval(opt['setsignaturehandler'])

        return AttAuthorityClient(uri=opt['uri'],
            setSignatureHandler=setSignatureHandler,
            reqBinSecTokValType=opt.get('reqbinsectokvaltype'),
            signingCertFilePath=opt.get('clntcertfilepath'),
            signingCertChain=signingCertChain,
            signingPriKeyFilePath=opt.get('clntprikeyfilepath'),
            signingPriKeyPwd=priKeyPwd,
            caCertFilePathList=caCertFilePathList,
            tracefile=sys.stderr,
            **clntKw)

    def _checkHostInfo(self, hostsInfo, checkValues=False):
        '''Check every hostname and host info key in hostsInfo is set.

        hostsInfo: dict of host info dicts keyed by hostname.
        checkValues: if True, also require every host info value to be set.

        TestCase assert methods are used rather than the assert statement
        so that the checks still run under python -O.'''
        for hostname, hostInfo in hostsInfo.items():
            self.assertTrue(hostname, "Hostname not set")
            for k, v in hostInfo.items():
                self.assertTrue(k, "hostInfo value key unset")
                if checkValues:
                    self.assertTrue(v, "%s value not set" % k)

    def setUp(self):
        '''Load the test configuration and create the client, self.clnt,
        from the 'setUp' section.

        Raises IOError if the config file could not be read.  If the
        NDGSEC_INT_DEBUG environment variable is set, the debugger is
        entered once the client has been created.'''
        cfgFilePath = "./attAuthorityClientTest.cfg"

        configParser = SafeConfigParser()
        if not configParser.read(cfgFilePath):
            # read() skips files it cannot open; report that here rather
            # than as a KeyError on the first option lookup
            raise IOError("Error reading config file \"%s\"" % cfgFilePath)

        self.cfg = {}
        for section in configParser.sections():
            self.cfg[section] = dict(configParser.items(section))

        if self.clntPriKeyPwd is None:
            self.clntPriKeyPwd = self._getPriKeyPwd('setUp')

        # Only the option lookup is guarded so that a KeyError raised while
        # reading a certificate is not mistaken for a missing option
        try:
            sslCACertFilePathList = \
                        self.cfg['setUp']['sslcacertfilepathlist'].split()
        except KeyError:
            sslCACertFilePathList = []

        sslCACertList = [X509CertRead(filePath)
                         for filePath in sslCACertFilePathList]

        # Instantiate WS proxy
        self.clnt = self._makeClient('setUp', self.clntPriKeyPwd,
                        sslPeerCertCN=self.cfg['setUp'].get('sslpeercertcn'),
                        sslCACertList=sslCACertList)

        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

    def test1GetX509Cert(self):
        '''test1GetX509Cert: retrieve Attribute Authority's X.509 cert.'''
        resp = self.clnt.getX509Cert()
        print("Attribute Authority X.509 cert.:\n" + resp)

    def test2GetHostInfo(self):
        """test2GetHostInfo: retrieve info for AA host"""
        hostInfo = self.clnt.getHostInfo()
        print("Host Info:\n %s" % hostInfo)

    def test3GetTrustedHostInfo(self):
        """test3GetTrustedHostInfo: retrieve trusted host info matching a
        given role"""
        trustedHostInfo = self.clnt.getTrustedHostInfo(
                                 self.cfg['test3GetTrustedHostInfo']['role'])
        self._checkHostInfo(trustedHostInfo)

        print("Trusted Host Info:\n %s" % trustedHostInfo)

    def test4GetTrustedHostInfoWithNoRole(self):
        """test4GetTrustedHostInfoWithNoRole: retrieve trusted host info
        irrespective of role"""
        trustedHostInfo = self.clnt.getTrustedHostInfo()
        self._checkHostInfo(trustedHostInfo, checkValues=True)

        print("Trusted Host Info:\n %s" % trustedHostInfo)

    def test4aGetAllHostsInfo(self):
        """test4aGetAllHostsInfo: retrieve info for all hosts"""
        allHostInfo = self.clnt.getAllHostsInfo()
        self._checkHostInfo(allHostInfo)

        print("All Hosts Info:\n %s" % allHostInfo)

    def test5GetAttCert(self):
        """test5GetAttCert: Request attribute certificate from NDG Attribute
        Authority Web Service."""

        # Read user Certificate into a string ready for passing via WS
        userCertTxt = self._readUserCert('test5GetAttCert')

        # Make attribute certificate request
        attCert = self.clnt.getAttCert(userCert=userCertTxt)

        print("Attribute Certificate: \n\n:" + str(attCert))

        attCert.filePath = self.cfg['test5GetAttCert']['attcertfilepath']
        attCert.write()

    def test6GetAttCertWithUserIdSet(self):
        """test6GetAttCertWithUserIdSet: Request attribute certificate from
        NDG Attribute Authority Web Service setting a specific user Id
        independent of the signer of the SOAP request."""
        section = 'test6GetAttCertWithUserIdSet'

        # Read user Certificate into a string ready for passing via WS
        userCertTxt = self._readUserCert(section)

        # Make attribute certificate request
        userId = self.cfg[section]['userid']
        attCert = self.clnt.getAttCert(userId=userId,
                                       userCert=userCertTxt)

        print("Attribute Certificate: \n\n:" + str(attCert))

        # NOTE(review): the output path is taken from the test5GetAttCert
        # section, so this overwrites the certificate written by test5 -
        # confirm whether a path in this test's own section was intended.
        attCert.filePath = self.cfg['test5GetAttCert']['attcertfilepath']
        attCert.write()

    def test7GetMappedAttCert(self):
        """test7GetMappedAttCert: Request mapped attribute certificate from
        NDG Attribute Authority Web Service."""
        section = 'test7GetMappedAttCert'

        # Read user Certificate into a string ready for passing via WS
        userCertTxt = self._readUserCert(section)

        # Similarly for Attribute Certificate
        userAttCert = self._readAttCert(
                                    self.cfg[section]['userattcertfilepath'])

        # Make client to site B Attribute Authority
        clnt = self._makeClient(section, self._getPriKeyPwd(section))

        # Make attribute certificate request
        attCert = clnt.getAttCert(userCert=userCertTxt,
                                  userAttCert=userAttCert)
        print("Attribute Certificate: \n\n:" + str(attCert))

        attCert.filePath = self.cfg[section]['mappedattcertfilepath']
        attCert.write()

    def test8GetMappedAttCertStressTest(self):
        """test8GetMappedAttCertStressTest: Request mapped attribute certificate from
        NDG Attribute Authority Web Service.

        A request is made for each attribute certificate file listed in the
        'userattcertfilepathlist' option.  A failed request does not fail
        the test: the error is written to
        test8GetMappedAttCertStressTest-<file name>.msg in the current
        directory and the loop carries on with the next file."""
        section = 'test8GetMappedAttCertStressTest'

        # Read user Certificate into a string ready for passing via WS
        userCertTxt = self._readUserCert(section)

        # Make client to site B Attribute Authority
        clnt = self._makeClient(section, self._getPriKeyPwd(section))

        acFilePathList = self.cfg[section]['userattcertfilepathlist'].split()

        for acFilePath in acFilePathList:
            userAttCert = self._readAttCert(acFilePath)

            # Make attribute certificate request
            try:
                clnt.getAttCert(userCert=userCertTxt,
                                userAttCert=userAttCert)
            except Exception as e:
                outFilePfx = 'test8GetMappedAttCertStressTest-%s' % \
                        os.path.basename(acFilePath)
                msgFile = open(outFilePfx + ".msg", 'w')
                try:
                    msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
                finally:
                    # Close explicitly so that the message is flushed to
                    # disk and handles do not pile up over a long run
                    msgFile.close()
380             
381#_____________________________________________________________________________       
class AttAuthorityClientTestSuite(unittest.TestSuite):
    """Test suite made up of the AttAuthorityClientTestCase tests named in
    __init__, added in the order listed."""

    def __init__(self):
        # NOTE(review): test4aGetAllHostsInfo is not in this list - confirm
        # whether it was left out deliberately.
        testNames = (
            "test1GetX509Cert",
            "test2GetHostInfo",
            "test3GetTrustedHostInfo",
            "test4GetTrustedHostInfoWithNoRole",
            "test5GetAttCert",
            "test6GetAttCertWithUserIdSet",
            "test7GetMappedAttCert",
            "test8GetMappedAttCertStressTest",
        )

        # The list must not be bound to the name 'map': assigning to it here
        # makes 'map' local to this method, so calling the builtin on the
        # same line raises UnboundLocalError.
        tests = [AttAuthorityClientTestCase(testName)
                 for testName in testNames]
        unittest.TestSuite.__init__(self, tests)
396                                       
# When executed as a script, let unittest find and run the tests defined in
# this module.
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.