source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_attributeauthorityclient.py @ 5907

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_attributeauthorityclient.py@5907
Revision 5907, 19.8 KB checked in by pjkersha, 10 years ago (diff)

Move MyProxy certificate callout application into ndg.security.server and ndg.security.test. Too much trouble as a separate egg.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority SOAP client unit tests
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "05/05/05, major update 16/01/07"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id:test_attributeauthorityclient.py 4372 2008-10-29 09:45:39Z pjkersha $'
12import logging
13logging.basicConfig(level=logging.DEBUG)
14
15import unittest
16import os, sys, getpass, re
17   
18from os.path import expandvars as xpdVars
19from os.path import join as jnPath
def mkPath(file):
    """Return the path of ``file`` inside the unit test directory.

    The directory is taken from the NDGSEC_AACLNT_UNITTEST_DIR environment
    variable each time the function is called; a KeyError is raised if the
    variable is not set.

    @param file: file name (or relative path) to append to the directory.
    The parameter name is kept for backwards compatibility with keyword
    callers even though it shadows the Python 2 ``file`` builtin.
    @return: the joined path
    """
    return jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'], file)
21
22from datetime import datetime
23from uuid import uuid4
24from xml.etree import ElementTree
25
26from ndg.security.test.unit import BaseTestCase, mkDataDirPath
27
28from ndg.security.common.utils.etree import prettyPrint
29
30from ndg.security.common.attributeauthority import AttributeAuthorityClient, \
31    NoMatchingRoleInTrustedHosts
32from ndg.security.common.AttCert import AttCertRead
33from ndg.security.common.X509 import X509CertParse, X509CertRead
34from ndg.security.common.utils.configfileparsers import \
35    CaseSensitiveConfigParser
36
37from saml.common.xml import SAMLConstants
38from saml.saml2.core import Response, Assertion, Attribute, AttributeValue, \
39    AttributeStatement, SAMLVersion, Subject, NameID, Issuer, AttributeQuery, \
40    XSStringAttributeValue, XSGroupRoleAttributeValue, Conditions, Status, \
41    StatusCode
42from saml.xml.etree import ResponseElementTree
43   
44from ndg.security.common.saml.bindings import SOAPBinding as SamlSoapBinding
45
46
class AttributeAuthorityClientBaseTestCase(BaseTestCase):
    """Common fixture for the Attribute Authority client test cases.

    On construction it reads attAuthorityClientTest.cfg from the directory
    named by the NDGSEC_AACLNT_UNITTEST_DIR environment variable, exposes
    the settings as self.cfgParser / self.cfg, loads the optional list of
    SSL CA certificates into self.sslCACertList and finally calls
    self.startAttributeAuthorities() (not defined in this class -
    presumably inherited from BaseTestCase).
    """

    def __init__(self, *arg, **kw):
        super(AttributeAuthorityClientBaseTestCase, self).__init__(*arg, **kw)

        # Fall back to the directory holding this module when the caller
        # has not pointed the tests at a specific directory
        os.environ.setdefault('NDGSEC_AACLNT_UNITTEST_DIR',
                              os.path.abspath(os.path.dirname(__file__)))
        testDir = os.environ['NDGSEC_AACLNT_UNITTEST_DIR']

        self.cfgParser = CaseSensitiveConfigParser()
        self.cfgParser.read(jnPath(testDir, 'attAuthorityClientTest.cfg'))

        # Mirror the parsed file as a dictionary of per-section dictionaries
        self.cfg = dict((name, dict(self.cfgParser.items(name)))
                        for name in self.cfgParser.sections())

        # The CA certificate list is optional: a missing section or option
        # yields an empty list
        try:
            caFilePaths = self.cfg['setUp']['sslcaCertFilePathList'].split()
            caCerts = [X509CertRead(xpdVars(path)) for path in caFilePaths]
        except KeyError:
            caCerts = []
        self.sslCACertList = caCerts

        self.startAttributeAuthorities()
71     
72     
class AttributeAuthorityClientTestCase(AttributeAuthorityClientBaseTestCase):
    """Tests for the NDG Attribute Authority SOAP client.

    Each test reads its settings from the section of the test configuration
    file that carries the same name as the test method.
    """
    clntPriKeyPwd = None

    # Raw string so that the backslash reaches the regular expression engine
    # without relying on Python passing an unknown escape through unchanged
    pemPat = r"-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"

    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
        '''Read proxy cert and user cert from a single PEM file and put in
        a list ready for input into SignatureHandler

        @param proxyCertFilePath: path to the PEM file holding the chain
        @return: list of parsed certificates in the reverse of file order
        '''
        # Context manager closes the file even if the read fails
        with open(proxyCertFilePath) as proxyCertFile:
            proxyCertFileTxt = proxyCertFile.read()

        pemPatRE = re.compile(self.__class__.pemPat, re.S)
        signingCertChain = [X509CertParse(x509Cert)
                            for x509Cert in pemPatRE.findall(proxyCertFileTxt)]

        # Expecting proxy cert first - move this to the end.  This will
        # be the cert used to verify the message signature
        signingCertChain.reverse()

        return signingCertChain

    def _readUserX509Cert(self, cfg):
        """Read the issuing client certificate named in a test's settings

        @param cfg: dictionary of settings for the calling test
        @return: content of the file named by the 'issuingClntCertFilePath'
        option, or None if the option is not set
        @raise Exception: if the file cannot be read
        """
        certFilePath = cfg.get('issuingClntCertFilePath')
        if certFilePath is None:
            # No issuing cert set
            return None

        try:
            with open(xpdVars(certFilePath), 'r') as certFile:
                return certFile.read()

        except IOError as ioErr:
            raise Exception("Error reading certificate file \"%s\": %s" %
                            (ioErr.filename, ioErr.strerror))

    def _readAttCert(self, attCertFilePath):
        """Read an Attribute Certificate from file

        @param attCertFilePath: path to the Attribute Certificate file
        @return: whatever AttCertRead returns for the file
        @raise Exception: if the file cannot be read
        """
        try:
            return AttCertRead(attCertFilePath)

        except IOError as ioErr:
            raise Exception("Error reading attribute certificate file "
                            "\"%s\": %s" % (ioErr.filename, ioErr.strerror))

    def _checkHostsInfo(self, hostsInfo):
        """Check that every host name and every host info key is set

        @param hostsInfo: dictionary of host info dictionaries keyed by host
        name
        """
        for hostname, hostInfo in hostsInfo.items():
            self.assertTrue(hostname, "Hostname not set")
            for k, _ in hostInfo.items():
                self.assertTrue(k, "hostInfo value key unset")

    def _printAndWriteAttCert(self, attCert, filePath):
        """Print an Attribute Certificate and write it to the given path

        @param attCert: Attribute Certificate returned by the client
        @param filePath: output file path; environment variables in it are
        expanded
        """
        print("Attribute Certificate: \n\n:" + str(attCert))

        attCert.filePath = xpdVars(filePath)
        attCert.write()

    def setUp(self):
        """Create the client to the site A Attribute Authority using the
        'setUp' section of the configuration file"""
        super(AttributeAuthorityClientTestCase, self).setUp()

        # Optional hook to drop into the debugger before each test
        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        thisSection = self.cfg['setUp']

        # Instantiate WS proxy
        self.siteAClnt = AttributeAuthorityClient(
                                uri=thisSection['uri'],
                                sslPeerCertCN=thisSection.get('sslPeerCertCN'),
                                sslCACertList=self.sslCACertList,
                                cfgFileSection='wsse',
                                cfg=self.cfgParser)

    def test01GetHostInfo(self):
        """test01GetHostInfo: retrieve info for AA host"""
        hostInfo = self.siteAClnt.getHostInfo()
        print("Host Info:\n %s" % hostInfo)

    def test02GetTrustedHostInfo(self):
        """test02GetTrustedHostInfo: retrieve trusted host info matching a
        given role"""
        role = self.cfg['test02GetTrustedHostInfo']['role']
        trustedHostInfo = self.siteAClnt.getTrustedHostInfo(role)
        self._checkHostsInfo(trustedHostInfo)

        print("Trusted Host Info:\n %s" % trustedHostInfo)

    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self):
        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case
        where the input role doesn't match any roles in the target AA's map
        config file"""
        _cfg = self.cfg['test03GetTrustedHostInfoWithNoMatchingRoleFound']
        try:
            self.siteAClnt.getTrustedHostInfo(_cfg['role'])

        except NoMatchingRoleInTrustedHosts as e:
            print('As expected - no match for role "%s": %s' %
                  (_cfg['role'], e))
        else:
            self.fail("Expecting NoMatchingRoleInTrustedHosts exception")

    def test04GetTrustedHostInfoWithNoRole(self):
        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info
        irrespective of role"""
        trustedHostInfo = self.siteAClnt.getTrustedHostInfo()
        self._checkHostsInfo(trustedHostInfo)

        print("Trusted Host Info:\n %s" % trustedHostInfo)

    def test05GetAllHostsInfo(self):
        """test05GetAllHostsInfo: retrieve info for all hosts"""
        allHostInfo = self.siteAClnt.getAllHostsInfo()
        self._checkHostsInfo(allHostInfo)

        print("All Hosts Info:\n %s" % allHostInfo)

    def test06GetAttCert(self):
        """test06GetAttCert: Request attribute certificate from NDG Attribute
        Authority Web Service."""
        _cfg = self.cfg['test06GetAttCert']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readUserX509Cert(_cfg)

        # Make attribute certificate request
        attCert = self.siteAClnt.getAttCert(userX509Cert=userX509CertTxt)

        self._printAndWriteAttCert(attCert, _cfg['attCertFilePath'])

    def test07GetAttCertWithUserIdSet(self):
        """test07GetAttCertWithUserIdSet: Request attribute certificate from
        NDG Attribute Authority Web Service setting a specific user Id
        independent of the signer of the SOAP request."""
        _cfg = self.cfg['test07GetAttCertWithUserIdSet']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readUserX509Cert(_cfg)

        # Make attribute certificate request
        userId = _cfg['userId']
        attCert = self.siteAClnt.getAttCert(userId=userId,
                                            userX509Cert=userX509CertTxt)

        self._printAndWriteAttCert(attCert, _cfg['attCertFilePath'])

    def test08GetMappedAttCert(self):
        """test08GetMappedAttCert: Request mapped attribute certificate from
        NDG Attribute Authority Web Service."""
        _cfg = self.cfg['test08GetMappedAttCert']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readUserX509Cert(_cfg)

        # Similarly for Attribute Certificate
        userAttCert = self._readAttCert(xpdVars(_cfg['userAttCertFilePath']))

        # Make client to site B Attribute Authority
        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'],
                                             cfgFileSection='wsse',
                                             cfg=self.cfgParser)

        # Make attribute certificate request
        attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
                                       userAttCert=userAttCert)

        self._printAndWriteAttCert(attCert, _cfg['mappedAttCertFilePath'])

    def test09GetMappedAttCertStressTest(self):
        """test09GetMappedAttCertStressTest: Request mapped attribute
        certificate from NDG Attribute Authority Web Service."""
        _cfg = self.cfg['test09GetMappedAttCertStressTest']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readUserX509Cert(_cfg)

        # Make client to site B Attribute Authority
        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'],
                                             cfgFileSection='wsse',
                                             cfg=self.cfgParser)

        acFilePathList = [xpdVars(acFile) for acFile in
                          _cfg['userAttCertFilePathList'].split()]

        for acFilePath in acFilePathList:
            userAttCert = self._readAttCert(acFilePath)

            # Make attribute certificate request.  A failed request is
            # recorded in a .msg file in the current directory and does not
            # fail the test, so that every input certificate gets tried
            try:
                siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
                                     userAttCert=userAttCert)
            except Exception as e:
                outFilePfx = 'test09GetMappedAttCertStressTest-%s' % \
                        os.path.basename(acFilePath)
                with open(outFilePfx + ".msg", 'w') as msgFile:
                    msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
304
305   
class AttributeAuthoritySAMLInterfaceTestCase(
                                        AttributeAuthorityClientBaseTestCase):
    """Separate class for Attribute Authority SAML Attribute Query interface

    Each test reads its settings from the section of the test configuration
    file that carries the same name as the test method.
    """

    def _createAttributeQuery(self, issuerName, subjectName):
        """Make a SAML 2.0 attribute query with a new ID, the current UTC
        time as issue instant and no attributes

        @param issuerName: value for the query issuer
        @param subjectName: value for the subject name ID
        @return: the new AttributeQuery
        """
        attributeQuery = AttributeQuery()
        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
        attributeQuery.id = str(uuid4())
        attributeQuery.issueInstant = datetime.utcnow()

        attributeQuery.issuer = Issuer()
        attributeQuery.issuer.format = "urn:esg:issuer"
        attributeQuery.issuer.value = issuerName

        attributeQuery.subject = Subject()
        attributeQuery.subject.nameID = NameID()
        attributeQuery.subject.nameID.format = "urn:esg:openid"
        attributeQuery.subject.nameID.value = subjectName

        return attributeQuery

    def _createAttribute(self, name, friendlyName=None):
        """Make a query attribute with the XSD string type as name format

        @param name: attribute name
        @param friendlyName: friendly name; left unset if None
        @return: the new Attribute
        """
        attribute = Attribute()
        attribute.name = name
        attribute.nameFormat = SAMLConstants.XSD_NS + "#" + \
                                        XSStringAttributeValue.TYPE_LOCAL_NAME
        if friendlyName is not None:
            attribute.friendlyName = friendlyName

        return attribute

    def _sendAttributeQuery(self, attributeQuery, uri):
        """Send the query over the SAML SOAP binding

        @param attributeQuery: query to send
        @param uri: address of the Attribute Authority
        @return: the SAML response returned by the binding
        """
        binding = SamlSoapBinding()
        return binding.attributeQuery(attributeQuery, uri)

    def _printResponse(self, response):
        """Print the SAML response as raw and as pretty printed XML"""
        samlResponseElem = ResponseElementTree.toXML(response)

        print("SAML Response ...")
        print(ElementTree.tostring(samlResponseElem))
        print("Pretty print SAML Response ...")
        print(prettyPrint(samlResponseElem))

    def test01SAMLAttributeQuery(self):
        """test01SAMLAttributeQuery: query name, e-mail and site attributes
        and expect a successful response"""
        _cfg = self.cfg['test01SAMLAttributeQuery']

        attributeQuery = self._createAttributeQuery("Site A", _cfg['subject'])
        attributeQuery.attributes.append(
            self._createAttribute("urn:esg:first:name", "FirstName"))
        attributeQuery.attributes.append(
            self._createAttribute("urn:esg:last:name", "LastName"))
        attributeQuery.attributes.append(
            self._createAttribute("urn:esg:email:address", "emailAddress"))
        attributeQuery.attributes.append(
            self._createAttribute(_cfg['siteAttributeName']))

        response = self._sendAttributeQuery(attributeQuery, _cfg['uri'])

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)

        # Check Query ID matches the query ID the service received
        self.assertEqual(response.inResponseTo, attributeQuery.id)

        now = datetime.utcnow()
        self.assertTrue(response.issueInstant < now)
        self.assertTrue(response.assertions[-1].issueInstant < now)
        self.assertTrue(response.assertions[-1].conditions.notBefore < now)
        self.assertTrue(response.assertions[-1].conditions.notOnOrAfter > now)

        self._printResponse(response)

    def test02SAMLAttributeQueryInvalidIssuer(self):
        """test02SAMLAttributeQueryInvalidIssuer: a query from the issuer
        "Invalid Site" is expected to be denied"""
        _cfg = self.cfg['test02SAMLAttributeQueryInvalidIssuer']

        attributeQuery = self._createAttributeQuery("Invalid Site",
                                                    _cfg['subject'])
        attributeQuery.attributes.append(
            self._createAttribute(_cfg['siteAttributeName']))

        response = self._sendAttributeQuery(attributeQuery, _cfg['uri'])

        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.REQUEST_DENIED_URI)

    def test03SAMLAttributeQueryUnknownSubject(self):
        """test03SAMLAttributeQueryUnknownSubject: a query for the subject
        configured for this test is expected to return unknown principal"""
        _cfg = self.cfg['test03SAMLAttributeQueryUnknownSubject']

        attributeQuery = self._createAttributeQuery("Site A", _cfg['subject'])
        attributeQuery.attributes.append(
            self._createAttribute(_cfg['siteAttributeName']))

        response = self._sendAttributeQuery(attributeQuery, _cfg['uri'])

        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.UNKNOWN_PRINCIPAL_URI)

    def test04SAMLAttributeQueryInvalidAttrName(self):
        """test04SAMLAttributeQueryInvalidAttrName: a query for the attribute
        "myInvalidAttributeName" is expected to return invalid attribute
        name or value"""
        _cfg = self.cfg['test04SAMLAttributeQueryInvalidAttrName']

        attributeQuery = self._createAttributeQuery("Site A", _cfg['subject'])
        attributeQuery.attributes.append(
            self._createAttribute("myInvalidAttributeName"))

        response = self._sendAttributeQuery(attributeQuery, _cfg['uri'])

        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.INVALID_ATTR_NAME_VALUE_URI)
488       
489                                                       
# Run every test case in this module when the file is executed as a script
if __name__ == '__main__':
    unittest.main()
Note: See TracBrowser for help on using the repository browser.