source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_attributeauthorityclient.py @ 5741

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_attributeauthorityclient.py@5741
Revision 5741, 19.7 KB checked in by pjkersha, 11 years ago (diff)

Added debug logging of SOAP request for ndg.security.common.soap.client.UrlLib2SOAPClient

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
#!/usr/bin/env python
"""NDG Attribute Authority SOAP client unit tests

NERC Data Grid Project
"""
__author__ = "P J Kershaw"
__date__ = "05/05/05, major update 16/01/07"
__copyright__ = "(C) 2009 Science and Technology Facilities Council"
__license__ = "BSD - see LICENSE file in top-level directory"
__contact__ = "Philip.Kershaw@stfc.ac.uk"
__revision__ = '$Id:test_attributeauthorityclient.py 4372 2008-10-29 09:45:39Z pjkersha $'
import logging
# Configure the root logger at import time so that debug output from the
# code under test is emitted while the tests run
logging.basicConfig(level=logging.DEBUG)

import unittest
import os
import sys
import getpass
import re

# Short aliases used throughout this module
from os.path import expandvars as xpdVars
from os.path import join as jnPath
def mkPath(file):
    """Return file joined onto the unit test directory path

    @param file: file name or relative path to append
    @return: path formed by joining the value of the
    NDGSEC_AACLNT_UNITTEST_DIR environment variable with file
    @raise KeyError: if NDGSEC_AACLNT_UNITTEST_DIR is not set in the
    environment
    """
    return jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'], file)

from datetime import datetime
from uuid import uuid4
from xml.etree import ElementTree

from ndg.security.test.unit import BaseTestCase

from ndg.security.common.utils.etree import prettyPrint

from ndg.security.common.attributeauthority import AttributeAuthorityClient, \
    NoMatchingRoleInTrustedHosts
from ndg.security.common.AttCert import AttCertRead
from ndg.security.common.X509 import X509CertParse, X509CertRead
from ndg.security.common.utils.configfileparsers import \
    CaseSensitiveConfigParser

from saml.common.xml import SAMLConstants
from saml.saml2.core import Response, Assertion, Attribute, AttributeValue, \
    AttributeStatement, SAMLVersion, Subject, NameID, Issuer, AttributeQuery, \
    XSStringAttributeValue, XSGroupRoleAttributeValue, Conditions, Status, \
    StatusCode
from saml.xml.etree import ResponseElementTree

from ndg.security.common.saml.bindings import SOAPBinding as SamlSoapBinding

class AttributeAuthorityClientBaseTestCase(BaseTestCase):
    """Base class for the Attribute Authority client tests: loads the test
    configuration file and the SSL CA certificates it lists.

    Attributes set by __init__:
     - cfgParser: CaseSensitiveConfigParser which has read
       'attAuthorityClientTest.cfg'
     - cfg: dict keyed by config file section name, each value a dict of
       that section's options
     - sslCACertList: list of certificates read from the files named in the
       'sslcaCertFilePathList' option of the 'setUp' section, empty if the
       section or option is absent
    """

    def __init__(self, *arg, **kw):
        """Read the test config file from the directory named by the
        NDGSEC_AACLNT_UNITTEST_DIR environment variable.  If the variable
        is not set, it is set to the directory containing this module.
        """
        super(AttributeAuthorityClientBaseTestCase, self).__init__(*arg, **kw)

        if 'NDGSEC_AACLNT_UNITTEST_DIR' not in os.environ:
            os.environ['NDGSEC_AACLNT_UNITTEST_DIR'] = \
                os.path.abspath(os.path.dirname(__file__))

        self.cfgParser = CaseSensitiveConfigParser()
        cfgFilePath = jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'],
                             'attAuthorityClientTest.cfg')
        self.cfgParser.read(cfgFilePath)

        self.cfg = {}
        for section in self.cfgParser.sections():
            self.cfg[section] = dict(self.cfgParser.items(section))

        # A missing 'setUp' section or 'sslcaCertFilePathList' option gives
        # an empty list.  The lookup is kept separate from the certificate
        # parsing so that an error raised while reading a certificate is not
        # mistaken for a missing option and silently discarded.
        setUpCfg = self.cfg.get('setUp', {})
        caCertFilePathList = setUpCfg.get('sslcaCertFilePathList', '').split()
        self.sslCACertList = [X509CertRead(xpdVars(caFile))
                              for caFile in caCertFilePathList]


class AttributeAuthorityClientTestCase(AttributeAuthorityClientBaseTestCase):
    """Unit tests for the Attribute Authority SOAP client.

    setUp reads the 'setUp' section of the test config file; each test reads
    the section named after the test method.
    """
    clntPriKeyPwd = None

    # Raw string: the regular expression text is unchanged but the "\-" is no
    # longer treated as a (invalid) string escape sequence
    pemPat = r"-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"

    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
        '''Read proxy cert and user cert from a single PEM file and put in
        a list ready for input into SignatureHandler

        @param proxyCertFilePath: path to PEM file holding the certificates
        @return: list of parsed certificates in the reverse of the order in
        which they appear in the file
        '''
        with open(proxyCertFilePath) as proxyCertFile:
            proxyCertFileTxt = proxyCertFile.read()

        pemPatRE = re.compile(self.__class__.pemPat, re.S)
        x509CertList = pemPatRE.findall(proxyCertFileTxt)

        signingCertChain = [X509CertParse(x509Cert)
                            for x509Cert in x509CertList]

        # Expecting proxy cert first - move this to the end.  This will
        # be the cert used to verify the message signature
        signingCertChain.reverse()

        return signingCertChain

    def _readIssuingClntCertTxt(self, cfgSection):
        """Read the user certificate ready for passing as a string via the
        web service

        @param cfgSection: dict of config options for the calling test
        @return: content of the file named by the 'issuingClntCertFilePath'
        option or None if the option is not set
        @raise Exception: if the file can't be read
        """
        certFilePath = cfgSection.get('issuingClntCertFilePath')
        if certFilePath is None:
            # No issuing cert set
            return None

        try:
            with open(xpdVars(certFilePath), 'r') as certFile:
                return certFile.read()

        except IOError as ioErr:
            raise Exception("Error reading certificate file \"%s\": %s" %
                            (ioErr.filename, ioErr.strerror))

    def _checkHostsInfo(self, hostsInfo):
        """Assert that every hostname key and every key of each host's info
        is set

        @param hostsInfo: mapping of hostname to a mapping of host
        information, as returned from the client's host info calls
        """
        for hostname, hostInfo in hostsInfo.items():
            self.assertTrue(hostname, "Hostname not set")
            for k, _v in hostInfo.items():
                self.assertTrue(k, "hostInfo value key unset")

    def setUp(self):
        """Create the client to the site A Attribute Authority from the
        'setUp' config section.  Setting the NDGSEC_INT_DEBUG environment
        variable drops into the debugger first.
        """
        super(AttributeAuthorityClientTestCase, self).setUp()

        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        thisSection = self.cfg['setUp']

        # Instantiate WS proxy
        self.siteAClnt = AttributeAuthorityClient(uri=thisSection['uri'],
                                sslPeerCertCN=thisSection.get('sslPeerCertCN'),
                                sslCACertList=self.sslCACertList,
                                cfgFileSection='wsse',
                                cfg=self.cfgParser)

    def test01GetHostInfo(self):
        """test01GetHostInfo: retrieve info for AA host"""
        hostInfo = self.siteAClnt.getHostInfo()
        print("Host Info:\n %s" % hostInfo)

    def test02GetTrustedHostInfo(self):
        """test02GetTrustedHostInfo: retrieve trusted host info matching a
        given role"""
        trustedHostInfo = self.siteAClnt.getTrustedHostInfo(
                                 self.cfg['test02GetTrustedHostInfo']['role'])
        self._checkHostsInfo(trustedHostInfo)

        print("Trusted Host Info:\n %s" % trustedHostInfo)

    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self):
        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case
        where the input role doesn't match any roles in the target AA's map
        config file"""
        _cfg = self.cfg['test03GetTrustedHostInfoWithNoMatchingRoleFound']
        try:
            self.siteAClnt.getTrustedHostInfo(_cfg['role'])

        except NoMatchingRoleInTrustedHosts as e:
            print('As expected - no match for role "%s": %s' %
                  (_cfg['role'], e))
        else:
            self.fail("Expecting NoMatchingRoleInTrustedHosts exception")

    def test04GetTrustedHostInfoWithNoRole(self):
        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info
        irrespective of role"""
        trustedHostInfo = self.siteAClnt.getTrustedHostInfo()
        self._checkHostsInfo(trustedHostInfo)

        print("Trusted Host Info:\n %s" % trustedHostInfo)

    def test05GetAllHostsInfo(self):
        """test05GetAllHostsInfo: retrieve info for all hosts"""
        allHostInfo = self.siteAClnt.getAllHostsInfo()
        self._checkHostsInfo(allHostInfo)

        print("All Hosts Info:\n %s" % allHostInfo)

    def test06GetAttCert(self):
        """test06GetAttCert: Request attribute certificate from NDG Attribute
        Authority Web Service."""
        _cfg = self.cfg['test06GetAttCert']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readIssuingClntCertTxt(_cfg)

        # Make attribute certificate request
        attCert = self.siteAClnt.getAttCert(userX509Cert=userX509CertTxt)

        print("Attribute Certificate: \n\n:" + str(attCert))

        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
        attCert.write()

    def test07GetAttCertWithUserIdSet(self):
        """test07GetAttCertWithUserIdSet: Request attribute certificate from
        NDG Attribute Authority Web Service setting a specific user Id
        independent of the signer of the SOAP request."""
        _cfg = self.cfg['test07GetAttCertWithUserIdSet']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readIssuingClntCertTxt(_cfg)

        # Make attribute certificate request
        userId = _cfg['userId']
        attCert = self.siteAClnt.getAttCert(userId=userId,
                                            userX509Cert=userX509CertTxt)

        print("Attribute Certificate: \n\n:" + str(attCert))

        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
        attCert.write()

    def test08GetMappedAttCert(self):
        """test08GetMappedAttCert: Request mapped attribute certificate from
        NDG Attribute Authority Web Service."""
        _cfg = self.cfg['test08GetMappedAttCert']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readIssuingClntCertTxt(_cfg)

        # Similarly for Attribute Certificate
        try:
            userAttCert = AttCertRead(xpdVars(_cfg['userAttCertFilePath']))

        except IOError as ioErr:
            raise Exception("Error reading attribute certificate file \"%s\": "
                            "%s" % (ioErr.filename, ioErr.strerror))

        # Make client to site B Attribute Authority
        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'],
                                       cfgFileSection='wsse',
                                       cfg=self.cfgParser)

        # Make attribute certificate request
        attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
                                       userAttCert=userAttCert)
        print("Attribute Certificate: \n\n:" + str(attCert))

        attCert.filePath = xpdVars(_cfg['mappedAttCertFilePath'])
        attCert.write()

    def test09GetMappedAttCertStressTest(self):
        """test09GetMappedAttCertStressTest: Request mapped attribute
        certificate from NDG Attribute Authority Web Service for each of the
        attribute certificate files listed in the 'userAttCertFilePathList'
        config option.  A failed request does not fail the test: the error
        is written to a ".msg" file in the current directory instead."""
        _cfg = self.cfg['test09GetMappedAttCertStressTest']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readIssuingClntCertTxt(_cfg)

        # Make client to site B Attribute Authority
        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'],
                                       cfgFileSection='wsse',
                                       cfg=self.cfgParser)

        acFilePathList = [xpdVars(filePath) for filePath in
                          _cfg['userAttCertFilePathList'].split()]

        for acFilePath in acFilePathList:
            try:
                userAttCert = AttCertRead(acFilePath)

            except IOError as ioErr:
                raise Exception("Error reading attribute certificate file "
                                '"%s": %s' % (ioErr.filename, ioErr.strerror))

            # Make attribute certificate request
            try:
                siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
                                     userAttCert=userAttCert)
            except Exception as e:
                # Record the failure and carry on with the remaining files
                outFilePfx = 'test09GetMappedAttCertStressTest-%s' % \
                        os.path.basename(acFilePath)
                with open(outFilePfx+".msg", 'w') as msgFile:
                    msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))


class AttributeAuthoritySAMLInterfaceTestCase(
                                        AttributeAuthorityClientBaseTestCase):
    """Separate class for Attribute Authority SAML Attribute Query interface

    Each test reads the 'subject' and 'uri' options, and where used the
    'siteAttributeName' option, from the config file section named after the
    test method.
    """

    def _createAttributeQuery(self, subject, issuerName="Site A"):
        """Make a SAML 2.0 attribute query with a new ID, the current UTC
        time as issue instant and no attributes set

        @param subject: value for the subject name ID
        @param issuerName: value for the query issuer
        @return: new AttributeQuery instance
        """
        attributeQuery = AttributeQuery()
        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
        attributeQuery.id = str(uuid4())
        attributeQuery.issueInstant = datetime.utcnow()

        attributeQuery.issuer = Issuer()
        attributeQuery.issuer.format = "urn:esg:issuer"
        attributeQuery.issuer.value = issuerName

        attributeQuery.subject = Subject()
        attributeQuery.subject.nameID = NameID()
        attributeQuery.subject.nameID.format = "urn:esg:openid"
        attributeQuery.subject.nameID.value = subject

        return attributeQuery

    def _createStringAttribute(self, name, friendlyName=None):
        """Make an attribute to add to a query

        @param name: attribute name
        @param friendlyName: friendly name for the attribute; left unset if
        None
        @return: new Attribute instance with the name format set from the XSD
        namespace and the string attribute value type name
        """
        attribute = Attribute()
        attribute.name = name
        attribute.nameFormat = SAMLConstants.XSD_NS+"#"+\
                                        XSStringAttributeValue.TYPE_LOCAL_NAME
        if friendlyName is not None:
            attribute.friendlyName = friendlyName

        return attribute

    def _printResponse(self, response):
        """Print response serialised as XML, first as is and then pretty
        printed

        @param response: response returned from the attribute query
        """
        samlResponseElem = ResponseElementTree.toXML(response)

        print("SAML Response ...")
        print(ElementTree.tostring(samlResponseElem))
        print("Pretty print SAML Response ...")
        print(prettyPrint(samlResponseElem))

    def test01SAMLAttributeQuery(self):
        """test01SAMLAttributeQuery: query for first name, last name, e-mail
        address and site attributes and check for a success response"""
        _cfg = self.cfg['test01SAMLAttributeQuery']

        attributeQuery = self._createAttributeQuery(_cfg['subject'])

        attributeQuery.attributes.append(
            self._createStringAttribute("urn:esg:first:name", "FirstName"))
        attributeQuery.attributes.append(
            self._createStringAttribute("urn:esg:last:name", "LastName"))
        attributeQuery.attributes.append(
            self._createStringAttribute("urn:esg:email:address",
                                        "emailAddress"))
        attributeQuery.attributes.append(
            self._createStringAttribute(_cfg['siteAttributeName']))

        binding = SamlSoapBinding()
        response = binding.attributeQuery(attributeQuery, _cfg['uri'])

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)

        # Check Query ID matches the query ID the service received
        self.assertEqual(response.inResponseTo, attributeQuery.id)

        now = datetime.utcnow()
        self.assertTrue(response.issueInstant < now)
        self.assertTrue(response.assertions[-1].issueInstant < now)
        self.assertTrue(response.assertions[-1].conditions.notBefore < now)
        self.assertTrue(response.assertions[-1].conditions.notOnOrAfter > now)

        self._printResponse(response)

    def test02SAMLAttributeQueryInvalidIssuer(self):
        """test02SAMLAttributeQueryInvalidIssuer: query with issuer set to
        "Invalid Site" and check for a request denied response"""
        _cfg = self.cfg['test02SAMLAttributeQueryInvalidIssuer']

        attributeQuery = self._createAttributeQuery(_cfg['subject'],
                                                    issuerName="Invalid Site")
        attributeQuery.attributes.append(
            self._createStringAttribute(_cfg['siteAttributeName']))

        binding = SamlSoapBinding()
        response = binding.attributeQuery(attributeQuery, _cfg['uri'])

        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.REQUEST_DENIED_URI)

    def test03SAMLAttributeQueryUnknownSubject(self):
        """test03SAMLAttributeQueryUnknownSubject: query with the subject
        from this test's config section and check for an unknown principal
        response"""
        _cfg = self.cfg['test03SAMLAttributeQueryUnknownSubject']

        attributeQuery = self._createAttributeQuery(_cfg['subject'])
        attributeQuery.attributes.append(
            self._createStringAttribute(_cfg['siteAttributeName']))

        binding = SamlSoapBinding()
        response = binding.attributeQuery(attributeQuery, _cfg['uri'])

        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.UNKNOWN_PRINCIPAL_URI)

    def test04SAMLAttributeQueryInvalidAttrName(self):
        """test04SAMLAttributeQueryInvalidAttrName: query for the attribute
        "myInvalidAttributeName" and check for an invalid attribute name or
        value response"""
        _cfg = self.cfg['test04SAMLAttributeQueryInvalidAttrName']

        attributeQuery = self._createAttributeQuery(_cfg['subject'])
        attributeQuery.attributes.append(
            self._createStringAttribute("myInvalidAttributeName"))

        binding = SamlSoapBinding()
        response = binding.attributeQuery(attributeQuery, _cfg['uri'])

        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.INVALID_ATTR_NAME_VALUE_URI)


# Run all the test cases in this module when it is executed as a script
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.