
Revision 2530, 13.8 KB checked in by pjkersha, 13 years ago (diff)

Working Session Manager unit tests for connect and disconnect calls and
getAttCert calls. Correct use of proxy certs with WS-Security signature
interface is also configured.

ndg.security.server/ndg/security/server/AttAuthority/server-config.tac:
removed blank line

ndg.security.server/ndg/security/server/conf/sessionMgrProperties.xml:
added setting for signature handler flag and CA cert

ndg.security.server/ndg/security/server/SessionMgr/server-config.tac:

  • fix to soap_disconnect - call SessionMgr?.deleteUserSession
  • fix to soap_getX509Cert - base64 encode DER format cert output
  • added 'useSignatureHandler' flag to enable WS-Security signature handling

to be omitted if required.

ndg.security.server/ndg/security/server/SessionMgr/init.py:

  • ref to CredWalletInvalidUserX509Cert
  • give explicit keyword names in connect2UserSession method signature
  • raise CredWalletInvalidUserX509Cert if Credential Wallet cert is invalid
  • SessionMgr?.deleteUserSession method - added userSess keyword; fixed userDN

setting to ensure it's a string

ndg.security.test/ndg/security/test/AttAuthority/AttAuthorityClientTest.py,
ndg.security.test/ndg/security/test/AttAuthority/attAuthorityClientTest.cfg:
cosmetic changes

ndg.security.test/ndg/security/test/SessionMgr/SessionMgrClientTest.py:

  • added _getCertChainFromProxyCertFile method to enable correct proxy cert

loading

  • added caCertFilePathList, reqBinSecTokValType, setSignatureHandler and

signingCertChain keyword settings to SessionMgrClient? initialisation

  • removed duplicated test6bCookieGetMappedAttCert method

ndg.security.test/ndg/security/test/SessionMgr/sessionMgrProperties.xml:

  • dropped serverCNprefix element setting - not needed for test certs used.

ndg.security.test/ndg/security/test/SessionMgr/sessionMgrClientTest.cfg:

  • added new params caCertFilePathList, reqBinSecTokValType,

setSignatureHandler and proxycertfilepath

ndg.security.common/ndg/security/common/SessionMgr/init.py:

SignatureHandler? to be switched on/off

ndg.security.common/ndg/security/common/AttAuthority/init.py: fix to
pydoc for AttAuthorityClient?.init

ndg.security.common/ndg/security/common/CredWallet.py: major fixes for
SessionMgr? - AA calls -

  • CredWalletInvalidUserX509Cert new exception type raised if user cert is

invalid

  • separate setAAuri into a new method createAAClnt
  • getAttCert method can take an aaClnt keyword. This enables the client

object to the AA to call to be passed in. Default is the target AA,
self.aaClnt.

  • Property svn:executable set to *
1#!/usr/bin/env python
2"""NDG Attribute Authority client unit tests
3
4NERC Data Grid Project
5
6@author P J Kershaw 05/05/05, major update 16/01/07
7
8@copyright (C) 2007 CCLRC & NERC
9
10@license This software may be distributed under the terms of the Q Public
11License, version 1.0 or later.
12"""
13
# Subversion '$Id$' keyword placeholder; presumably expanded on commit when the
# file's svn:keywords property enables it - not visible from this file
__revision__ = '$Id:$'
15
16import unittest
17import os, sys, getpass, re
18from ConfigParser import SafeConfigParser
19
20from ndg.security.common.AttAuthority import AttAuthorityClient
21from ndg.security.common.AttCert import AttCertRead
22from ndg.security.common.X509 import X509CertParse
23
24
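class AttAuthorityClientTestCase(unittest.TestCase):
    """Unit tests for AttAuthorityClient run against a live Attribute
    Authority Web Service.

    Settings are read in setUp() from ./attAuthorityClientTest.cfg.  The
    [setUp] section configures the default client, self.clnt; test7 and test8
    talk to a different Attribute Authority and build their own client from
    the config section named after the test method.
    """
    # Client private key password.  Kept as a class attribute so that the
    # getpass prompt in setUp() appears once per run: unittest makes a new
    # instance for every test method, so an instance attribute would re-prompt
    # for each test.
    clntPriKeyPwd = None

    # Matches one PEM encoded X.509 certificate.  A raw string so that the
    # '\-' inside the character class reaches the re module unchanged.
    pemPat = r"-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"

    # Prompt shown when no password is set in the config.  The same wording
    # is used for every section (setUp, test7 and test8 all prompted with it).
    _priKeyPwdPrompt = "\nsetUp - client private key password: "

    # Recognised spellings for the 'setsignaturehandler' setting - the same
    # set ConfigParser's getboolean accepts.  Used in place of eval() on the
    # config value so that only a boolean literal is honoured from the file.
    _boolStates = {'1': True, 'yes': True, 'true': True, 'on': True,
                   '0': False, 'no': False, 'false': False, 'off': False}

    @classmethod
    def _parseBool(cls, txt):
        '''Convert a boolean config setting to a bool.

        @type txt: string
        @param txt: one of 1/0, yes/no, true/false, on/off - case insensitive,
        surrounding whitespace ignored
        @rtype: bool
        @return: the setting as a bool
        @raise ValueError: txt is not one of the recognised spellings
        '''
        try:
            return cls._boolStates[txt.strip().lower()]
        except KeyError:
            raise ValueError('Expecting a boolean setting, got "%s"' % txt)

    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
        '''Read proxy cert and user cert from a single PEM file and put in
        a list ready for input into SignatureHandler

        @type proxyCertFilePath: string
        @param proxyCertFilePath: PEM file holding the proxy certificate
        followed by the certificate(s) that issued it
        @rtype: list
        @return: X509CertParse objects in reverse file order, so that the
        proxy cert - the one used to verify the message signature - is last
        '''
        # Close the file as soon as it is read rather than leaving it to GC
        with open(proxyCertFilePath) as proxyCertFile:
            proxyCertFileTxt = proxyCertFile.read()

        pemPatRE = re.compile(self.__class__.pemPat, re.S)
        x509CertList = pemPatRE.findall(proxyCertFileTxt)

        signingCertChain = [X509CertParse(x509Cert)
                            for x509Cert in x509CertList]

        # Expecting proxy cert first - move this to the end.  This will
        # be the cert used to verify the message signature
        signingCertChain.reverse()

        return signingCertChain

    def _getClntPriKeyPwd(self, section):
        '''Return the client private key password for a config section

        @type section: string
        @param section: key into self.cfg
        @rtype: string
        @return: the 'clntprikeypwd' setting, or the password typed at the
        prompt when the setting is absent.  Ctrl-C at the prompt exits the
        test run with status 0.
        '''
        try:
            clntPriKeyPwd = self.cfg[section].get('clntprikeypwd')
            if clntPriKeyPwd is None:
                clntPriKeyPwd = getpass.getpass(prompt=self._priKeyPwdPrompt)
        except KeyboardInterrupt:
            sys.exit(0)

        return clntPriKeyPwd

    def _createClnt(self, section, clntPriKeyPwd):
        '''Instantiate an AttAuthorityClient from the settings in a config
        section

        @type section: string
        @param section: key into self.cfg; must hold 'uri' and
        'setsignaturehandler'.  'cacertfilepathlist', 'proxycertfilepath',
        'reqbinsectokvaltype', 'clntcertfilepath' and 'clntprikeyfilepath'
        are optional.
        @type clntPriKeyPwd: string
        @param clntPriKeyPwd: password for the client private key
        @rtype: AttAuthorityClient
        @return: client with WS-Security settings taken from the section
        '''
        cfg = self.cfg[section]

        # List of CA certificates for use in validation of certs used in
        # signature for server response.  Optional setting.
        try:
            caCertFilePathList = cfg['cacertfilepathlist'].split()
        except KeyError:
            caCertFilePathList = []

        # Check certificate types proxy or standard.  Without a proxy cert
        # there is no chain to pass - the original left the name unbound here
        # and failed with NameError.
        # NOTE(review): assumes AttAuthorityClient accepts
        # signingCertChain=None in the standard cert case - confirm.
        signingCertChain = None
        proxyCertFilePath = cfg.get('proxycertfilepath')
        if proxyCertFilePath:
            signingCertChain = \
                        self._getCertChainFromProxyCertFile(proxyCertFilePath)

        return AttAuthorityClient(uri=cfg['uri'],
                    setSignatureHandler=self._parseBool(
                                                cfg['setsignaturehandler']),
                    reqBinSecTokValType=cfg.get('reqbinsectokvaltype'),
                    signingCertFilePath=cfg.get('clntcertfilepath'),
                    signingCertChain=signingCertChain,
                    signingPriKeyFilePath=cfg.get('clntprikeyfilepath'),
                    signingPriKeyPwd=clntPriKeyPwd,
                    caCertFilePathList=caCertFilePathList,
                    tracefile=sys.stderr)

    def _readUserCert(self, section):
        '''Read user Certificate into a string ready for passing via WS

        @type section: string
        @param section: key into self.cfg
        @rtype: string or None
        @return: contents of the file named by 'issuingclntcertfilepath', or
        None if that setting is absent (no issuing cert set)
        @raise IOError: the file could not be read; the message names the file
        '''
        userCertFilePath = self.cfg[section].get('issuingclntcertfilepath')
        if userCertFilePath is None:
            # No issuing cert set
            return None

        try:
            with open(userCertFilePath, 'r') as userCertFile:
                return userCertFile.read()
        except IOError as ioErr:
            # Raise a real exception - raising a string is a TypeError on
            # Python 2.6+ and lost the original error anyway
            raise IOError('Error reading certificate file "%s": %s' %
                          (ioErr.filename, ioErr.strerror))

    @staticmethod
    def _readUserAttCert(acFilePath):
        '''Read an Attribute Certificate file

        @type acFilePath: string
        @param acFilePath: path of the Attribute Certificate file
        @rtype: AttCertRead
        @return: the parsed Attribute Certificate
        @raise IOError: the file could not be read; the message names the file
        '''
        try:
            return AttCertRead(acFilePath)
        except IOError as ioErr:
            raise IOError('Error reading attribute certificate file "%s": %s'
                          % (ioErr.filename, ioErr.strerror))

    def setUp(self):
        '''Read ./attAuthorityClientTest.cfg into self.cfg and make the
        default client, self.clnt, from the [setUp] section.'''
        configParser = SafeConfigParser()
        configParser.read("./attAuthorityClientTest.cfg")

        # Section name -> dict of lower-cased option name -> string value
        self.cfg = {}
        for section in configParser.sections():
            self.cfg[section] = dict(configParser.items(section))

        if AttAuthorityClientTestCase.clntPriKeyPwd is None:
            # Cache on the class, not the instance, so the prompt is only
            # shown once for the whole run
            AttAuthorityClientTestCase.clntPriKeyPwd = \
                                            self._getClntPriKeyPwd('setUp')

        # Instantiate WS proxy
        self.clnt = self._createClnt('setUp', self.clntPriKeyPwd)

    def test1GetX509Cert(self):
        '''test1GetX509Cert: retrieve Attribute Authority's X.509 cert.'''
        resp = self.clnt.getX509Cert()
        print("Attribute Authority X.509 cert.:\n" + resp)

    def test2GetHostInfo(self):
        """test2GetHostInfo: retrieve info for AA host"""
        hostInfo = self.clnt.getHostInfo()
        print("Host Info:\n %s" % hostInfo)

    def test3GetTrustedHostInfo(self):
        """test3GetTrustedHostInfo: retrieve trusted host info matching a
        given role"""
        trustedHostInfo = self.clnt.getTrustedHostInfo(
                                 self.cfg['test3GetTrustedHostInfo']['role'])
        print("Trusted Host Info:\n %s" % trustedHostInfo)

    def test4GetTrustedHostInfoWithNoRole(self):
        """test4GetTrustedHostInfoWithNoRole: retrieve trusted host info
        irrespective of role"""
        trustedHostInfo = self.clnt.getTrustedHostInfo()
        print("Trusted Host Info:\n %s" % trustedHostInfo)

    def test5GetAttCert(self):
        """test5GetAttCert: Request attribute certificate from NDG Attribute
        Authority Web Service."""
        userCertTxt = self._readUserCert('test5GetAttCert')

        # Make attribute certificate request
        attCert = self.clnt.getAttCert(userCert=userCertTxt)
        print("Attribute Certificate: \n\n:" + str(attCert))

        attCert.filePath = self.cfg['test5GetAttCert']['attcertfilepath']
        attCert.write()

    def test6GetAttCertWithUserIdSet(self):
        """test6GetAttCertWithUserIdSet: Request attribute certificate from
        NDG Attribute Authority Web Service setting a specific user Id
        independent of the signer of the SOAP request."""
        userCertTxt = self._readUserCert('test6GetAttCertWithUserIdSet')

        # Make attribute certificate request
        userId = self.cfg['test6GetAttCertWithUserIdSet']['userid']
        attCert = self.clnt.getAttCert(userId=userId, userCert=userCertTxt)
        print("Attribute Certificate: \n\n:" + str(attCert))

        # NOTE(review): writes to the test5 output path - looks like a
        # copy-and-paste of test5; confirm whether this section should have
        # its own 'attcertfilepath' before changing it
        attCert.filePath = self.cfg['test5GetAttCert']['attcertfilepath']
        attCert.write()

    def test7GetMappedAttCert(self):
        """test7GetMappedAttCert: Request mapped attribute certificate from
        NDG Attribute Authority Web Service."""
        section = 'test7GetMappedAttCert'
        userCertTxt = self._readUserCert(section)

        # Similarly for Attribute Certificate
        userAttCert = self._readUserAttCert(
                                    self.cfg[section]['userattcertfilepath'])

        # Make client to site B Attribute Authority.  The password is fetched
        # before the client settings are read, as in the original ordering.
        clnt = self._createClnt(section, self._getClntPriKeyPwd(section))

        # Make attribute certificate request
        attCert = clnt.getAttCert(userCert=userCertTxt,
                                  userAttCert=userAttCert)
        print("Attribute Certificate: \n\n:" + str(attCert))

        attCert.filePath = self.cfg[section]['mappedattcertfilepath']
        attCert.write()

    def test8GetMappedAttCertStressTest(self):
        """test8GetMappedAttCertStressTest: Request mapped attribute certificate from
        NDG Attribute Authority Web Service.

        One request is made per Attribute Certificate listed in
        'userattcertfilepathlist'.  A request that raises is recorded in
        'test8GetMappedAttCertStressTest-<AC file name>.msg' and the loop
        continues with the next certificate."""
        section = 'test8GetMappedAttCertStressTest'
        userCertTxt = self._readUserCert(section)

        # Make client to site B Attribute Authority
        clnt = self._createClnt(section, self._getClntPriKeyPwd(section))

        acFilePathList = self.cfg[section]['userattcertfilepathlist'].split()

        for acFilePath in acFilePathList:
            userAttCert = self._readUserAttCert(acFilePath)

            # Make attribute certificate request
            try:
                clnt.getAttCert(userCert=userCertTxt, userAttCert=userAttCert)
            except Exception as e:
                outFilePfx = 'test8GetMappedAttCertStressTest-%s' % \
                                                os.path.basename(acFilePath)
                # Close the message file immediately so the record is
                # flushed even if a later iteration fails hard
                with open(outFilePfx + ".msg", 'w') as msgFile:
                    msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))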
342#_____________________________________________________________________________       
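class AttAuthorityClientTestSuite(unittest.TestSuite):
    """Suite of every AttAuthorityClientTestCase test, in numbered order."""
    # Test method names in the order they are to run
    _testNames = (
        "test1GetX509Cert",
        "test2GetHostInfo",
        "test3GetTrustedHostInfo",
        "test4GetTrustedHostInfoWithNoRole",
        "test5GetAttCert",
        "test6GetAttCertWithUserIdSet",
        "test7GetMappedAttCert",
        "test8GetMappedAttCertStressTest",
    )

    def __init__(self):
        # Renamed from 'map', which shadowed the builtin being called on the
        # same line.  map() yields an iterator on Python 3; TestSuite takes
        # any iterable of tests.
        tests = map(AttAuthorityClientTestCase, self._testNames)
        unittest.TestSuite.__init__(self, tests)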
357                                       
if __name__ == "__main__":
    # Run the AttAuthorityClientTestCase tests with unittest's command line
    # runner; setUp() expects ./attAuthorityClientTest.cfg in the working
    # directory
    unittest.main()