source: TI12-security/trunk/ndg_security_saml/saml/test/test_saml.py @ 6553

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/ndg_security_saml/saml/test/test_saml.py@6553
Revision 6553, 20.7 KB checked in by pjkersha, 11 years ago (diff)

Unit testing AuthzDecisionStatement? added to a Response.

Line 
1"""SAML unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "21/07/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id: $'
11import logging
12logging.basicConfig(level=logging.DEBUG)
13   
14from datetime import datetime, timedelta
15from uuid import uuid4
16from cStringIO import StringIO
17
18import unittest
19
20from xml.etree.ElementTree import iselement
21from xml.etree import ElementTree
22
23from saml.saml2.core import (SAMLVersion, Attribute, AttributeStatement, 
24                             AuthzDecisionStatement, Assertion, AttributeQuery, 
25                             Response, Issuer, Subject, NameID, StatusCode, 
26                             StatusMessage, Status, Conditions, 
27                             XSStringAttributeValue, Action,
28                             AuthzDecisionQuery)
29
30from saml.common.xml import SAMLConstants
31from saml.xml.etree import (prettyPrint, AssertionElementTree, 
32                            AttributeQueryElementTree, ResponseElementTree,
33                            AuthzDecisionQueryElementTree)
34
35
36class SAMLUtil(object):
37    """SAML utility class based on ANL examples for Earth System Grid:
38    http://www.ci.uchicago.edu/wiki/bin/view/ESGProject/ESGSAMLAttributes#ESG_Attribute_Service
39    """
40    NAMEID_FORMAT = "urn:esg:openid"
41    NAMEID_VALUE = "https://openid.localhost/philip.kershaw"
42    ISSUER_DN = "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"
43    UNCORRECTED_RESOURCE_URI = "http://LOCALHOST:80/My Secured URI"
44    RESOURCE_URI = "http://localhost/My%20Secured%20URI"
45   
46    def __init__(self):
47        """Set-up ESG core attributes, Group/Role and miscellaneous
48        attributes lists
49        """
50        self.firstName = None
51        self.lastName = None
52        self.emailAddress = None
53       
54        self.__miscAttrList = []
55   
56    def addAttribute(self, name, value):
57        """Add a generic attribute
58        @type name: basestring
59        @param name: attribute name
60        @type value: basestring
61        @param value: attribute value
62        """
63        self.__miscAttrList.append((name, value))
64
65    def buildAssertion(self):
66        """Create a SAML Assertion containing ESG core attributes: First
67        Name, Last Name, e-mail Address; ESG Group/Role type attributes
68        and generic attributes
69        @rtype: ndg.security.common.saml.Assertion
70        @return: new SAML Assertion object
71        """
72       
73        assertion = Assertion()
74        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
75        assertion.id = str(uuid4())
76        assertion.issueInstant = datetime.utcnow()
77        attributeStatement = AttributeStatement()
78       
79        for attribute in self.createAttributes():
80            attributeStatement.attributes.append(attribute)
81           
82        assertion.attributeStatements.append(attributeStatement)
83       
84        return assertion
85
86    def buildAttributeQuery(self, issuer, subjectNameID):
87        """Make a SAML Attribute Query
88        @type issuer: basestring
89        @param issuer: attribute issuer name
90        @type subjectNameID: basestring
91        @param subjectNameID: identity to query attributes for
92        """
93        attributeQuery = AttributeQuery()
94        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
95        attributeQuery.id = str(uuid4())
96        attributeQuery.issueInstant = datetime.utcnow()
97       
98        attributeQuery.issuer = Issuer()
99        attributeQuery.issuer.format = Issuer.X509_SUBJECT
100        attributeQuery.issuer.value = issuer
101                       
102        attributeQuery.subject = Subject() 
103        attributeQuery.subject.nameID = NameID()
104        attributeQuery.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT
105        attributeQuery.subject.nameID.value = subjectNameID
106                                   
107        attributeQuery.attributes = self.createAttributes()
108       
109        return attributeQuery
110   
111    def createAttributes(self):
112        """Create SAML Attributes for use in an Assertion or AttributeQuery"""
113       
114        attributes = []
115        if self.firstName is not None:   
116            # special case handling for 'FirstName' attribute
117            fnAttribute = Attribute()
118            fnAttribute.name = "urn:esg:first:name"
119            fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
120            fnAttribute.friendlyName = "FirstName"
121
122            firstName = XSStringAttributeValue()
123            firstName.value = self.firstName
124            fnAttribute.attributeValues.append(firstName)
125
126            attributes.append(fnAttribute)
127       
128
129        if self.lastName is not None:
130            # special case handling for 'LastName' attribute
131            lnAttribute = Attribute()
132            lnAttribute.name = "urn:esg:last:name"
133            lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
134            lnAttribute.friendlyName = "LastName"
135
136            lastName = XSStringAttributeValue()
137            lastName.value = self.lastName
138            lnAttribute.attributeValues.append(lastName)
139
140            attributes.append(lnAttribute)
141       
142
143        if self.emailAddress is not None:
144            # special case handling for 'LastName' attribute
145            emailAddressAttribute = Attribute()
146            emailAddressAttribute.name = "urn:esg:email:address"
147            emailAddressAttribute.nameFormat = SAMLConstants.XSD_NS+"#"+\
148                                        XSStringAttributeValue.TYPE_LOCAL_NAME
149            emailAddressAttribute.friendlyName = "emailAddress"
150
151            emailAddress = XSStringAttributeValue()
152            emailAddress.value = self.emailAddress
153            emailAddressAttribute.attributeValues.append(emailAddress)
154
155            attributes.append(emailAddressAttribute)
156       
157        for name, value in self.__miscAttrList:
158            attribute = Attribute()
159            attribute.name = name
160            attribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
161
162            stringAttributeValue = XSStringAttributeValue()
163            stringAttributeValue.value = value
164            attribute.attributeValues.append(stringAttributeValue)
165
166            attributes.append(attribute)
167           
168        return attributes
169   
170    def buildAuthzDecisionQuery(self, 
171                                issuer=ISSUER_DN,
172                                issuerFormat=Issuer.X509_SUBJECT,
173                                subjectNameID=NAMEID_VALUE, 
174                                subjectNameIdFormat=NAMEID_FORMAT,
175                                resource=UNCORRECTED_RESOURCE_URI,
176                                actions=((Action.HTTP_GET_ACTION, 
177                                          Action.GHPP_NS_URI),)):
178        """Convenience utility to make an Authorisation decision query"""
179        authzDecisionQuery = AuthzDecisionQuery()
180
181        authzDecisionQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
182        authzDecisionQuery.id = str(uuid4())
183        authzDecisionQuery.issueInstant = datetime.utcnow()
184       
185        authzDecisionQuery.issuer = Issuer()
186        authzDecisionQuery.issuer.format = issuerFormat
187        authzDecisionQuery.issuer.value = issuer
188       
189        authzDecisionQuery.subject = Subject()
190        authzDecisionQuery.subject.nameID = NameID()
191        authzDecisionQuery.subject.nameID.format = subjectNameIdFormat
192        authzDecisionQuery.subject.nameID.value = subjectNameID
193       
194        authzDecisionQuery.resource = resource
195       
196        for action, namespace in actions:
197            authzDecisionQuery.actions.append(Action())
198            authzDecisionQuery.actions[-1].namespace = namespace
199            authzDecisionQuery.actions[-1].value = action
200           
201        return authzDecisionQuery
202           
203
204class SAMLTestCase(unittest.TestCase):
205    """Test SAML implementation for use with CMIP5 federation"""
206    NAMEID_FORMAT = SAMLUtil.NAMEID_FORMAT
207    NAMEID_VALUE = SAMLUtil.NAMEID_VALUE
208    ISSUER_DN = SAMLUtil.ISSUER_DN
209    UNCORRECTED_RESOURCE_URI = SAMLUtil.UNCORRECTED_RESOURCE_URI
210    RESOURCE_URI = SAMLUtil.RESOURCE_URI
211   
212    def _createAttributeAssertionHelper(self):
213        samlUtil = SAMLUtil()
214       
215        # ESG core attributes
216        samlUtil.firstName = "Philip"
217        samlUtil.lastName = "Kershaw"
218        samlUtil.emailAddress = "p.j.k@somewhere"
219       
220        # BADC specific attributes
221        badcRoleList = (
222            'urn:badc:security:authz:1.0:attr:admin', 
223            'urn:badc:security:authz:1.0:attr:rapid', 
224            'urn:badc:security:authz:1.0:attr:coapec', 
225            'urn:badc:security:authz:1.0:attr:midas', 
226            'urn:badc:security:authz:1.0:attr:quest', 
227            'urn:badc:security:authz:1.0:attr:staff'
228        )
229        for role in badcRoleList:
230            samlUtil.addAttribute("urn:badc:security:authz:1.0:attr", role)
231       
232        # Make an assertion object
233        assertion = samlUtil.buildAssertion()
234       
235        return assertion
236       
237    def test01CreateAssertion(self):
238         
239        assertion = self._createAttributeAssertionHelper()
240
241       
242        # Create ElementTree Assertion Element
243        assertionElem = AssertionElementTree.toXML(assertion)
244       
245        self.assert_(iselement(assertionElem))
246       
247        # Serialise to output
248        xmlOutput = prettyPrint(assertionElem)       
249        self.assert_(len(xmlOutput))
250       
251        print("\n"+"_"*80)
252        print(xmlOutput)
253        print("_"*80)
254
255    def test02ParseAssertion(self):
256        assertion = self._createAttributeAssertionHelper()
257       
258        # Create ElementTree Assertion Element
259        assertionElem = AssertionElementTree.toXML(assertion)
260       
261        self.assert_(iselement(assertionElem))
262       
263        # Serialise to output
264        xmlOutput = prettyPrint(assertionElem)       
265           
266        print("\n"+"_"*80)
267        print(xmlOutput)
268        print("_"*80)
269               
270        assertionStream = StringIO()
271        assertionStream.write(xmlOutput)
272        assertionStream.seek(0)
273
274        tree = ElementTree.parse(assertionStream)
275        elem2 = tree.getroot()
276       
277        assertion2 = AssertionElementTree.fromXML(elem2)
278        self.assert_(assertion2)
279       
280    def test03CreateAttributeQuery(self):
281        samlUtil = SAMLUtil()
282        samlUtil.firstName = ''
283        samlUtil.lastName = ''
284        samlUtil.emailAddress = ''
285        attributeQuery = samlUtil.buildAttributeQuery(SAMLTestCase.ISSUER_DN,
286                                                      SAMLTestCase.NAMEID_VALUE)
287       
288        elem = AttributeQueryElementTree.toXML(attributeQuery)       
289        xmlOutput = prettyPrint(elem)
290           
291        print("\n"+"_"*80)
292        print(xmlOutput)
293        print("_"*80)
294
295    def test04ParseAttributeQuery(self):
296        samlUtil = SAMLUtil()
297        samlUtil.firstName = ''
298        samlUtil.lastName = ''
299        samlUtil.emailAddress = ''
300        attributeQuery = samlUtil.buildAttributeQuery(SAMLTestCase.ISSUER_DN,
301                                                      SAMLTestCase.NAMEID_VALUE)
302       
303        elem = AttributeQueryElementTree.toXML(attributeQuery)       
304        xmlOutput = prettyPrint(elem)       
305        print("\n"+"_"*80)
306        print(xmlOutput)
307               
308        attributeQueryStream = StringIO()
309        attributeQueryStream.write(xmlOutput)
310        attributeQueryStream.seek(0)
311
312        tree = ElementTree.parse(attributeQueryStream)
313        elem2 = tree.getroot()
314       
315        attributeQuery2 = AttributeQueryElementTree.fromXML(elem2)
316        self.assert_(attributeQuery2.id == attributeQuery.id)
317        self.assert_(attributeQuery2.issuer.value==attributeQuery.issuer.value)
318        self.assert_(attributeQuery2.subject.nameID.value == \
319                     attributeQuery.subject.nameID.value)
320       
321        self.assert_(attributeQuery2.attributes[1].name == \
322                     attributeQuery.attributes[1].name)
323       
324        xmlOutput2 = prettyPrint(elem2)       
325        print("_"*80)
326        print(xmlOutput2)
327        print("_"*80)
328
329    def test05CreateAttributeQueryResponse(self):
330        response = Response()
331        response.issueInstant = datetime.utcnow()
332       
333        # Make up a request ID that this response is responding to
334        response.inResponseTo = str(uuid4())
335        response.id = str(uuid4())
336        response.version = SAMLVersion(SAMLVersion.VERSION_20)
337           
338        response.issuer = Issuer()
339        response.issuer.format = Issuer.X509_SUBJECT
340        response.issuer.value = \
341                        SAMLTestCase.ISSUER_DN
342       
343        response.status = Status()
344        response.status.statusCode = StatusCode()
345        response.status.statusCode.value = StatusCode.SUCCESS_URI
346        response.status.statusMessage = StatusMessage()       
347        response.status.statusMessage.value = "Response created successfully"
348           
349        assertion = self._createAttributeAssertionHelper()
350       
351        # Add a conditions statement for a validity of 8 hours
352        assertion.conditions = Conditions()
353        assertion.conditions.notBefore = datetime.utcnow()
354        assertion.conditions.notOnOrAfter = (assertion.conditions.notBefore + 
355                                             timedelta(seconds=60*60*8))
356       
357        assertion.subject = Subject() 
358        assertion.subject.nameID = NameID()
359        assertion.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT
360        assertion.subject.nameID.value = SAMLTestCase.NAMEID_VALUE   
361           
362        assertion.issuer = Issuer()
363        assertion.issuer.format = Issuer.X509_SUBJECT
364        assertion.issuer.value = SAMLTestCase.ISSUER_DN
365
366        response.assertions.append(assertion)
367       
368        # Create ElementTree Assertion Element
369        responseElem = ResponseElementTree.toXML(response)
370       
371        self.assert_(iselement(responseElem))
372       
373        # Serialise to output       
374        xmlOutput = prettyPrint(responseElem)       
375        self.assert_(len(xmlOutput))
376        print("\n"+"_"*80)
377        print(xmlOutput)
378        print("_"*80)
379       
380    def test06CreateAuthzDecisionQuery(self):
381        authzDecisionQuery = AuthzDecisionQuery()
382
383        authzDecisionQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
384        authzDecisionQuery.id = str(uuid4())
385        authzDecisionQuery.issueInstant = datetime.utcnow()
386       
387        authzDecisionQuery.issuer = Issuer()
388        authzDecisionQuery.issuer.format = Issuer.X509_SUBJECT
389        authzDecisionQuery.issuer.value = SAMLTestCase.ISSUER_DN
390       
391        authzDecisionQuery.subject = Subject()
392        authzDecisionQuery.subject.nameID = NameID()
393        authzDecisionQuery.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT
394        authzDecisionQuery.subject.nameID.value = SAMLTestCase.NAMEID_VALUE
395       
396        authzDecisionQuery.resource = "http://LOCALHOST:80/My Secured URI"
397        self.assert_(":80" not in authzDecisionQuery.resource)
398        self.assert_("localhost" in authzDecisionQuery.resource)
399        self.assert_(" " not in authzDecisionQuery.resource)
400       
401        authzDecisionQuery.resource = \
402            "https://Somewhere.ac.uk:443/My Secured URI?blah=4&yes=True"
403           
404        self.assert_(":443" not in authzDecisionQuery.resource)
405        self.assert_("somewhere.ac.uk" in authzDecisionQuery.resource)
406        self.assert_("yes=True" in authzDecisionQuery.resource)
407       
408        authzDecisionQuery.actions.append(Action())
409        authzDecisionQuery.actions[0].namespace = Action.GHPP_NS_URI
410        authzDecisionQuery.actions[0].value = Action.HTTP_GET_ACTION
411       
412        self.assert_(
413            authzDecisionQuery.actions[0].value == Action.HTTP_GET_ACTION)
414        self.assert_(
415            authzDecisionQuery.actions[0].namespace == Action.GHPP_NS_URI)
416       
417        # Try out the restricted vocabulary
418        try:
419            authzDecisionQuery.actions[0].value = "delete everything"
420            self.fail("Expecting AttributeError raised for incorrect action "
421                      "setting.")
422        except AttributeError, e:
423            print("Caught incorrect action type setting: %s" % e)
424       
425        authzDecisionQuery.actions[0].actionTypes = {'urn:malicious': 
426                                                     ("delete everything",)}
427       
428        # Try again now that the actipn types have been adjusted
429        authzDecisionQuery.actions[0].namespace = 'urn:malicious'
430        authzDecisionQuery.actions[0].value = "delete everything"
431       
432    def test07SerializeAuthzDecisionQuery(self):
433        samlUtil = SAMLUtil()
434        authzDecisionQuery = samlUtil.buildAuthzDecisionQuery()
435       
436        # Create ElementTree Assertion Element
437        authzDecisionQueryElem = AuthzDecisionQueryElementTree.toXML(
438                                                            authzDecisionQuery)
439       
440        self.assert_(iselement(authzDecisionQueryElem))
441       
442        # Serialise to output
443        xmlOutput = prettyPrint(authzDecisionQueryElem)       
444        self.assert_(len(xmlOutput))
445       
446        print("\n"+"_"*80)
447        print(xmlOutput)
448        print("_"*80)
449   
450    def test08DeserializeAuthzDecisionQuery(self):
451        samlUtil = SAMLUtil()
452        authzDecisionQuery = samlUtil.buildAuthzDecisionQuery()
453       
454        # Create ElementTree Assertion Element
455        authzDecisionQueryElem = AuthzDecisionQueryElementTree.toXML(
456                                                            authzDecisionQuery)
457       
458        self.assert_(iselement(authzDecisionQueryElem))
459       
460        # Serialise to output
461        xmlOutput = prettyPrint(authzDecisionQueryElem)       
462        self.assert_(len(xmlOutput))
463       
464        authzDecisionQueryStream = StringIO()
465        authzDecisionQueryStream.write(xmlOutput)
466        authzDecisionQueryStream.seek(0)
467
468        tree = ElementTree.parse(authzDecisionQueryStream)
469        elem2 = tree.getroot()
470       
471        authzDecisionQuery2 = AuthzDecisionQueryElementTree.fromXML(elem2)
472        self.assert_(authzDecisionQuery2)
473        self.assert_(
474        authzDecisionQuery2.subject.nameID.value == SAMLTestCase.NAMEID_VALUE)
475        self.assert_(
476        authzDecisionQuery2.subject.nameID.format == SAMLTestCase.NAMEID_FORMAT)
477        self.assert_(
478            authzDecisionQuery2.issuer.value == SAMLTestCase.ISSUER_DN)
479        self.assert_(
480            authzDecisionQuery2.resource == SAMLTestCase.RESOURCE_URI)
481        self.assert_(len(authzDecisionQuery2.actions) == 1)
482        self.assert_(
483            authzDecisionQuery2.actions[0].value == Action.HTTP_GET_ACTION)
484        self.assert_(
485            authzDecisionQuery2.actions[0].namespace == Action.GHPP_NS_URI)
486        self.assert_(authzDecisionQuery2.evidence is None)
487
488
489    def test05CreateAuthzDecisionQueryResponse(self):
490        response = Response()
491        response.issueInstant = datetime.utcnow()
492       
493        # Make up a request ID that this response is responding to
494        response.inResponseTo = str(uuid4())
495        response.id = str(uuid4())
496        response.version = SAMLVersion(SAMLVersion.VERSION_20)
497           
498        response.issuer = Issuer()
499        response.issuer.format = Issuer.X509_SUBJECT
500        response.issuer.value = \
501                        SAMLTestCase.ISSUER_DN
502       
503        response.status = Status()
504        response.status.statusCode = StatusCode()
505        response.status.statusCode.value = StatusCode.SUCCESS_URI
506        response.status.statusMessage = StatusMessage()       
507        response.status.statusMessage.value = "Response created successfully"
508           
509        assertion = Assertion()
510        authzDecisionStatement = AuthzDecisionStatement()
511        authzDecisionStatement.resource = SAMLTestCase.RESOURCE_URI
512        authzDecisionStatement.actions.append(Action())
513        authzDecisionStatement.actions[-1].namespace = Action.GHPP_NS_URI
514        authzDecisionStatement.actions[-1].value = Action.HTTP_GET_ACTION
515        assertion.authzDecisionStatements.append(authzDecisionStatement)
516       
517#        assertion.subject = Subject() 
518#        assertion.subject.nameID = NameID()
519#        assertion.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT
520#        assertion.subject.nameID.value = SAMLTestCase.NAMEID_VALUE   
521#           
522#        assertion.issuer = Issuer()
523#        assertion.issuer.format = Issuer.X509_SUBJECT
524#        assertion.issuer.value = SAMLTestCase.ISSUER_DN
525
526        response.assertions.append(assertion)
527       
528        # Create ElementTree Assertion Element
529        responseElem = ResponseElementTree.toXML(response)
530       
531        self.assert_(iselement(responseElem))
532       
533        # Serialise to output       
534        xmlOutput = prettyPrint(responseElem)       
535        self.assert_(len(xmlOutput))
536        print("\n"+"_"*80)
537        print(xmlOutput)
538        print("_"*80)
539
540       
541if __name__ == "__main__":
542    unittest.main()       
Note: See TracBrowser for help on using the repository browser.