source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/AttAuthority/AttAuthorityClientTest.py @ 2510

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/AttAuthority/AttAuthorityClientTest.py@2510
Revision 2510, 10.9 KB checked in by pjkersha, 13 years ago (diff)

ndg.security.server/ndg/security/server/AttAuthority/server-config.tac:
fix to the caCertFilePathList input to SignatureHandler: correctly initialise it
if not set.

ndg.security.server/ndg/security/server/AttAuthority/__init__.py:
Corrected error message text for where a user is not registered or no
mapping is available: ref. userId rather than AC holder DN to allow for the
case in DEWS where a userId distinct from a Proxy cert. DN is used.

ndg.security.test/ndg/security/test/AttAuthority/AttAuthorityClientTest.py:
added test8GetMappedAttCertStressTest for the WebSphere integration tests.
It makes multiple calls with different ACs input to check for errors in
signature or verification.

ndg.security.test/ndg/security/test/AttAuthority/attAuthorityClientTest.cfg:
added additional config params for the above.

ndg.security.test/ndg/security/test/MyProxy/myProxyProperties.xml and
ndg.security.test/ndg/security/test/MyProxy/myProxyClientTest.cfg:
switched cert ID of test machine.

ndg.security.common/ndg/security/common/X509.py:

  • new X509Cert methods asDER and asPEM to convert to these formats.

toString now calls asPEM.

  • new class X509Stack to wrap M2Crypto.X509.X509_Stack. This includes an

extra method, verifyCertChain, to verify a chain of trust in the certs
contained in the stack.

  • standalone function, X509StackParseFromDER, wraps

M2Crypto.X509.new_stack_from_der

  • fix to X500DN class to enable correct parsing of proxy certificate DNs.

These have multiple CN entries. These are represented by changing the CN
dict entry to a tuple when initialised.

ndg.security.common/ndg/security/common/wsSecurity.py: changes to enable
handling of certificate chains in WSSE BinarySecurityToken elements. This
will enable use of proxy certificates with signatures as their chain of
trust is proxy cert -> user cert -> CA cert rather than just cert -> CA cert.

types.

BinarySecurityToken ValueType to use

  • SignatureHandler.__init__ includes a new signingCertChain keyword.
  • signingCertChain attribute of class enables setting of an X509Stack object

to assign to the BinarySecurityToken.

then Base 64 encode rather than converting into PEM and then having to
strip BEGIN CERT / END CERT delimiters.

to enable check of Canonicalization - REMOVE in future check in.

BinarySecurityToken ValueTypes - 'X509PKIPathv1', 'X509' and 'X509v3'

  • Property svn:executable set to *
1#!/usr/bin/env python
2"""NDG Attribute Authority client unit tests
3
4NERC Data Grid Project
5
6@author P J Kershaw 05/05/05, major update 16/01/07
7
8@copyright (C) 2007 CCLRC & NERC
9
10@license This software may be distributed under the terms of the Q Public
11License, version 1.0 or later.
12"""
13
# Subversion $Id$ keyword; expanded to the file's revision on checkout when
# the svn:keywords property is set
__revision__ = '$Id$'
15
16import unittest
17import os, sys, getpass
18from ConfigParser import SafeConfigParser
19
20from ndg.security.common.AttAuthority import AttAuthorityClient
21from ndg.security.common.AttCert import AttCertRead
22
23
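class AttAuthorityClientTestCase(unittest.TestCase):
    """Tests for the NDG Attribute Authority Web Service client.

    Settings are read from the config file named by cfgFilePath.  The
    'setUp' section configures the client created for every test
    (self.clnt); each test method reads its own inputs from a section with
    the same name as the method.  The tests call a running Attribute
    Authority and read the user and CA certificates named in the config, so
    they are integration tests rather than pure unit tests.
    """

    # Password for the user's signing private key.  setUp fills it in from
    # the config or a terminal prompt while it is None; a harness may set it
    # on the class beforehand to avoid the prompt.
    userPriKeyPwd = None

    # Config file read by setUp, relative to the current working directory
    cfgFilePath = "./attAuthorityClientTest.cfg"

    def setUp(self):
        """Read the config file and create the client used by the tests.

        @raise IOError: the config file can't be read
        """
        configParser = SafeConfigParser()

        # read() returns the list of files it managed to read; fail with the
        # file name now rather than with a bare KeyError on the first
        # self.cfg['setUp'] lookup
        if not configParser.read(self.cfgFilePath):
            raise IOError('Unable to read test config file "%s"' %
                          self.cfgFilePath)

        self.cfg = {}
        for section in configParser.sections():
            self.cfg[section] = dict(configParser.items(section))

        if self.userPriKeyPwd is None:
            self.userPriKeyPwd = self._getPriKeyPwd('setUp')

        # Instantiate WS proxy
        self.clnt = self._makeClient('setUp', self.userPriKeyPwd)

    def _getPriKeyPwd(self, section):
        """Return the password for the signing private key set in *section*.

        The section's 'userprikeypwd' option is used when present, otherwise
        the password is read from the terminal.  Ctrl-C at the prompt ends
        the test run.  A password kept in the config file is in clear text,
        so that option is only appropriate where the file itself is
        protected.

        Note: setUp used to test 'userprikeypwd' but then read
        'clntprikeypwd', so a password in its section was never picked up;
        tests 7 and 8 use 'userprikeypwd' for both steps and that key is now
        used throughout - check the config if a password stops being read.
        """
        try:
            userPriKeyPwd = self.cfg[section].get('userprikeypwd')
            if userPriKeyPwd is None:
                userPriKeyPwd = getpass.getpass(
                            prompt="\nsetUp - client private key password: ")
        except KeyboardInterrupt:
            sys.exit(0)

        return userPriKeyPwd

    def _caCertFilePathList(self):
        """Return the CA certificate paths used to verify signed responses.

        Taken from the whitespace-separated 'cacertfilepathlist' option of
        the 'setUp' section.  As before, an empty list is returned when the
        option is absent; what AttAuthorityClient does with an empty list
        isn't visible here, so a warning is written to stderr because the
        server's signature can't be chained to a trust anchor without one.
        """
        try:
            caCertFilePathList = \
                        self.cfg['setUp']['cacertfilepathlist'].split()
        except KeyError:
            caCertFilePathList = []

        if not caCertFilePathList:
            sys.stderr.write("Warning: no CA certificates set - "
                             "'cacertfilepathlist' is missing or empty in "
                             "the 'setUp' section\n")
        return caCertFilePathList

    def _makeClient(self, section, signingPriKeyPwd):
        """Create an AttAuthorityClient from the settings in *section*.

        Uses the section's 'uri', 'usercertfilepath' and 'userprikeyfilepath'
        options together with the CA certificate list from the 'setUp'
        section; SOAP tracing goes to stderr.
        """
        sectionCfg = self.cfg[section]
        return AttAuthorityClient(uri=sectionCfg['uri'],
                    signingCertFilePath=sectionCfg.get('usercertfilepath'),
                    signingPriKeyFilePath=sectionCfg.get('userprikeyfilepath'),
                    signingPriKeyPwd=signingPriKeyPwd,
                    caCertFilePathList=self._caCertFilePathList(),
                    tracefile=sys.stderr)

    def _readUserCert(self, section):
        """Return the text of the issuing user certificate named in *section*.

        The path comes from the section's 'issuingusercertfilepath' option;
        None is returned if it isn't set, in which case the request is made
        without an explicit issuing certificate.

        @raise IOError: the file can't be read; the message names the file
        """
        userCertFilePath = self.cfg[section].get('issuingusercertfilepath')
        if userCertFilePath is None:
            # No issuing cert set
            return None

        try:
            userCertFile = open(userCertFilePath, 'r')
            try:
                return userCertFile.read()
            finally:
                userCertFile.close()
        except IOError, ioErr:
            raise IOError('Error reading certificate file "%s": %s' %
                          (ioErr.filename, ioErr.strerror))

    def _readUserAttCert(self, attCertFilePath):
        """Return the attribute certificate read from *attCertFilePath*.

        @raise IOError: the file can't be read; the message names the file
        """
        try:
            return AttCertRead(attCertFilePath)
        except IOError, ioErr:
            raise IOError('Error reading attribute certificate file "%s": %s'
                          % (ioErr.filename, ioErr.strerror))

    def test1GetX509Cert(self):
        '''test1GetX509Cert: retrieve Attribute Authority's X.509 cert.'''
        resp = self.clnt.getX509Cert()
        print "Attribute Authority X.509 cert.:\n" + resp

    def test2GetHostInfo(self):
        """test2GetHostInfo: retrieve info for AA host"""
        hostInfo = self.clnt.getHostInfo()
        print "Host Info:\n %s" % hostInfo

    def test3GetTrustedHostInfo(self):
        """test3GetTrustedHostInfo: retrieve trusted host info matching the
        role given in this test's section"""
        trustedHostInfo = self.clnt.getTrustedHostInfo(
                                 self.cfg['test3GetTrustedHostInfo']['role'])
        print "Trusted Host Info:\n %s" % trustedHostInfo

    def test4GetTrustedHostInfoWithNoRole(self):
        """test4GetTrustedHostInfoWithNoRole: retrieve trusted host info
        irrespective of role"""
        trustedHostInfo = self.clnt.getTrustedHostInfo()
        print "Trusted Host Info:\n %s" % trustedHostInfo

    def test5GetAttCert(self):
        """test5GetAttCert: Request attribute certificate from NDG Attribute
        Authority Web Service.  The certificate returned is written to the
        'attcertfilepath' named in this test's section."""
        userCertTxt = self._readUserCert('test5GetAttCert')

        # Make attribute certificate request
        attCert = self.clnt.getAttCert(userCert=userCertTxt)
        print "Attribute Certificate: \n\n:" + str(attCert)

        attCert.filePath = self.cfg['test5GetAttCert']['attcertfilepath']
        attCert.write()

    def test6GetAttCertWithUserIdSet(self):
        """test6GetAttCertWithUserIdSet: Request attribute certificate from
        NDG Attribute Authority Web Service setting a specific user Id
        independent of the signer of the SOAP request."""
        userCertTxt = self._readUserCert('test6GetAttCertWithUserIdSet')

        # Make attribute certificate request
        userId = self.cfg['test6GetAttCertWithUserIdSet']['userid']
        attCert = self.clnt.getAttCert(userId=userId, userCert=userCertTxt)
        print "Attribute Certificate: \n\n:" + str(attCert)

        # NOTE(review): written to test5's output path, as before - looks
        # like a carry-over from test5; confirm whether this test's own
        # section should name an 'attcertfilepath'
        attCert.filePath = self.cfg['test5GetAttCert']['attcertfilepath']
        attCert.write()

    def test7GetMappedAttCert(self):
        """test7GetMappedAttCert: Request mapped attribute certificate from
        NDG Attribute Authority Web Service.

        A client for the site B Attribute Authority is built from this
        test's section and sent the attribute certificate read from
        'userattcertfilepath'; the mapped certificate returned is written
        to 'mappedattcertfilepath'."""
        section = 'test7GetMappedAttCert'
        userCertTxt = self._readUserCert(section)

        # Similarly for Attribute Certificate
        userAttCert = self._readUserAttCert(
                                    self.cfg[section]['userattcertfilepath'])

        userPriKeyPwd = self._getPriKeyPwd(section)

        # Make client to site B Attribute Authority
        clnt = self._makeClient(section, userPriKeyPwd)

        # Make attribute certificate request
        attCert = clnt.getAttCert(userCert=userCertTxt,
                                  userAttCert=userAttCert)
        print "Attribute Certificate: \n\n:" + str(attCert)

        attCert.filePath = self.cfg[section]['mappedattcertfilepath']
        attCert.write()

    def test8GetMappedAttCertStressTest(self):
        """test8GetMappedAttCertStressTest: request a mapped attribute
        certificate once for each attribute certificate listed in
        'userattcertfilepathlist', to shake out errors in signing or
        signature verification.

        Every attribute certificate is tried even when an earlier request
        fails; the test then fails listing each one whose request raised.
        (Previously a failure was written to a file named from an undefined
        variable, so the first error raised NameError and later inputs were
        never tried.)"""
        section = 'test8GetMappedAttCertStressTest'
        userCertTxt = self._readUserCert(section)
        userPriKeyPwd = self._getPriKeyPwd(section)

        # Make client to site B Attribute Authority
        clnt = self._makeClient(section, userPriKeyPwd)

        acFilePathList = self.cfg[section]['userattcertfilepathlist'].split()
        if not acFilePathList:
            # An empty list would pass without exercising anything
            self.fail("'userattcertfilepathlist' is empty in section '%s'" %
                      section)

        failures = []
        for acFilePath in acFilePathList:
            userAttCert = self._readUserAttCert(acFilePath)

            # Make attribute certificate request; record any error and carry
            # on so that every input is exercised
            try:
                clnt.getAttCert(userCert=userCertTxt, userAttCert=userAttCert)
            except Exception, e:
                failures.append('Failed for "%s": %s' % (acFilePath, e))

        if failures:
            self.fail("\n".join(failures))
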
272#_____________________________________________________________________________       
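class AttAuthorityClientTestSuite(unittest.TestSuite):
    """Suite running the AttAuthorityClientTestCase tests in numbered order."""

    def __init__(self):
        # One TestCase instance per method name.  The local was previously
        # called 'map': assigning to that name made it local to the whole
        # function, so the builtin map() on the right-hand side raised
        # UnboundLocalError before the suite could be built.
        tests = [AttAuthorityClientTestCase(methodName) for methodName in (
                    "test1GetX509Cert",
                    "test2GetHostInfo",
                    "test3GetTrustedHostInfo",
                    "test4GetTrustedHostInfoWithNoRole",
                    "test5GetAttCert",
                    "test6GetAttCertWithUserIdSet",
                    "test7GetMappedAttCert",
                    "test8GetMappedAttCertStressTest",
                 )]
        unittest.TestSuite.__init__(self, tests)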
287                                       
# Run every test* method of the TestCase classes in this module when the
# file is executed directly
if __name__ == "__main__":
    unittest.main()