source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/AttAuthority/AttAuthorityClientTest.py @ 3001

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/AttAuthority/AttAuthorityClientTest.py@3001
Revision 3001, 14.6 KB checked in by pjkersha, 13 years ago (diff)

python/ndg.security.server/ndg/security/server/AttAuthority/init.py: role mapping entries are not necessary for <trusted> blocks in map config file - altered readMapConfig accordingly.

python/ndg.security.test/ndg/security/test/AttAuthority/AttAuthorityClientTest.py: fix to test8GetMappedAttCertStressTest - set signingCertChain to None in AttAuthorityClient when not using proxy certs.

python/ndg.security.test/ndg/security/test/AttAuthority/attAuthorityClientTest.cfg: try out GetMappedAttCertStressTest with standard certs rather than proxy certs.

python/ndg.security.test/ndg/security/test/SessionMgr/README: fixed instructions for running tests.

python/ndg.security.common/ndg/security/common/CredWallet.py: fixed typo self.debug -> log.debug.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
1#!/usr/bin/env python
2"""NDG Attribute Authority client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "05/05/05, major update 16/01/07"
8__copyright__ = "(C) 2007 STFC & NERC"
9__license__ = \
10"""This software may be distributed under the terms of the Q Public
11License, version 1.0 or later."""
12__contact__ = "P.J.Kershaw@rl.ac.uk"
13__revision__ = '$Id$'
14
15import unittest
16import os, sys, getpass, re
17from ConfigParser import SafeConfigParser
18
19from ndg.security.common.AttAuthority import AttAuthorityClient
20from ndg.security.common.AttCert import AttCertRead
21from ndg.security.common.X509 import X509CertParse, X509CertRead
22
23
24class AttAuthorityClientTestCase(unittest.TestCase):
25    clntPriKeyPwd = None
26    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
27
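    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
        '''Build a signing certificate chain from a proxy certificate file.

        The file is expected to hold the proxy cert followed by the user
        cert as PEM blocks.  Every block matching the class-level pemPat
        pattern is parsed with X509CertParse and the list is returned in
        reverse file order, ready for input into SignatureHandler.

        Only parsing happens here: nothing in this method checks that the
        certificates chain to a trusted CA, and a file with no PEM block
        yields an empty list rather than an error.

        @param proxyCertFilePath: path of the PEM file to read
        @return: list of parsed certificates, proxy cert expected last
        '''
        # Release the file handle explicitly instead of leaving it to the
        # garbage collector
        proxyCertFile = open(proxyCertFilePath)
        try:
            proxyCertFileTxt = proxyCertFile.read()
        finally:
            proxyCertFile.close()

        # pemPat runs from a BEGIN CERTIFICATE marker to the next END
        # CERTIFICATE marker; text outside the markers is ignored
        pemPatRE = re.compile(self.__class__.pemPat, re.S)
        x509CertList = pemPatRE.findall(proxyCertFileTxt)

        signingCertChain = [X509CertParse(x509Cert)
                            for x509Cert in x509CertList]

        # Expecting proxy cert first - move this to the end.  This will
        # be the cert used to verify the message signature
        signingCertChain.reverse()

        return signingCertChain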
44
45
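    def setUp(self):
        '''Read attAuthorityClientTest.cfg and create the client under test.

        Each section of the config file is copied into self.cfg as a plain
        dict.  The [setUp] section is then used to build self.clnt, the
        AttAuthorityClient instance exercised by the test methods.
        '''
        # The path is relative to the current working directory.
        # ConfigParser silently skips files it cannot open, so running from
        # another directory shows up below as a KeyError on 'setUp'
        configParser = SafeConfigParser()
        configParser.read("./attAuthorityClientTest.cfg")

        self.cfg = {}
        for section in configParser.sections():
            self.cfg[section] = dict(configParser.items(section))

        setUpCfg = self.cfg['setUp']

        if self.clntPriKeyPwd is None:
            # NOTE(security): a 'clntprikeypwd' entry keeps the private key
            # pass-phrase in clear text in the config file.  Leave it unset
            # to be prompted instead, or restrict who can read the file.
            try:
                cfgPriKeyPwd = setUpCfg.get('clntprikeypwd')
                if cfgPriKeyPwd is None:
                    self.clntPriKeyPwd = getpass.getpass(
                            prompt="\nsetUp - client private key password: ")
                else:
                    self.clntPriKeyPwd = cfgPriKeyPwd
            except KeyboardInterrupt:
                sys.exit(0)

        # List of CA certificates for use in validation of certs used in
        # signature for server response
        try:
            caCertFilePathList = setUpCfg['cacertfilepathlist'].split()
        except KeyError:
            caCertFilePathList = []

        # Only the config lookup is guarded: an error raised while reading
        # a certificate file is left to propagate
        try:
            sslCACertFilePathList = setUpCfg['sslcacertfilepathlist'].split()
        except KeyError:
            sslCACertFilePathList = []

        # NOTE(review): both CA lists fall back to empty when their option
        # is absent - confirm how AttAuthorityClient verifies the SSL peer
        # and the response signature when given an empty list
        sslCACertList = [X509CertRead(caCertFilePath)
                         for caCertFilePath in sslCACertFilePathList]

        reqBinSecTokValType = setUpCfg.get('reqbinsectokvaltype')

        # Check certificate types proxy or standard
        proxyCertFilePath = setUpCfg.get('proxycertfilepath')
        if proxyCertFilePath:
            signingCertChain = \
                        self._getCertChainFromProxyCertFile(proxyCertFilePath)
        else:
            signingCertChain = None

        # NOTE(security): eval() runs whatever Python expression the config
        # file holds, with the privileges of the user running the tests.
        # The option looks like a boolean switch - if so,
        # configParser.getboolean('setUp', 'setsignaturehandler') reads it
        # without evaluating code.  Confirm the accepted values first.
        setSignatureHandler = eval(setUpCfg['setsignaturehandler'])

        # Instantiate WS proxy
        self.clnt = AttAuthorityClient(uri=setUpCfg['uri'],
            sslPeerCertCN=setUpCfg.get('sslpeercertcn'),
            sslCACertList=sslCACertList,
            setSignatureHandler=setSignatureHandler,
            reqBinSecTokValType=reqBinSecTokValType,
            signingCertFilePath=setUpCfg.get('clntcertfilepath'),
            signingCertChain=signingCertChain,
            signingPriKeyFilePath=setUpCfg.get('clntprikeyfilepath'),
            signingPriKeyPwd=self.clntPriKeyPwd,
            caCertFilePathList=caCertFilePathList,
            tracefile=sys.stderr)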
105           
106   
    def test1GetX509Cert(self):
        '''test1GetX509Cert: retrieve Attribute Authority's X.509 cert.

        The reply is printed only.  No assertion is made about it, so the
        test passes whenever the call returns without raising.
        '''
        aaX509Cert = self.clnt.getX509Cert()
        print("Attribute Authority X.509 cert.:\n" + aaX509Cert)
111
    def test2GetHostInfo(self):
        """test2GetHostInfo: retrieve info for AA host

        The reply is printed only.  No assertion is made about it, so the
        test passes whenever the call returns without raising.
        """
        aaHostInfo = self.clnt.getHostInfo()
        hostInfoMsg = "Host Info:\n %s" % aaHostInfo
        print(hostInfoMsg)
116       
117
    def test3GetTrustedHostInfo(self):
        """test3GetTrustedHostInfo: retrieve trusted host info matching a
        given role

        The role comes from the 'role' option of the
        [test3GetTrustedHostInfo] config section.  The reply is printed
        only; no assertion is made about it.
        """
        roleName = self.cfg['test3GetTrustedHostInfo']['role']
        matchingHostInfo = self.clnt.getTrustedHostInfo(roleName)
        print("Trusted Host Info:\n %s" % matchingHostInfo)
124
125
    def test4GetTrustedHostInfoWithNoRole(self):
        """test4GetTrustedHostInfoWithNoRole: retrieve trusted host info
        irrespective of role

        getTrustedHostInfo is called without a role argument.  The reply is
        printed only; no assertion is made about it.
        """
        allTrustedHostInfo = self.clnt.getTrustedHostInfo()
        trustedHostMsg = "Trusted Host Info:\n %s" % allTrustedHostInfo
        print(trustedHostMsg)
131       
132
    def test4aGetAllHostsInfo(self):
        """test4aGetAllHostsInfo: retrieve info for all hosts

        The reply is printed only.  No assertion is made about it, so the
        test passes whenever the call returns without raising.
        """
        allHostsInfo = self.clnt.getAllHostsInfo()
        print("All Hosts Info:\n %s" % allHostsInfo)
137
138
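    def test5GetAttCert(self):
        """test5GetAttCert: Request attribute certificate from NDG Attribute
        Authority Web Service.

        Options are read from the [test5GetAttCert] config section:
        'issuingclntcertfilepath' (optional) names a user certificate to
        send with the request, and 'attcertfilepath' is where the returned
        attribute certificate is written.  The certificate is printed and
        saved; no assertion is made about its content.
        """
        testCfg = self.cfg['test5GetAttCert']

        # Read user Certificate into a string ready for passing via WS.
        # An unset option means no issuing cert is sent.
        userCertFilePath = testCfg.get('issuingclntcertfilepath')
        if userCertFilePath is None:
            userCertTxt = None
        else:
            try:
                userCertFile = open(userCertFilePath, 'r')
                try:
                    userCertTxt = userCertFile.read()
                finally:
                    userCertFile.close()
            except IOError, ioErr:
                # Raising a plain string is rejected by the interpreter;
                # raise a real exception that carries the message
                raise IOError("Error reading certificate file \"%s\": %s" %
                              (ioErr.filename, ioErr.strerror))

        # Make attribute certificate request
        attCert = self.clnt.getAttCert(userCert=userCertTxt)

        print("Attribute Certificate: \n\n:" + str(attCert))

        # The output path comes straight from the config file
        attCert.filePath = testCfg['attcertfilepath']
        attCert.write()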
164       
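    def test6GetAttCertWithUserIdSet(self):
        """test6GetAttCertWithUserIdSet: Request attribute certificate from
        NDG Attribute Authority Web Service setting a specific user Id
        independent of the signer of the SOAP request.

        'userid' and the optional 'issuingclntcertfilepath' are read from
        the [test6GetAttCertWithUserIdSet] config section.  The returned
        certificate is printed and saved; no assertion is made about it, so
        the test does not check which user Id the certificate was issued
        for.
        """
        testCfg = self.cfg['test6GetAttCertWithUserIdSet']

        # Read user Certificate into a string ready for passing via WS.
        # An unset option means no issuing cert is sent.
        userCertFilePath = testCfg.get('issuingclntcertfilepath')
        if userCertFilePath is None:
            userCertTxt = None
        else:
            try:
                userCertFile = open(userCertFilePath, 'r')
                try:
                    userCertTxt = userCertFile.read()
                finally:
                    userCertFile.close()
            except IOError, ioErr:
                # Raising a plain string is rejected by the interpreter;
                # raise a real exception that carries the message
                raise IOError("Error reading certificate file \"%s\": %s" %
                              (ioErr.filename, ioErr.strerror))

        # Make attribute certificate request
        userId = testCfg['userid']
        attCert = self.clnt.getAttCert(userId=userId,
                                       userCert=userCertTxt)

        print("Attribute Certificate: \n\n:" + str(attCert))

        # NOTE(review): the output path is taken from the [test5GetAttCert]
        # section, so this overwrites the file written by test5GetAttCert.
        # Confirm whether that is intended or a copy/paste slip.
        attCert.filePath = self.cfg['test5GetAttCert']['attcertfilepath']
        attCert.write()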
193
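    def test7GetMappedAttCert(self):
        """test7GetMappedAttCert: Request mapped attribute certificate from
        NDG Attribute Authority Web Service.

        A second AttAuthorityClient is built from the
        [test7GetMappedAttCert] config section and asked for a certificate,
        passing the attribute certificate read from 'userattcertfilepath'.
        The result is printed and written to 'mappedattcertfilepath'; no
        assertion is made about its content.
        """
        testCfg = self.cfg['test7GetMappedAttCert']

        # Read user Certificate into a string ready for passing via WS.
        # An unset option means no issuing cert is sent.
        userCertFilePath = testCfg.get('issuingclntcertfilepath')
        if userCertFilePath is None:
            userCertTxt = None
        else:
            try:
                userCertFile = open(userCertFilePath, 'r')
                try:
                    userCertTxt = userCertFile.read()
                finally:
                    userCertFile.close()
            except IOError, ioErr:
                # Raising a plain string is rejected by the interpreter;
                # raise a real exception that carries the message
                raise IOError("Error reading certificate file \"%s\": %s" %
                              (ioErr.filename, ioErr.strerror))

        # Similarly for Attribute Certificate
        try:
            userAttCert = AttCertRead(testCfg['userattcertfilepath'])
        except IOError, ioErr:
            raise IOError(
                    "Error reading attribute certificate file \"%s\": %s" %
                    (ioErr.filename, ioErr.strerror))

        # NOTE(security): a 'clntprikeypwd' entry keeps the private key
        # pass-phrase in clear text in the config file.  Leave it unset to
        # be prompted instead, or restrict who can read the file.
        try:
            clntPriKeyPwd = testCfg.get('clntprikeypwd')
            if clntPriKeyPwd is None:
                clntPriKeyPwd = getpass.getpass(
                            prompt="\nsetUp - client private key password: ")
        except KeyboardInterrupt:
            sys.exit(0)

        # List of CA certificates for use in validation of certs used in
        # signature for server response.  Only a missing option is treated
        # as "no CA certs"; any other error is left to propagate.
        try:
            caCertFilePathList = testCfg['cacertfilepathlist'].split()
        except KeyError:
            caCertFilePathList = []

        reqBinSecTokValType = testCfg.get('reqbinsectokvaltype')

        # Check certificate types proxy or standard
        proxyCertFilePath = testCfg.get('proxycertfilepath')
        if proxyCertFilePath:
            signingCertChain = \
                        self._getCertChainFromProxyCertFile(proxyCertFilePath)
        else:
            signingCertChain = None

        # NOTE(security): eval() runs whatever Python expression the config
        # file holds, with the privileges of the user running the tests.
        # The option looks like a boolean switch - if so, parse it as one
        # (for example with ConfigParser's getboolean) instead of
        # evaluating it.  Confirm the accepted values first.
        setSignatureHandler = eval(testCfg['setsignaturehandler'])

        # Make client to site B Attribute Authority
        clnt = AttAuthorityClient(
            uri=testCfg['uri'],
            setSignatureHandler=setSignatureHandler,
            reqBinSecTokValType=reqBinSecTokValType,
            signingCertFilePath=testCfg.get('clntcertfilepath'),
            signingCertChain=signingCertChain,
            signingPriKeyFilePath=testCfg.get('clntprikeyfilepath'),
            signingPriKeyPwd=clntPriKeyPwd,
            caCertFilePathList=caCertFilePathList,
            tracefile=sys.stderr)

        # Make attribute certificate request
        attCert = clnt.getAttCert(userCert=userCertTxt,
                                  userAttCert=userAttCert)
        print("Attribute Certificate: \n\n:" + str(attCert))

        attCert.filePath = testCfg['mappedattcertfilepath']
        attCert.write()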
275       
276       
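    def test8GetMappedAttCertStressTest(self):
        """test8GetMappedAttCertStressTest: Request mapped attribute certificate from
        NDG Attribute Authority Web Service.

        A client is built from the [test8GetMappedAttCertStressTest] config
        section and one request is made for each attribute certificate file
        listed in 'userattcertfilepathlist'.

        A failed request is recorded in a '.msg' file in the current
        working directory and the loop carries on.  The test therefore
        passes even if every request fails; the '.msg' files must be
        inspected to see the outcome.
        """
        testCfg = self.cfg['test8GetMappedAttCertStressTest']

        # Read user Certificate into a string ready for passing via WS.
        # An unset option means no issuing cert is sent.
        userCertFilePath = testCfg.get('issuingclntcertfilepath')
        if userCertFilePath is None:
            userCertTxt = None
        else:
            try:
                userCertFile = open(userCertFilePath, 'r')
                try:
                    userCertTxt = userCertFile.read()
                finally:
                    userCertFile.close()
            except IOError, ioErr:
                # Raising a plain string is rejected by the interpreter;
                # raise a real exception that carries the message
                raise IOError("Error reading certificate file \"%s\": %s" %
                              (ioErr.filename, ioErr.strerror))

        # NOTE(security): a 'clntprikeypwd' entry keeps the private key
        # pass-phrase in clear text in the config file.  Leave it unset to
        # be prompted instead, or restrict who can read the file.
        try:
            clntPriKeyPwd = testCfg.get('clntprikeypwd')
            if clntPriKeyPwd is None:
                clntPriKeyPwd = getpass.getpass(
                            prompt="\nsetUp - client private key password: ")
        except KeyboardInterrupt:
            sys.exit(0)

        # List of CA certificates for use in validation of certs used in
        # signature for server response.  Only a missing option is treated
        # as "no CA certs"; any other error is left to propagate.
        try:
            caCertFilePathList = testCfg['cacertfilepathlist'].split()
        except KeyError:
            caCertFilePathList = []

        reqBinSecTokValType = testCfg.get('reqbinsectokvaltype')

        # Check certificate types proxy or standard
        proxyCertFilePath = testCfg.get('proxycertfilepath')
        if proxyCertFilePath:
            signingCertChain = \
                        self._getCertChainFromProxyCertFile(proxyCertFilePath)
        else:
            signingCertChain = None

        # NOTE(security): eval() runs whatever Python expression the config
        # file holds, with the privileges of the user running the tests.
        # The option looks like a boolean switch - if so, parse it as one
        # (for example with ConfigParser's getboolean) instead of
        # evaluating it.  Confirm the accepted values first.
        setSignatureHandler = eval(testCfg['setsignaturehandler'])

        # Make client to site B Attribute Authority
        clnt = AttAuthorityClient(
            uri=testCfg['uri'],
            setSignatureHandler=setSignatureHandler,
            reqBinSecTokValType=reqBinSecTokValType,
            signingCertChain=signingCertChain,
            signingCertFilePath=testCfg.get('clntcertfilepath'),
            signingPriKeyFilePath=testCfg.get('clntprikeyfilepath'),
            signingPriKeyPwd=clntPriKeyPwd,
            caCertFilePathList=caCertFilePathList,
            tracefile=sys.stderr)

        # No debugger breakpoint here: pdb.set_trace() blocks any
        # unattended run waiting for console input
        acFilePathList = testCfg['userattcertfilepathlist'].split()

        for acFilePath in acFilePathList:
            try:
                userAttCert = AttCertRead(acFilePath)
            except IOError, ioErr:
                raise IOError(
                    "Error reading attribute certificate file \"%s\": %s" %
                    (ioErr.filename, ioErr.strerror))

            # Make attribute certificate request
            try:
                clnt.getAttCert(userCert=userCertTxt,
                                userAttCert=userAttCert)
            except Exception, e:
                # Record the failure next to the test and keep going so
                # the remaining certificates are still exercised
                outFilePfx = 'test8GetMappedAttCertStressTest-%s' % \
                        os.path.basename(acFilePath)
                msgFile = open(outFilePfx + ".msg", 'w')
                try:
                    msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
                finally:
                    msgFile.close()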
360             
361#_____________________________________________________________________________       
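class AttAuthorityClientTestSuite(unittest.TestSuite):
    '''Suite running the numbered AttAuthorityClientTestCase tests in order.

    NOTE(review): test4aGetAllHostsInfo is not in the list below - confirm
    whether it is left out on purpose.
    '''
    def __init__(self):
        # The result must not be bound to a local called 'map': assigning
        # to that name makes it local to the whole function, so the call to
        # the builtin on the same line raises UnboundLocalError
        testCaseList = map(AttAuthorityClientTestCase,
                  (
                    "test1GetX509Cert",
                    "test2GetHostInfo",
                    "test3GetTrustedHostInfo",
                    "test4GetTrustedHostInfoWithNoRole",
                    "test5GetAttCert",
                    "test6GetAttCertWithUserIdSet",
                    "test7GetMappedAttCert",
                    "test8GetMappedAttCertStressTest",
                  ))
        unittest.TestSuite.__init__(self, testCaseList)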
376                                       
if __name__ == "__main__":
    # unittest.main() collects the test* methods of the TestCase classes in
    # this module itself; it does not go through AttAuthorityClientTestSuite
    unittest.main()