source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_attributeauthorityclient.py @ 5681

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_attributeauthorityclient.py@5681
Revision 5681, 13.8 KB checked in by pjkersha, 11 years ago (diff)

Integrated SOAP SAML Attribute Query interface into Attribute Authority Client unit tests.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority SOAP client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "05/05/05, major update 16/01/07"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id:test_attributeauthorityclient.py 4372 2008-10-29 09:45:39Z pjkersha $'
12import logging
13logging.basicConfig()
14
15import unittest
16import os, sys, getpass, re
17   
18from os.path import expandvars as xpdVars
19from os.path import join as jnPath
20mkPath = lambda file: jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'], file)
21
22from datetime import datetime
23from uuid import uuid4
24
25from ndg.security.test.unit import BaseTestCase
26
27from ndg.security.common.attributeauthority import AttributeAuthorityClient, \
28    NoMatchingRoleInTrustedHosts
29from ndg.security.common.AttCert import AttCertRead
30from ndg.security.common.X509 import X509CertParse, X509CertRead
31from ndg.security.common.utils.configfileparsers import \
32    CaseSensitiveConfigParser
33
34from saml.common.xml import SAMLConstants
35from saml.saml2.core import Response, Assertion, Attribute, AttributeValue, \
36    AttributeStatement, SAMLVersion, Subject, NameID, Issuer, AttributeQuery, \
37    XSStringAttributeValue, XSGroupRoleAttributeValue, Conditions, Status, \
38    StatusCode
39   
40from ndg.security.common.saml.bindings import SOAPBinding as SamlSoapBinding
41
42
43class AttributeAuthorityClientTestCase(BaseTestCase):
44    clntPriKeyPwd = None
45    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
46
47    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
48        '''Read proxy cert and user cert from a single PEM file and put in
49        a list ready for input into SignatureHandler'''               
50        proxyCertFileTxt = open(proxyCertFilePath).read()
51       
52        pemPatRE = re.compile(self.__class__.pemPat, re.S)
53        x509CertList = pemPatRE.findall(proxyCertFileTxt)
54       
55        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
56                            x509CertList]
57   
58        # Expecting proxy cert first - move this to the end.  This will
59        # be the cert used to verify the message signature
60        signingCertChain.reverse()
61       
62        return signingCertChain
63
64
65    def setUp(self):
66        super(AttributeAuthorityClientTestCase, self).setUp()
67       
68        if 'NDGSEC_INT_DEBUG' in os.environ:
69            import pdb
70            pdb.set_trace()
71       
72        if 'NDGSEC_AACLNT_UNITTEST_DIR' not in os.environ:
73            os.environ['NDGSEC_AACLNT_UNITTEST_DIR'] = \
74                os.path.abspath(os.path.dirname(__file__))
75
76        self.cfgParser = CaseSensitiveConfigParser()
77        cfgFilePath = jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'],
78                             'attAuthorityClientTest.cfg')
79        self.cfgParser.read(cfgFilePath)
80       
81        self.cfg = {}
82        for section in self.cfgParser.sections():
83            self.cfg[section] = dict(self.cfgParser.items(section))
84
85        try:
86            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
87                         self.cfg['setUp']['sslcaCertFilePathList'].split()]
88        except KeyError:
89            sslCACertList = []
90           
91        thisSection = self.cfg['setUp']
92       
93        # Instantiate WS proxy
94        self.siteAClnt = AttributeAuthorityClient(uri=thisSection['uri'],
95                                sslPeerCertCN=thisSection.get('sslPeerCertCN'),
96                                sslCACertList=sslCACertList,
97                                cfgFileSection='wsse',
98                                cfg=self.cfgParser)           
99
100    def test01GetHostInfo(self):
101        """test01GetHostInfo: retrieve info for AA host"""
102        hostInfo = self.siteAClnt.getHostInfo()
103        print "Host Info:\n %s" % hostInfo       
104
105    def test02GetTrustedHostInfo(self):
106        """test02GetTrustedHostInfo: retrieve trusted host info matching a
107        given role"""
108        trustedHostInfo = self.siteAClnt.getTrustedHostInfo(\
109                                 self.cfg['test02GetTrustedHostInfo']['role'])
110        for hostname, hostInfo in trustedHostInfo.items():
111            self.assert_(hostname, "Hostname not set")
112            for k, v in hostInfo.items():
113                self.assert_(k, "hostInfo value key unset")
114
115        print "Trusted Host Info:\n %s" % trustedHostInfo
116
117    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self):
118        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case
119        where the input role doesn't match any roles in the target AA's map
120        config file"""
121        _cfg = self.cfg['test03GetTrustedHostInfoWithNoMatchingRoleFound']
122        try:
123            trustedHostInfo = self.siteAClnt.getTrustedHostInfo(_cfg['role'])
124            self.fail("Expecting NoMatchingRoleInTrustedHosts exception")
125           
126        except NoMatchingRoleInTrustedHosts, e:
127            print 'As expected - no match for role "%s": %s' % \
128                (_cfg['role'], e)
129
130
131    def test04GetTrustedHostInfoWithNoRole(self):
132        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info
133        irrespective of role"""
134        trustedHostInfo = self.siteAClnt.getTrustedHostInfo()
135        for hostname, hostInfo in trustedHostInfo.items():
136            self.assert_(hostname, "Hostname not set")
137            for k, v in hostInfo.items():
138                self.assert_(k, "hostInfo value key unset")
139                   
140        print "Trusted Host Info:\n %s" % trustedHostInfo
141       
142
143    def test05GetAllHostsInfo(self):
144        """test05GetAllHostsInfo: retrieve info for all hosts"""
145        allHostInfo = self.siteAClnt.getAllHostsInfo()
146        for hostname, hostInfo in allHostInfo.items():
147            self.assert_(hostname, "Hostname not set")
148            for k, v in hostInfo.items():
149                self.assert_(k, "hostInfo value key unset")
150                   
151        print "All Hosts Info:\n %s" % allHostInfo
152
153
154    def test06GetAttCert(self):       
155        """test06GetAttCert: Request attribute certificate from NDG Attribute
156        Authority Web Service."""
157        _cfg = self.cfg['test06GetAttCert']
158       
159        # Read user Certificate into a string ready for passing via WS
160        try:
161            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
162            userX509CertTxt = open(userX509CertFilePath, 'r').read()
163       
164        except TypeError:
165            # No issuing cert set
166            userX509CertTxt = None
167               
168        except IOError, ioErr:
169            raise Exception("Error reading certificate file \"%s\": %s" % \
170                                    (ioErr.filename, ioErr.strerror))
171
172        # Make attribute certificate request
173        attCert = self.siteAClnt.getAttCert(userX509Cert=userX509CertTxt)
174       
175        print "Attribute Certificate: \n\n:" + str(attCert)
176       
177        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
178        attCert.write()
179       
180       
181    def test07GetAttCertWithUserIdSet(self):       
182        """test07GetAttCertWithUserIdSet: Request attribute certificate from
183        NDG Attribute Authority Web Service setting a specific user Id
184        independent of the signer of the SOAP request."""
185        _cfg = self.cfg['test07GetAttCertWithUserIdSet']
186       
187        # Read user Certificate into a string ready for passing via WS
188        try:
189            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
190            userX509CertTxt = open(userX509CertFilePath, 'r').read()
191       
192        except TypeError:
193            # No issuing cert set
194            userX509CertTxt = None
195               
196        except IOError, ioErr:
197            raise Exception("Error reading certificate file \"%s\": %s" % \
198                                    (ioErr.filename, ioErr.strerror))
199
200        # Make attribute certificate request
201        userId = _cfg['userId']
202        attCert = self.siteAClnt.getAttCert(userId=userId,
203                                            userX509Cert=userX509CertTxt)
204       
205        print "Attribute Certificate: \n\n:" + str(attCert)
206       
207        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
208        attCert.write()
209
210
211    def test08GetMappedAttCert(self):       
212        """test08GetMappedAttCert: Request mapped attribute certificate from
213        NDG Attribute Authority Web Service."""
214        _cfg = self.cfg['test08GetMappedAttCert']
215       
216        # Read user Certificate into a string ready for passing via WS
217        try:
218            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
219            userX509CertTxt = open(userX509CertFilePath, 'r').read()
220       
221        except TypeError:
222            # No issuing cert set
223            userX509CertTxt = None
224               
225        except IOError, ioErr:
226            raise Exception("Error reading certificate file \"%s\": %s" % \
227                                    (ioErr.filename, ioErr.strerror))
228   
229        # Simlarly for Attribute Certificate
230        try:
231            userAttCert = AttCertRead(xpdVars(_cfg['userAttCertFilePath']))
232           
233        except IOError, ioErr:
234            raise Exception("Error reading attribute certificate file \"%s\": "
235                            "%s" % (ioErr.filename, ioErr.strerror))
236       
237        # Make client to site B Attribute Authority
238        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'], 
239                                       cfgFileSection='wsse',
240                                       cfg=self.cfgParser)
241   
242        # Make attribute certificate request
243        attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
244                                       userAttCert=userAttCert)
245        print "Attribute Certificate: \n\n:" + str(attCert)
246       
247        attCert.filePath = xpdVars(_cfg['mappedAttCertFilePath'])
248        attCert.write()
249       
250       
251    def test09GetMappedAttCertStressTest(self):       
252        """test09GetMappedAttCertStressTest: Request mapped attribute
253        certificate from NDG Attribute Authority Web Service."""
254        _cfg = self.cfg['test09GetMappedAttCertStressTest']
255       
256        # Read user Certificate into a string ready for passing via WS
257        try:
258            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
259            userX509CertTxt = open(userX509CertFilePath, 'r').read()
260       
261        except TypeError:
262            # No issuing cert set
263            userX509CertTxt = None
264               
265        except IOError, ioErr:
266            raise Exception("Error reading certificate file \"%s\": %s" % 
267                                    (ioErr.filename, ioErr.strerror))
268
269        # Make client to site B Attribute Authority
270        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'], 
271                                       cfgFileSection='wsse',
272                                       cfg=self.cfgParser)
273
274        acFilePathList = [xpdVars(file) for file in \
275                          _cfg['userAttCertFilePathList'].split()]
276
277        for acFilePath in acFilePathList:
278            try:
279                userAttCert = AttCertRead(acFilePath)
280               
281            except IOError, ioErr:
282                raise Exception("Error reading attribute certificate file "
283                                '"%s": %s' % (ioErr.filename, ioErr.strerror))
284       
285            # Make attribute certificate request
286            try:
287                attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
288                                               userAttCert=userAttCert)
289            except Exception, e:
290                outFilePfx = 'test09GetMappedAttCertStressTest-%s' % \
291                        os.path.basename(acFilePath)   
292                msgFile = open(outFilePfx+".msg", 'w')
293                msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
294             
295    def test10SAMLAttributeQuery(self):
296        _cfg = self.cfg['test10SAMLAttributeQuery']
297       
298        attributeQuery = AttributeQuery()
299        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
300        attributeQuery.id = str(uuid4())
301        attributeQuery.issueInstant = datetime.utcnow()
302       
303        attributeQuery.issuer = Issuer()
304        attributeQuery.issuer.format = "urn:esg:issuer"
305        attributeQuery.issuer.value = "Site A"   
306                       
307        attributeQuery.subject = Subject() 
308        attributeQuery.subject.nameID = NameID()
309        attributeQuery.subject.nameID.format = "urn:esg:openid"
310        attributeQuery.subject.nameID.value = \
311                                    "https://openid.localhost/philip.kershaw"
312       
313        fnAttribute = Attribute()
314        fnAttribute.name = "urn:esg:first:name"
315        fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
316        fnAttribute.friendlyName = "FirstName"
317
318        attributeQuery.attributes.append(fnAttribute)
319   
320        lnAttribute = Attribute()
321        lnAttribute.name = "urn:esg:last:name"
322        lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
323        lnAttribute.friendlyName = "LastName"
324
325        attributeQuery.attributes.append(lnAttribute)
326   
327        emailAddressAttribute = Attribute()
328        emailAddressAttribute.name = "urn:esg:email:address"
329        emailAddressAttribute.nameFormat = SAMLConstants.XSD_NS+"#"+\
330                                    XSStringAttributeValue.TYPE_LOCAL_NAME
331        emailAddressAttribute.friendlyName = "emailAddress"
332
333        attributeQuery.attributes.append(emailAddressAttribute)                                   
334
335        binding = SamlSoapBinding()
336        response = binding.attributeQuery(attributeQuery, _cfg['uri'])
337       
338        self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI)
339        print response
340       
341                                       
342if __name__ == "__main__":
343    unittest.main()
Note: See TracBrowser for help on using the repository browser.