source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_attributeauthorityclient.py @ 6571

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_attributeauthorityclient.py@6571
Revision 6571, 10.7 KB checked in by pjkersha, 10 years ago (diff)

Refactored SAML SOAP Binding unit test class into a separate module test_samlattributeauthorityclient

  • Property svn:executable set to *
  • Property svn:keywords set to Id
#!/usr/bin/env python
"""Unit tests for the NDG Attribute Authority WSDL SOAP client.

Part of the NERC DataGrid Project.
"""
__author__ = "P J Kershaw"
__date__ = "05/05/05, major update 16/01/07"
__copyright__ = "(C) 2009 Science and Technology Facilities Council"
__license__ = "BSD - see LICENSE file in top-level directory"
__contact__ = "Philip.Kershaw@stfc.ac.uk"
__revision__ = '$Id:test_attributeauthorityclient.py 4372 2008-10-29 09:45:39Z pjkersha $'

# The root logger is switched to DEBUG before the ndg.security modules are
# imported, so anything they log is shown from import time onwards.
# NOTE(review): if the client libraries log SOAP messages at DEBUG, request
# and certificate text will end up in the test output - confirm before
# sharing logs from these runs.
import logging
logging.basicConfig(level=logging.DEBUG)

import os
import re
import unittest
from os.path import expandvars as xpdVars

from ndg.security.common.attributeauthority import (
    AttributeAuthorityClient,
    NoMatchingRoleInTrustedHosts,
)
from ndg.security.common.AttCert import AttCertRead
from ndg.security.common.X509 import X509CertParse
from ndg.security.test.unit.attributeauthorityclient import (
    AttributeAuthorityClientBaseTestCase,
)
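class AttributeAuthorityClientTestCase(AttributeAuthorityClientBaseTestCase):
    '''NDG Attribute Authority WSDL SOAP client unit tests

    Most tests read their settings from the section of self.cfg named after
    the test method and call the Attribute Authority through the client
    built in setUp.  self.cfg, self.cfgParser and self.sslCACertList are not
    set in this class - presumably the base class provides them; confirm
    against AttributeAuthorityClientBaseTestCase.
    '''
    clntPriKeyPwd = None

    # Matches one PEM certificate block.  Raw string so that the "\-" escape
    # reaches the regular expression engine unchanged.
    pemPat = r"-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"

    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
        '''Read proxy cert and user cert from a single PEM file and put in
        a list ready for input into SignatureHandler

        The certificates are only parsed and re-ordered here: nothing in this
        method checks that they chain to one another or to a trusted root.

        proxyCertFilePath -- path to a PEM file holding the certificates,
        proxy cert first.
        Returns the parsed certificates in the reverse of their file order.
        '''
        # Close the file deterministically instead of relying on garbage
        # collection of the file object.
        with open(proxyCertFilePath) as proxyCertFile:
            proxyCertFileTxt = proxyCertFile.read()

        # re.S lets the character class run across the line breaks of the
        # base64 body.
        pemPatRE = re.compile(self.__class__.pemPat, re.S)
        x509CertList = pemPatRE.findall(proxyCertFileTxt)

        signingCertChain = [X509CertParse(x509Cert)
                            for x509Cert in x509CertList]

        # Expecting proxy cert first - move this to the end.  This will
        # be the cert used to verify the message signature
        signingCertChain.reverse()

        return signingCertChain

    def _readIssuingClntCert(self, testCfg):
        '''Return the text of the certificate file named by the
        'issuingClntCertFilePath' option of testCfg, or None if the option
        is not set.

        Environment variables in the path are expanded.  An unreadable file
        raises IOError carrying the file name and the OS error text.
        '''
        certFilePath = testCfg.get('issuingClntCertFilePath')
        if certFilePath is None:
            # No issuing cert set
            return None

        try:
            with open(xpdVars(certFilePath), 'r') as certFile:
                return certFile.read()

        except IOError as ioErr:
            raise IOError("Error reading certificate file \"%s\": %s" %
                          (ioErr.filename, ioErr.strerror))

    def _checkHostInfoMap(self, hostInfoMap):
        '''Assert that every hostname in hostInfoMap, and every key of the
        info held for it, is set (truthy).'''
        for hostname, hostInfo in hostInfoMap.items():
            self.assertTrue(hostname, "Hostname not set")
            for k in hostInfo.keys():
                self.assertTrue(k, "hostInfo value key unset")

    def setUp(self):
        '''Create the site A Attribute Authority client from the 'setUp'
        config section.'''
        super(AttributeAuthorityClientTestCase, self).setUp()

        # Debugging hook: with NDGSEC_INT_DEBUG present in the environment
        # every test stops in the interactive debugger, so the variable must
        # not be set for unattended runs.
        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        thisSection = self.cfg['setUp']

        # Instantiate WS proxy.  sslPeerCertCN and sslCACertList are handed
        # to the client - presumably its TLS peer checks; their handling is
        # inside AttributeAuthorityClient and not visible here.
        self.siteAClnt = AttributeAuthorityClient(uri=thisSection['uri'],
                                sslPeerCertCN=thisSection.get('sslPeerCertCN'),
                                sslCACertList=self.sslCACertList,
                                cfgFileSection='wsse',
                                cfg=self.cfgParser)

    def test01GetHostInfo(self):
        """test01GetHostInfo: retrieve info for AA host"""
        hostInfo = self.siteAClnt.getHostInfo()
        print("Host Info:\n %s" % hostInfo)

    def test02GetTrustedHostInfo(self):
        """test02GetTrustedHostInfo: retrieve trusted host info matching a
        given role"""
        trustedHostInfo = self.siteAClnt.getTrustedHostInfo(
                                 self.cfg['test02GetTrustedHostInfo']['role'])
        self._checkHostInfoMap(trustedHostInfo)

        print("Trusted Host Info:\n %s" % trustedHostInfo)

    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self):
        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case
        where the input role doesn't match any roles in the target AA's map
        config file"""
        _cfg = self.cfg['test03GetTrustedHostInfoWithNoMatchingRoleFound']
        try:
            self.siteAClnt.getTrustedHostInfo(_cfg['role'])

        except NoMatchingRoleInTrustedHosts as e:
            print('As expected - no match for role "%s": %s' %
                  (_cfg['role'], e))
        else:
            # The call returned normally, so the unmatched role was accepted.
            self.fail("Expecting NoMatchingRoleInTrustedHosts exception")

    def test04GetTrustedHostInfoWithNoRole(self):
        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info
        irrespective of role"""
        trustedHostInfo = self.siteAClnt.getTrustedHostInfo()
        self._checkHostInfoMap(trustedHostInfo)

        print("Trusted Host Info:\n %s" % trustedHostInfo)

    def test05GetAllHostsInfo(self):
        """test05GetAllHostsInfo: retrieve info for all hosts"""
        allHostInfo = self.siteAClnt.getAllHostsInfo()
        self._checkHostInfoMap(allHostInfo)

        print("All Hosts Info:\n %s" % allHostInfo)

    def test06GetAttCert(self):
        """test06GetAttCert: Request attribute certificate from NDG Attribute
        Authority Web Service.

        The certificate is printed and written to 'attCertFilePath'; its
        content is not asserted on.
        """
        _cfg = self.cfg['test06GetAttCert']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readIssuingClntCert(_cfg)

        # Make attribute certificate request
        attCert = self.siteAClnt.getAttCert(userX509Cert=userX509CertTxt)

        print("Attribute Certificate: \n\n:" + str(attCert))

        # Output path comes from the test config after environment variable
        # expansion.
        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
        attCert.write()

    def test07GetAttCertWithUserIdSet(self):
        """test07GetAttCertWithUserIdSet: Request attribute certificate from
        NDG Attribute Authority Web Service setting a specific user Id
        independent of the signer of the SOAP request.

        NOTE(review): the test only requires that the call returns; it does
        not assert that the issued certificate is for the requested userId.
        """
        _cfg = self.cfg['test07GetAttCertWithUserIdSet']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readIssuingClntCert(_cfg)

        # Make attribute certificate request
        userId = _cfg['userId']
        attCert = self.siteAClnt.getAttCert(userId=userId,
                                            userX509Cert=userX509CertTxt)

        print("Attribute Certificate: \n\n:" + str(attCert))

        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
        attCert.write()

    def test08GetMappedAttCert(self):
        """test08GetMappedAttCert: Request mapped attribute certificate from
        NDG Attribute Authority Web Service.

        A previously issued attribute certificate is read from
        'userAttCertFilePath' and presented to a second (site B) Attribute
        Authority.
        """
        _cfg = self.cfg['test08GetMappedAttCert']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readIssuingClntCert(_cfg)

        # Similarly for Attribute Certificate
        try:
            userAttCert = AttCertRead(xpdVars(_cfg['userAttCertFilePath']))

        except IOError as ioErr:
            raise Exception("Error reading attribute certificate file \"%s\": "
                            "%s" % (ioErr.filename, ioErr.strerror))

        # Make client to site B Attribute Authority
        # NOTE(review): unlike the client in setUp, no sslPeerCertCN or
        # sslCACertList is passed here - confirm what peer verification the
        # client applies when the site B URI is https.
        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'],
                                       cfgFileSection='wsse',
                                       cfg=self.cfgParser)

        # Make attribute certificate request
        attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
                                       userAttCert=userAttCert)
        print("Attribute Certificate: \n\n:" + str(attCert))

        attCert.filePath = xpdVars(_cfg['mappedAttCertFilePath'])
        attCert.write()

    def test09GetMappedAttCertStressTest(self):
        """test09GetMappedAttCertStressTest: Request mapped attribute
        certificate from NDG Attribute Authority Web Service.

        One request is made per file listed (whitespace separated) in
        'userAttCertFilePathList'.  A failed request is recorded in a
        "test09GetMappedAttCertStressTest-<file name>.msg" file in the
        current directory and then fails the test.
        """
        _cfg = self.cfg['test09GetMappedAttCertStressTest']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readIssuingClntCert(_cfg)

        # Make client to site B Attribute Authority
        # NOTE(review): unlike the client in setUp, no sslPeerCertCN or
        # sslCACertList is passed here - confirm what peer verification the
        # client applies when the site B URI is https.
        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'],
                                       cfgFileSection='wsse',
                                       cfg=self.cfgParser)

        acFilePathList = [xpdVars(acFile)
                          for acFile in _cfg['userAttCertFilePathList'].split()]

        for acFilePath in acFilePathList:
            try:
                userAttCert = AttCertRead(acFilePath)

            except IOError as ioErr:
                raise Exception("Error reading attribute certificate file "
                                '"%s": %s' % (ioErr.filename, ioErr.strerror))

            # Reset for every request: otherwise a failed request would be
            # judged against the certificate left over from the previous
            # iteration (or hit an unbound name on the first one).
            attCert = None

            # Make attribute certificate request
            try:
                attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
                                               userAttCert=userAttCert)
            except Exception as e:
                outFilePfx = 'test09GetMappedAttCertStressTest-%s' % \
                        os.path.basename(acFilePath)
                # The with block flushes and closes the message file.
                with open(outFilePfx + ".msg", 'w') as msgFile:
                    msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))

            self.assertTrue(attCert,
                            'No attribute certificate returned for "%s"' %
                            acFilePath)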
if __name__ == "__main__":
    # Executed as a script: let unittest collect and run the tests defined
    # in this module.
    unittest.main()