source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_attributeauthorityclient.py @ 5703

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_attributeauthorityclient.py@5703
Revision 5703, 19.6 KB checked in by pjkersha, 11 years ago (diff)

Attribute Authority Client unit tests: added tests for invalid conditions for SAML Attribute Queries.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority SOAP client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "05/05/05, major update 16/01/07"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id:test_attributeauthorityclient.py 4372 2008-10-29 09:45:39Z pjkersha $'
12import logging
13logging.basicConfig()
14
15import unittest
16import os, sys, getpass, re
17   
18from os.path import expandvars as xpdVars
19from os.path import join as jnPath
20mkPath = lambda file: jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'], file)
21
22from datetime import datetime
23from uuid import uuid4
24from xml.etree import ElementTree
25
26from ndg.security.test.unit import BaseTestCase
27
28from ndg.security.common.utils.etree import prettyPrint
29
30from ndg.security.common.attributeauthority import AttributeAuthorityClient, \
31    NoMatchingRoleInTrustedHosts
32from ndg.security.common.AttCert import AttCertRead
33from ndg.security.common.X509 import X509CertParse, X509CertRead
34from ndg.security.common.utils.configfileparsers import \
35    CaseSensitiveConfigParser
36
37from saml.common.xml import SAMLConstants
38from saml.saml2.core import Response, Assertion, Attribute, AttributeValue, \
39    AttributeStatement, SAMLVersion, Subject, NameID, Issuer, AttributeQuery, \
40    XSStringAttributeValue, XSGroupRoleAttributeValue, Conditions, Status, \
41    StatusCode
42from saml.xml.etree import ResponseElementTree
43   
44from ndg.security.common.saml.bindings import SOAPBinding as SamlSoapBinding
45
46
47class AttributeAuthorityClientTestCase(BaseTestCase):
48    clntPriKeyPwd = None
49    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
50
51    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
52        '''Read proxy cert and user cert from a single PEM file and put in
53        a list ready for input into SignatureHandler'''               
54        proxyCertFileTxt = open(proxyCertFilePath).read()
55       
56        pemPatRE = re.compile(self.__class__.pemPat, re.S)
57        x509CertList = pemPatRE.findall(proxyCertFileTxt)
58       
59        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
60                            x509CertList]
61   
62        # Expecting proxy cert first - move this to the end.  This will
63        # be the cert used to verify the message signature
64        signingCertChain.reverse()
65       
66        return signingCertChain
67
68
69    def setUp(self):
70        super(AttributeAuthorityClientTestCase, self).setUp()
71       
72        if 'NDGSEC_INT_DEBUG' in os.environ:
73            import pdb
74            pdb.set_trace()
75       
76        if 'NDGSEC_AACLNT_UNITTEST_DIR' not in os.environ:
77            os.environ['NDGSEC_AACLNT_UNITTEST_DIR'] = \
78                os.path.abspath(os.path.dirname(__file__))
79
80        self.cfgParser = CaseSensitiveConfigParser()
81        cfgFilePath = jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'],
82                             'attAuthorityClientTest.cfg')
83        self.cfgParser.read(cfgFilePath)
84       
85        self.cfg = {}
86        for section in self.cfgParser.sections():
87            self.cfg[section] = dict(self.cfgParser.items(section))
88
89        try:
90            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
91                         self.cfg['setUp']['sslcaCertFilePathList'].split()]
92        except KeyError:
93            sslCACertList = []
94           
95        thisSection = self.cfg['setUp']
96       
97        # Instantiate WS proxy
98        self.siteAClnt = AttributeAuthorityClient(uri=thisSection['uri'],
99                                sslPeerCertCN=thisSection.get('sslPeerCertCN'),
100                                sslCACertList=sslCACertList,
101                                cfgFileSection='wsse',
102                                cfg=self.cfgParser)           
103
104    def test01GetHostInfo(self):
105        """test01GetHostInfo: retrieve info for AA host"""
106        hostInfo = self.siteAClnt.getHostInfo()
107        print "Host Info:\n %s" % hostInfo       
108
109    def test02GetTrustedHostInfo(self):
110        """test02GetTrustedHostInfo: retrieve trusted host info matching a
111        given role"""
112        trustedHostInfo = self.siteAClnt.getTrustedHostInfo(\
113                                 self.cfg['test02GetTrustedHostInfo']['role'])
114        for hostname, hostInfo in trustedHostInfo.items():
115            self.assert_(hostname, "Hostname not set")
116            for k, v in hostInfo.items():
117                self.assert_(k, "hostInfo value key unset")
118
119        print "Trusted Host Info:\n %s" % trustedHostInfo
120
121    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self):
122        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case
123        where the input role doesn't match any roles in the target AA's map
124        config file"""
125        _cfg = self.cfg['test03GetTrustedHostInfoWithNoMatchingRoleFound']
126        try:
127            trustedHostInfo = self.siteAClnt.getTrustedHostInfo(_cfg['role'])
128            self.fail("Expecting NoMatchingRoleInTrustedHosts exception")
129           
130        except NoMatchingRoleInTrustedHosts, e:
131            print 'As expected - no match for role "%s": %s' % \
132                (_cfg['role'], e)
133
134
135    def test04GetTrustedHostInfoWithNoRole(self):
136        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info
137        irrespective of role"""
138        trustedHostInfo = self.siteAClnt.getTrustedHostInfo()
139        for hostname, hostInfo in trustedHostInfo.items():
140            self.assert_(hostname, "Hostname not set")
141            for k, v in hostInfo.items():
142                self.assert_(k, "hostInfo value key unset")
143                   
144        print "Trusted Host Info:\n %s" % trustedHostInfo
145       
146
147    def test05GetAllHostsInfo(self):
148        """test05GetAllHostsInfo: retrieve info for all hosts"""
149        allHostInfo = self.siteAClnt.getAllHostsInfo()
150        for hostname, hostInfo in allHostInfo.items():
151            self.assert_(hostname, "Hostname not set")
152            for k, v in hostInfo.items():
153                self.assert_(k, "hostInfo value key unset")
154                   
155        print "All Hosts Info:\n %s" % allHostInfo
156
157
158    def test06GetAttCert(self):       
159        """test06GetAttCert: Request attribute certificate from NDG Attribute
160        Authority Web Service."""
161        _cfg = self.cfg['test06GetAttCert']
162       
163        # Read user Certificate into a string ready for passing via WS
164        try:
165            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
166            userX509CertTxt = open(userX509CertFilePath, 'r').read()
167       
168        except TypeError:
169            # No issuing cert set
170            userX509CertTxt = None
171               
172        except IOError, ioErr:
173            raise Exception("Error reading certificate file \"%s\": %s" % \
174                                    (ioErr.filename, ioErr.strerror))
175
176        # Make attribute certificate request
177        attCert = self.siteAClnt.getAttCert(userX509Cert=userX509CertTxt)
178       
179        print "Attribute Certificate: \n\n:" + str(attCert)
180       
181        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
182        attCert.write()
183       
184       
185    def test07GetAttCertWithUserIdSet(self):       
186        """test07GetAttCertWithUserIdSet: Request attribute certificate from
187        NDG Attribute Authority Web Service setting a specific user Id
188        independent of the signer of the SOAP request."""
189        _cfg = self.cfg['test07GetAttCertWithUserIdSet']
190       
191        # Read user Certificate into a string ready for passing via WS
192        try:
193            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
194            userX509CertTxt = open(userX509CertFilePath, 'r').read()
195       
196        except TypeError:
197            # No issuing cert set
198            userX509CertTxt = None
199               
200        except IOError, ioErr:
201            raise Exception("Error reading certificate file \"%s\": %s" % \
202                                    (ioErr.filename, ioErr.strerror))
203
204        # Make attribute certificate request
205        userId = _cfg['userId']
206        attCert = self.siteAClnt.getAttCert(userId=userId,
207                                            userX509Cert=userX509CertTxt)
208       
209        print "Attribute Certificate: \n\n:" + str(attCert)
210       
211        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
212        attCert.write()
213
214
215    def test08GetMappedAttCert(self):       
216        """test08GetMappedAttCert: Request mapped attribute certificate from
217        NDG Attribute Authority Web Service."""
218        _cfg = self.cfg['test08GetMappedAttCert']
219       
220        # Read user Certificate into a string ready for passing via WS
221        try:
222            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
223            userX509CertTxt = open(userX509CertFilePath, 'r').read()
224       
225        except TypeError:
226            # No issuing cert set
227            userX509CertTxt = None
228               
229        except IOError, ioErr:
230            raise Exception("Error reading certificate file \"%s\": %s" % \
231                                    (ioErr.filename, ioErr.strerror))
232   
233        # Simlarly for Attribute Certificate
234        try:
235            userAttCert = AttCertRead(xpdVars(_cfg['userAttCertFilePath']))
236           
237        except IOError, ioErr:
238            raise Exception("Error reading attribute certificate file \"%s\": "
239                            "%s" % (ioErr.filename, ioErr.strerror))
240       
241        # Make client to site B Attribute Authority
242        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'], 
243                                       cfgFileSection='wsse',
244                                       cfg=self.cfgParser)
245   
246        # Make attribute certificate request
247        attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
248                                       userAttCert=userAttCert)
249        print "Attribute Certificate: \n\n:" + str(attCert)
250       
251        attCert.filePath = xpdVars(_cfg['mappedAttCertFilePath'])
252        attCert.write()
253       
254       
255    def test09GetMappedAttCertStressTest(self):       
256        """test09GetMappedAttCertStressTest: Request mapped attribute
257        certificate from NDG Attribute Authority Web Service."""
258        _cfg = self.cfg['test09GetMappedAttCertStressTest']
259       
260        # Read user Certificate into a string ready for passing via WS
261        try:
262            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
263            userX509CertTxt = open(userX509CertFilePath, 'r').read()
264       
265        except TypeError:
266            # No issuing cert set
267            userX509CertTxt = None
268               
269        except IOError, ioErr:
270            raise Exception("Error reading certificate file \"%s\": %s" % 
271                                    (ioErr.filename, ioErr.strerror))
272
273        # Make client to site B Attribute Authority
274        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'], 
275                                       cfgFileSection='wsse',
276                                       cfg=self.cfgParser)
277
278        acFilePathList = [xpdVars(file) for file in \
279                          _cfg['userAttCertFilePathList'].split()]
280
281        for acFilePath in acFilePathList:
282            try:
283                userAttCert = AttCertRead(acFilePath)
284               
285            except IOError, ioErr:
286                raise Exception("Error reading attribute certificate file "
287                                '"%s": %s' % (ioErr.filename, ioErr.strerror))
288       
289            # Make attribute certificate request
290            try:
291                attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
292                                               userAttCert=userAttCert)
293            except Exception, e:
294                outFilePfx = 'test09GetMappedAttCertStressTest-%s' % \
295                        os.path.basename(acFilePath)   
296                msgFile = open(outFilePfx+".msg", 'w')
297                msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
298             
299    def test10SAMLAttributeQuery(self):
300        _cfg = self.cfg['test10SAMLAttributeQuery']
301       
302        attributeQuery = AttributeQuery()
303        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
304        attributeQuery.id = str(uuid4())
305        attributeQuery.issueInstant = datetime.utcnow()
306       
307        attributeQuery.issuer = Issuer()
308        attributeQuery.issuer.format = "urn:esg:issuer"
309        attributeQuery.issuer.value = "Site A"   
310                       
311        attributeQuery.subject = Subject() 
312        attributeQuery.subject.nameID = NameID()
313        attributeQuery.subject.nameID.format = "urn:esg:openid"
314        attributeQuery.subject.nameID.value = \
315                                    "https://openid.localhost/philip.kershaw"
316        xsStringNs = SAMLConstants.XSD_NS+"#"+\
317                                        XSStringAttributeValue.TYPE_LOCAL_NAME
318        fnAttribute = Attribute()
319        fnAttribute.name = "urn:esg:first:name"
320        fnAttribute.nameFormat = xsStringNs
321        fnAttribute.friendlyName = "FirstName"
322
323        attributeQuery.attributes.append(fnAttribute)
324   
325        lnAttribute = Attribute()
326        lnAttribute.name = "urn:esg:last:name"
327        lnAttribute.nameFormat = xsStringNs
328        lnAttribute.friendlyName = "LastName"
329
330        attributeQuery.attributes.append(lnAttribute)
331   
332        emailAddressAttribute = Attribute()
333        emailAddressAttribute.name = "urn:esg:email:address"
334        emailAddressAttribute.nameFormat = xsStringNs
335        emailAddressAttribute.friendlyName = "emailAddress"
336       
337        attributeQuery.attributes.append(emailAddressAttribute) 
338
339        siteAAttribute = Attribute()
340        siteAAttribute.name = "urn:siteA:security:authz:1.0:attr"
341        siteAAttribute.nameFormat = xsStringNs
342       
343        attributeQuery.attributes.append(siteAAttribute) 
344
345        binding = SamlSoapBinding()
346        response = binding.attributeQuery(attributeQuery, _cfg['uri'])
347       
348        self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI)
349       
350        # Check Query ID matches the query ID the service received
351        self.assert_(response.inResponseTo == attributeQuery.id)
352       
353        now = datetime.utcnow()
354        self.assert_(response.issueInstant < now)
355        self.assert_(response.assertions[-1].issueInstant < now)       
356        self.assert_(response.assertions[-1].conditions.notBefore < now) 
357        self.assert_(response.assertions[-1].conditions.notOnOrAfter > now)
358         
359        samlResponseElem = ResponseElementTree.toXML(response)
360       
361        print("SAML Response ...")
362        print(ElementTree.tostring(samlResponseElem))
363        print("Pretty print SAML Response ...")
364        print(prettyPrint(samlResponseElem))
365
366             
367    def test11SAMLAttributeQueryInvalidIssuer(self):
368        _cfg = self.cfg['test11SAMLAttributeQueryInvalidIssuer']
369       
370        attributeQuery = AttributeQuery()
371        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
372        attributeQuery.id = str(uuid4())
373        attributeQuery.issueInstant = datetime.utcnow()
374       
375        attributeQuery.issuer = Issuer()
376        attributeQuery.issuer.format = "urn:esg:issuer"
377        attributeQuery.issuer.value = "Invalid Site"   
378                       
379        attributeQuery.subject = Subject() 
380        attributeQuery.subject.nameID = NameID()
381        attributeQuery.subject.nameID.format = "urn:esg:openid"
382        attributeQuery.subject.nameID.value = \
383                                    "https://openid.localhost/philip.kershaw"
384        xsStringNs = SAMLConstants.XSD_NS+"#"+\
385                                        XSStringAttributeValue.TYPE_LOCAL_NAME
386
387        siteAAttribute = Attribute()
388        siteAAttribute.name = "urn:siteA:security:authz:1.0:attr"
389        siteAAttribute.nameFormat = xsStringNs
390       
391        attributeQuery.attributes.append(siteAAttribute) 
392
393        binding = SamlSoapBinding()
394        response = binding.attributeQuery(attributeQuery, _cfg['uri'])
395       
396        self.assert_(
397            response.status.statusCode.value==StatusCode.REQUEST_DENIED_URI)
398        samlResponseElem = ResponseElementTree.toXML(response)
399       
400        print("SAML Response ...")
401        print(ElementTree.tostring(samlResponseElem))
402        print("Pretty print SAML Response ...")
403        print(prettyPrint(samlResponseElem))
404                   
405    def test12SAMLAttributeQueryUnknownSubject(self):
406        _cfg = self.cfg['test12SAMLAttributeQueryUnknownSubject']
407       
408        attributeQuery = AttributeQuery()
409        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
410        attributeQuery.id = str(uuid4())
411        attributeQuery.issueInstant = datetime.utcnow()
412       
413        attributeQuery.issuer = Issuer()
414        attributeQuery.issuer.format = "urn:esg:issuer"
415        attributeQuery.issuer.value = "Site A"   
416                       
417        attributeQuery.subject = Subject() 
418        attributeQuery.subject.nameID = NameID()
419        attributeQuery.subject.nameID.format = "urn:esg:openid"
420        attributeQuery.subject.nameID.value = \
421                                    "https://openid.localhost/unknown"
422        xsStringNs = SAMLConstants.XSD_NS+"#"+\
423                                        XSStringAttributeValue.TYPE_LOCAL_NAME
424
425        siteAAttribute = Attribute()
426        siteAAttribute.name = "urn:siteA:security:authz:1.0:attr"
427        siteAAttribute.nameFormat = xsStringNs
428       
429        attributeQuery.attributes.append(siteAAttribute) 
430
431        binding = SamlSoapBinding()
432        response = binding.attributeQuery(attributeQuery, _cfg['uri'])
433       
434        self.assert_(
435            response.status.statusCode.value==StatusCode.UNKNOWN_PRINCIPAL_URI)
436        samlResponseElem = ResponseElementTree.toXML(response)
437       
438        print("SAML Response ...")
439        print(ElementTree.tostring(samlResponseElem))
440        print("Pretty print SAML Response ...")
441        print(prettyPrint(samlResponseElem))
442             
443    def test13SAMLAttributeQueryInvalidAttrName(self):
444        _cfg = self.cfg['test13SAMLAttributeQueryInvalidAttrName']
445       
446        attributeQuery = AttributeQuery()
447        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
448        attributeQuery.id = str(uuid4())
449        attributeQuery.issueInstant = datetime.utcnow()
450       
451        attributeQuery.issuer = Issuer()
452        attributeQuery.issuer.format = "urn:esg:issuer"
453        attributeQuery.issuer.value = "Site A"   
454                       
455        attributeQuery.subject = Subject() 
456        attributeQuery.subject.nameID = NameID()
457        attributeQuery.subject.nameID.format = "urn:esg:openid"
458        attributeQuery.subject.nameID.value = \
459                                    "https://openid.localhost/philip.kershaw"
460        xsStringNs = SAMLConstants.XSD_NS+"#"+\
461                                        XSStringAttributeValue.TYPE_LOCAL_NAME
462
463        invalidAttribute = Attribute()
464        invalidAttribute.name = "myInvalidAttributeName"
465        invalidAttribute.nameFormat = xsStringNs
466       
467        attributeQuery.attributes.append(invalidAttribute) 
468
469        binding = SamlSoapBinding()
470        response = binding.attributeQuery(attributeQuery, _cfg['uri'])
471       
472        self.assert_(response.status.statusCode.value==\
473                     StatusCode.INVALID_ATTR_NAME_VALUE_URI)
474        samlResponseElem = ResponseElementTree.toXML(response)
475       
476        print("SAML Response ...")
477        print(ElementTree.tostring(samlResponseElem))
478        print("Pretty print SAML Response ...")
479        print(prettyPrint(samlResponseElem))
480       
481                                                       
482if __name__ == "__main__":
483    unittest.main()
Note: See TracBrowser for help on using the repository browser.