source: TI12-security/trunk/ndg_security_saml/ndg/saml/test/test_saml.py @ 6608

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/ndg_security_saml/ndg/saml/test/test_saml.py@6608
Revision 6608, 29.2 KB checked in by pjkersha, 10 years ago (diff)

Moving whole code base into ndg namespace.

Line 
1"""SAML unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "21/07/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id: $'
11import logging
12logging.basicConfig(level=logging.DEBUG)
13   
14from datetime import datetime, timedelta
15from uuid import uuid4
16from cStringIO import StringIO
17
18import unittest
19import pickle
20
21from xml.etree.ElementTree import iselement
22from xml.etree import ElementTree
23
24from saml.saml2.core import (SAMLVersion, Attribute, AttributeStatement, 
25                             AuthzDecisionStatement, Assertion, AttributeQuery, 
26                             Response, Issuer, Subject, NameID, StatusCode, 
27                             StatusMessage, Status, Conditions, DecisionType,
28                             XSStringAttributeValue, Action, 
29                             AuthzDecisionQuery)
30
31from saml.common.xml import SAMLConstants
32from saml.xml.etree import (prettyPrint, AssertionElementTree, 
33                            AttributeQueryElementTree, ResponseElementTree,
34                            AuthzDecisionQueryElementTree)
35
36
37class SAMLUtil(object):
38    """SAML utility class based on ANL examples for Earth System Grid:
39    http://www.ci.uchicago.edu/wiki/bin/view/ESGProject/ESGSAMLAttributes#ESG_Attribute_Service
40    """
41    NAMEID_FORMAT = "urn:esg:openid"
42    NAMEID_VALUE = "https://openid.localhost/philip.kershaw"
43    ISSUER_DN = "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"
44    UNCORRECTED_RESOURCE_URI = "http://LOCALHOST:80/My Secured URI"
45    RESOURCE_URI = "http://localhost/My%20Secured%20URI"
46    XSSTRING_NS = "http://www.w3.org/2001/XMLSchema#string"
47   
48    def __init__(self):
49        """Set-up ESG core attributes, Group/Role and miscellaneous
50        attributes lists
51        """
52        self.firstName = None
53        self.lastName = None
54        self.emailAddress = None
55       
56        self.__miscAttrList = []
57   
58    def addAttribute(self, name, value):
59        """Add a generic attribute
60        @type name: basestring
61        @param name: attribute name
62        @type value: basestring
63        @param value: attribute value
64        """
65        self.__miscAttrList.append((name, value))
66
67    def buildAssertion(self):
68        """Create a SAML Assertion containing ESG core attributes: First
69        Name, Last Name, e-mail Address; ESG Group/Role type attributes
70        and generic attributes
71        @rtype: ndg.security.common.saml.Assertion
72        @return: new SAML Assertion object
73        """
74       
75        assertion = Assertion()
76        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
77        assertion.id = str(uuid4())
78        assertion.issueInstant = datetime.utcnow()
79        attributeStatement = AttributeStatement()
80       
81        for attribute in self.createAttributes():
82            attributeStatement.attributes.append(attribute)
83           
84        assertion.attributeStatements.append(attributeStatement)
85       
86        return assertion
87
88    def buildAttributeQuery(self, issuer, subjectNameID):
89        """Make a SAML Attribute Query
90        @type issuer: basestring
91        @param issuer: attribute issuer name
92        @type subjectNameID: basestring
93        @param subjectNameID: identity to query attributes for
94        """
95        attributeQuery = AttributeQuery()
96        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
97        attributeQuery.id = str(uuid4())
98        attributeQuery.issueInstant = datetime.utcnow()
99       
100        attributeQuery.issuer = Issuer()
101        attributeQuery.issuer.format = Issuer.X509_SUBJECT
102        attributeQuery.issuer.value = issuer
103                       
104        attributeQuery.subject = Subject() 
105        attributeQuery.subject.nameID = NameID()
106        attributeQuery.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT
107        attributeQuery.subject.nameID.value = subjectNameID
108                                   
109        attributeQuery.attributes = self.createAttributes()
110       
111        return attributeQuery
112   
113    def createAttributes(self):
114        """Create SAML Attributes for use in an Assertion or AttributeQuery"""
115       
116        attributes = []
117        if self.firstName is not None:   
118            # special case handling for 'FirstName' attribute
119            fnAttribute = Attribute()
120            fnAttribute.name = "urn:esg:first:name"
121            fnAttribute.nameFormat = SAMLUtil.XSSTRING_NS
122            fnAttribute.friendlyName = "FirstName"
123
124            firstName = XSStringAttributeValue()
125            firstName.value = self.firstName
126            fnAttribute.attributeValues.append(firstName)
127
128            attributes.append(fnAttribute)
129       
130
131        if self.lastName is not None:
132            # special case handling for 'LastName' attribute
133            lnAttribute = Attribute()
134            lnAttribute.name = "urn:esg:last:name"
135            lnAttribute.nameFormat = SAMLUtil.XSSTRING_NS
136            lnAttribute.friendlyName = "LastName"
137
138            lastName = XSStringAttributeValue()
139            lastName.value = self.lastName
140            lnAttribute.attributeValues.append(lastName)
141
142            attributes.append(lnAttribute)
143       
144
145        if self.emailAddress is not None:
146            # special case handling for 'LastName' attribute
147            emailAddressAttribute = Attribute()
148            emailAddressAttribute.name = "urn:esg:email:address"
149            emailAddressAttribute.nameFormat = SAMLConstants.XSD_NS+"#"+\
150                                        XSStringAttributeValue.TYPE_LOCAL_NAME
151            emailAddressAttribute.friendlyName = "emailAddress"
152
153            emailAddress = XSStringAttributeValue()
154            emailAddress.value = self.emailAddress
155            emailAddressAttribute.attributeValues.append(emailAddress)
156
157            attributes.append(emailAddressAttribute)
158       
159        for name, value in self.__miscAttrList:
160            attribute = Attribute()
161            attribute.name = name
162            attribute.nameFormat = SAMLUtil.XSSTRING_NS
163
164            stringAttributeValue = XSStringAttributeValue()
165            stringAttributeValue.value = value
166            attribute.attributeValues.append(stringAttributeValue)
167
168            attributes.append(attribute)
169           
170        return attributes
171   
172    def buildAuthzDecisionQuery(self, 
173                                issuer=ISSUER_DN,
174                                issuerFormat=Issuer.X509_SUBJECT,
175                                subjectNameID=NAMEID_VALUE, 
176                                subjectNameIdFormat=NAMEID_FORMAT,
177                                resource=UNCORRECTED_RESOURCE_URI,
178                                actions=((Action.HTTP_GET_ACTION, 
179                                          Action.GHPP_NS_URI),)):
180        """Convenience utility to make an Authorisation decision query"""
181        authzDecisionQuery = AuthzDecisionQuery()
182
183        authzDecisionQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
184        authzDecisionQuery.id = str(uuid4())
185        authzDecisionQuery.issueInstant = datetime.utcnow()
186       
187        authzDecisionQuery.issuer = Issuer()
188        authzDecisionQuery.issuer.format = issuerFormat
189        authzDecisionQuery.issuer.value = issuer
190       
191        authzDecisionQuery.subject = Subject()
192        authzDecisionQuery.subject.nameID = NameID()
193        authzDecisionQuery.subject.nameID.format = subjectNameIdFormat
194        authzDecisionQuery.subject.nameID.value = subjectNameID
195       
196        authzDecisionQuery.resource = resource
197       
198        for action, namespace in actions:
199            authzDecisionQuery.actions.append(Action())
200            authzDecisionQuery.actions[-1].namespace = namespace
201            authzDecisionQuery.actions[-1].value = action
202           
203        return authzDecisionQuery
204           
205
206class SAMLTestCase(unittest.TestCase):
207    """Test SAML implementation for use with CMIP5 federation"""
208    NAMEID_FORMAT = SAMLUtil.NAMEID_FORMAT
209    NAMEID_VALUE = SAMLUtil.NAMEID_VALUE
210    ISSUER_DN = SAMLUtil.ISSUER_DN
211    UNCORRECTED_RESOURCE_URI = SAMLUtil.UNCORRECTED_RESOURCE_URI
212    RESOURCE_URI = SAMLUtil.RESOURCE_URI
213   
214    def _createAttributeAssertionHelper(self):
215        samlUtil = SAMLUtil()
216       
217        # ESG core attributes
218        samlUtil.firstName = "Philip"
219        samlUtil.lastName = "Kershaw"
220        samlUtil.emailAddress = "p.j.k@somewhere"
221       
222        # BADC specific attributes
223        badcRoleList = (
224            'urn:badc:security:authz:1.0:attr:admin', 
225            'urn:badc:security:authz:1.0:attr:rapid', 
226            'urn:badc:security:authz:1.0:attr:coapec', 
227            'urn:badc:security:authz:1.0:attr:midas', 
228            'urn:badc:security:authz:1.0:attr:quest', 
229            'urn:badc:security:authz:1.0:attr:staff'
230        )
231        for role in badcRoleList:
232            samlUtil.addAttribute("urn:badc:security:authz:1.0:attr", role)
233       
234        # Make an assertion object
235        assertion = samlUtil.buildAssertion()
236       
237        return assertion
238       
239    def test01CreateAssertion(self):
240         
241        assertion = self._createAttributeAssertionHelper()
242
243       
244        # Create ElementTree Assertion Element
245        assertionElem = AssertionElementTree.toXML(assertion)
246       
247        self.assert_(iselement(assertionElem))
248       
249        # Serialise to output
250        xmlOutput = prettyPrint(assertionElem)       
251        self.assert_(len(xmlOutput))
252       
253        print("\n"+"_"*80)
254        print(xmlOutput)
255        print("_"*80)
256
257    def test02ParseAssertion(self):
258        assertion = self._createAttributeAssertionHelper()
259       
260        # Create ElementTree Assertion Element
261        assertionElem = AssertionElementTree.toXML(assertion)
262       
263        self.assert_(iselement(assertionElem))
264       
265        # Serialise to output
266        xmlOutput = prettyPrint(assertionElem)       
267           
268        print("\n"+"_"*80)
269        print(xmlOutput)
270        print("_"*80)
271               
272        assertionStream = StringIO()
273        assertionStream.write(xmlOutput)
274        assertionStream.seek(0)
275
276        tree = ElementTree.parse(assertionStream)
277        elem2 = tree.getroot()
278       
279        assertion2 = AssertionElementTree.fromXML(elem2)
280        self.assert_(assertion2)
281       
282    def test03CreateAttributeQuery(self):
283        samlUtil = SAMLUtil()
284        samlUtil.firstName = ''
285        samlUtil.lastName = ''
286        samlUtil.emailAddress = ''
287        attributeQuery = samlUtil.buildAttributeQuery(SAMLTestCase.ISSUER_DN,
288                                                      SAMLTestCase.NAMEID_VALUE)
289       
290        elem = AttributeQueryElementTree.toXML(attributeQuery)
291        xmlOutput = prettyPrint(elem)
292           
293        print("\n"+"_"*80)
294        print(xmlOutput)
295        print("_"*80)
296
297    def test04ParseAttributeQuery(self):
298        samlUtil = SAMLUtil()
299        samlUtil.firstName = ''
300        samlUtil.lastName = ''
301        samlUtil.emailAddress = ''
302        attributeQuery = samlUtil.buildAttributeQuery(SAMLTestCase.ISSUER_DN,
303                                                      SAMLTestCase.NAMEID_VALUE)
304       
305        elem = AttributeQueryElementTree.toXML(attributeQuery)       
306        xmlOutput = prettyPrint(elem)       
307        print("\n"+"_"*80)
308        print(xmlOutput)
309               
310        attributeQueryStream = StringIO()
311        attributeQueryStream.write(xmlOutput)
312        attributeQueryStream.seek(0)
313
314        tree = ElementTree.parse(attributeQueryStream)
315        elem2 = tree.getroot()
316       
317        attributeQuery2 = AttributeQueryElementTree.fromXML(elem2)
318        self.assert_(attributeQuery2.id == attributeQuery.id)
319        self.assert_(attributeQuery2.issuer.value==attributeQuery.issuer.value)
320        self.assert_(attributeQuery2.subject.nameID.value == \
321                     attributeQuery.subject.nameID.value)
322       
323        self.assert_(attributeQuery2.attributes[1].name == \
324                     attributeQuery.attributes[1].name)
325       
326        xmlOutput2 = prettyPrint(elem2)       
327        print("_"*80)
328        print(xmlOutput2)
329        print("_"*80)
330
331    def _createAttributeQueryResponse(self):
332        response = Response()
333        response.issueInstant = datetime.utcnow()
334       
335        # Make up a request ID that this response is responding to
336        response.inResponseTo = str(uuid4())
337        response.id = str(uuid4())
338        response.version = SAMLVersion(SAMLVersion.VERSION_20)
339           
340        response.issuer = Issuer()
341        response.issuer.format = Issuer.X509_SUBJECT
342        response.issuer.value = \
343                        SAMLTestCase.ISSUER_DN
344       
345        response.status = Status()
346        response.status.statusCode = StatusCode()
347        response.status.statusCode.value = StatusCode.SUCCESS_URI
348        response.status.statusMessage = StatusMessage()       
349        response.status.statusMessage.value = "Response created successfully"
350           
351        assertion = self._createAttributeAssertionHelper()
352       
353        # Add a conditions statement for a validity of 8 hours
354        assertion.conditions = Conditions()
355        assertion.conditions.notBefore = datetime.utcnow()
356        assertion.conditions.notOnOrAfter = (assertion.conditions.notBefore + 
357                                             timedelta(seconds=60*60*8))
358       
359        assertion.subject = Subject() 
360        assertion.subject.nameID = NameID()
361        assertion.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT
362        assertion.subject.nameID.value = SAMLTestCase.NAMEID_VALUE   
363           
364        assertion.issuer = Issuer()
365        assertion.issuer.format = Issuer.X509_SUBJECT
366        assertion.issuer.value = SAMLTestCase.ISSUER_DN
367
368        response.assertions.append(assertion)
369       
370        return response
371       
372    def test05CreateAttributeQueryResponse(self):
373        response = self._createAttributeQueryResponse()
374       
375        # Create ElementTree Assertion Element
376        responseElem = ResponseElementTree.toXML(response)
377       
378        self.assert_(iselement(responseElem))
379       
380        # Serialise to output       
381        xmlOutput = prettyPrint(responseElem)       
382        self.assert_(len(xmlOutput))
383        print("\n"+"_"*80)
384        print(xmlOutput)
385        print("_"*80)
386       
387    def _createAuthzDecisionQuery(self):
388        authzDecisionQuery = AuthzDecisionQuery()
389
390        authzDecisionQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
391        authzDecisionQuery.id = str(uuid4())
392        authzDecisionQuery.issueInstant = datetime.utcnow()
393       
394        authzDecisionQuery.issuer = Issuer()
395        authzDecisionQuery.issuer.format = Issuer.X509_SUBJECT
396        authzDecisionQuery.issuer.value = SAMLTestCase.ISSUER_DN
397       
398        authzDecisionQuery.subject = Subject()
399        authzDecisionQuery.subject.nameID = NameID()
400        authzDecisionQuery.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT
401        authzDecisionQuery.subject.nameID.value = SAMLTestCase.NAMEID_VALUE
402       
403        authzDecisionQuery.resource = "http://LOCALHOST:80/My Secured URI"
404       
405        return authzDecisionQuery
406   
407    def test06CreateAuthzDecisionQuery(self):
408        samlUtil = SAMLUtil()
409        authzDecisionQuery = samlUtil.buildAuthzDecisionQuery()
410       
411        self.assert_(":80" not in authzDecisionQuery.resource)
412        self.assert_("localhost" in authzDecisionQuery.resource)
413        self.assert_(" " not in authzDecisionQuery.resource)
414       
415        authzDecisionQuery.resource = \
416            "https://Somewhere.ac.uk:443/My Secured URI?blah=4&yes=True"
417           
418        self.assert_(":443" not in authzDecisionQuery.resource)
419        self.assert_("somewhere.ac.uk" in authzDecisionQuery.resource)
420        self.assert_("yes=True" in authzDecisionQuery.resource)
421       
422        authzDecisionQuery.actions.append(Action())
423        authzDecisionQuery.actions[0].namespace = Action.GHPP_NS_URI
424        authzDecisionQuery.actions[0].value = Action.HTTP_GET_ACTION
425       
426        self.assert_(
427            authzDecisionQuery.actions[0].value == Action.HTTP_GET_ACTION)
428        self.assert_(
429            authzDecisionQuery.actions[0].namespace == Action.GHPP_NS_URI)
430       
431        # Try out the restricted vocabulary
432        try:
433            authzDecisionQuery.actions[0].value = "delete everything"
434            self.fail("Expecting AttributeError raised for incorrect action "
435                      "setting.")
436        except AttributeError, e:
437            print("Caught incorrect action type setting: %s" % e)
438       
439        authzDecisionQuery.actions[0].actionTypes = {'urn:malicious': 
440                                                     ("delete everything",)}
441       
442        # Try again now that the actipn types have been adjusted
443        authzDecisionQuery.actions[0].namespace = 'urn:malicious'
444        authzDecisionQuery.actions[0].value = "delete everything"
445       
446    def test07SerializeAuthzDecisionQuery(self):
447        samlUtil = SAMLUtil()
448        authzDecisionQuery = samlUtil.buildAuthzDecisionQuery()
449       
450        # Create ElementTree Assertion Element
451        authzDecisionQueryElem = AuthzDecisionQueryElementTree.toXML(
452                                                            authzDecisionQuery)
453       
454        self.assert_(iselement(authzDecisionQueryElem))
455       
456        # Serialise to output
457        xmlOutput = prettyPrint(authzDecisionQueryElem)       
458        self.assert_(len(xmlOutput))
459       
460        print("\n"+"_"*80)
461        print(xmlOutput)
462        print("_"*80)
463   
464    def test08DeserializeAuthzDecisionQuery(self):
465        samlUtil = SAMLUtil()
466        authzDecisionQuery = samlUtil.buildAuthzDecisionQuery()
467       
468        # Create ElementTree Assertion Element
469        authzDecisionQueryElem = AuthzDecisionQueryElementTree.toXML(
470                                                            authzDecisionQuery)
471       
472        self.assert_(iselement(authzDecisionQueryElem))
473       
474        # Serialise to output
475        xmlOutput = prettyPrint(authzDecisionQueryElem)       
476        self.assert_(len(xmlOutput))
477       
478        authzDecisionQueryStream = StringIO()
479        authzDecisionQueryStream.write(xmlOutput)
480        authzDecisionQueryStream.seek(0)
481
482        tree = ElementTree.parse(authzDecisionQueryStream)
483        elem2 = tree.getroot()
484       
485        authzDecisionQuery2 = AuthzDecisionQueryElementTree.fromXML(elem2)
486        self.assert_(authzDecisionQuery2)
487        self.assert_(
488        authzDecisionQuery2.subject.nameID.value == SAMLTestCase.NAMEID_VALUE)
489        self.assert_(
490        authzDecisionQuery2.subject.nameID.format == SAMLTestCase.NAMEID_FORMAT)
491        self.assert_(
492            authzDecisionQuery2.issuer.value == SAMLTestCase.ISSUER_DN)
493        self.assert_(
494            authzDecisionQuery2.resource == SAMLTestCase.RESOURCE_URI)
495        self.assert_(len(authzDecisionQuery2.actions) == 1)
496        self.assert_(
497            authzDecisionQuery2.actions[0].value == Action.HTTP_GET_ACTION)
498        self.assert_(
499            authzDecisionQuery2.actions[0].namespace == Action.GHPP_NS_URI)
500        self.assert_(authzDecisionQuery2.evidence is None)
501
502    def _createAuthzDecisionQueryResponse(self):
503        """Helper method for Authz Decision Response"""
504        response = Response()
505        now = datetime.utcnow()
506        response.issueInstant = now
507       
508        # Make up a request ID that this response is responding to
509        response.inResponseTo = str(uuid4())
510        response.id = str(uuid4())
511        response.version = SAMLVersion(SAMLVersion.VERSION_20)
512           
513        response.issuer = Issuer()
514        response.issuer.format = Issuer.X509_SUBJECT
515        response.issuer.value = SAMLTestCase.ISSUER_DN
516       
517        response.status = Status()
518        response.status.statusCode = StatusCode()
519        response.status.statusCode.value = StatusCode.SUCCESS_URI
520        response.status.statusMessage = StatusMessage()       
521        response.status.statusMessage.value = "Response created successfully"
522           
523        assertion = Assertion()
524        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
525        assertion.id = str(uuid4())
526        assertion.issueInstant = now
527       
528        authzDecisionStatement = AuthzDecisionStatement()
529        authzDecisionStatement.decision = DecisionType.PERMIT
530        authzDecisionStatement.resource = SAMLTestCase.RESOURCE_URI
531        authzDecisionStatement.actions.append(Action())
532        authzDecisionStatement.actions[-1].namespace = Action.GHPP_NS_URI
533        authzDecisionStatement.actions[-1].value = Action.HTTP_GET_ACTION
534        assertion.authzDecisionStatements.append(authzDecisionStatement)
535       
536        # Add a conditions statement for a validity of 8 hours
537        assertion.conditions = Conditions()
538        assertion.conditions.notBefore = now
539        assertion.conditions.notOnOrAfter = now + timedelta(seconds=60*60*8)
540               
541        assertion.subject = Subject() 
542        assertion.subject.nameID = NameID()
543        assertion.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT
544        assertion.subject.nameID.value = SAMLTestCase.NAMEID_VALUE   
545           
546        assertion.issuer = Issuer()
547        assertion.issuer.format = Issuer.X509_SUBJECT
548        assertion.issuer.value = SAMLTestCase.ISSUER_DN
549
550        response.assertions.append(assertion)
551       
552        return response
553       
554    def test09CreateAuthzDecisionQueryResponse(self):
555        response = self._createAuthzDecisionQueryResponse()
556        self.assert_(response.assertions[0])
557        self.assert_(response.assertions[0].authzDecisionStatements[0])
558        self.assert_(response.assertions[0].authzDecisionStatements[0
559            ].decision == DecisionType.PERMIT)
560        self.assert_(response.assertions[0].authzDecisionStatements[0
561            ].resource == SAMLTestCase.RESOURCE_URI)
562        self.assert_(response.assertions[0].authzDecisionStatements[0
563            ].decision == DecisionType.PERMIT)
564        self.assert_(response.assertions[0].authzDecisionStatements[0
565            ].actions[-1].namespace == Action.GHPP_NS_URI)
566        self.assert_(response.assertions[0].authzDecisionStatements[0
567            ].actions[-1].value == Action.HTTP_GET_ACTION)
568     
569    def _serializeAuthzDecisionQueryResponse(self):
570        response = self._createAuthzDecisionQueryResponse()
571       
572        # Create ElementTree Assertion Element
573        responseElem = ResponseElementTree.toXML(response)
574        self.assert_(iselement(responseElem))
575       
576        # Serialise to output       
577        xmlOutput = prettyPrint(responseElem)
578        return xmlOutput
579   
580    def test10SerializeAuthzDecisionQueryResponse(self):
581        xmlOutput = self._serializeAuthzDecisionQueryResponse()
582        self.assert_(len(xmlOutput))
583        print("\n"+"_"*80)
584        print(xmlOutput)
585        print("_"*80)
586       
587        self.assert_('AuthzDecisionStatement' in xmlOutput)
588        self.assert_('GET' in xmlOutput)
589        self.assert_('Permit' in xmlOutput)
590
591    def test11DeserializeAuthzDecisionResponse(self):
592        xmlOutput = self._serializeAuthzDecisionQueryResponse()
593       
594        authzDecisionResponseStream = StringIO()
595        authzDecisionResponseStream.write(xmlOutput)
596        authzDecisionResponseStream.seek(0)
597
598        tree = ElementTree.parse(authzDecisionResponseStream)
599        elem = tree.getroot()
600        response = ResponseElementTree.fromXML(elem)
601       
602        self.assert_(response.assertions[0])
603        self.assert_(response.assertions[0].authzDecisionStatements[0])
604        self.assert_(response.assertions[0].authzDecisionStatements[0
605            ].decision == DecisionType.PERMIT)
606        self.assert_(response.assertions[0].authzDecisionStatements[0
607            ].resource == SAMLTestCase.RESOURCE_URI)
608        self.assert_(response.assertions[0].authzDecisionStatements[0
609            ].decision == DecisionType.PERMIT)
610        self.assert_(response.assertions[0].authzDecisionStatements[0
611            ].actions[-1].namespace == Action.GHPP_NS_URI)
612        self.assert_(response.assertions[0].authzDecisionStatements[0
613            ].actions[-1].value == Action.HTTP_GET_ACTION)
614       
615 
616    def test12PickleAssertion(self):
617        # Test pickling with __slots__
618        assertion = self._createAttributeAssertionHelper()
619        assertion.issuer = Issuer()
620        assertion.issuer.format = Issuer.X509_SUBJECT
621        assertion.issuer.value = SAMLTestCase.ISSUER_DN
622       
623        jar = pickle.dumps(assertion)
624        assertion2 = pickle.loads(jar)
625        self.assert_(isinstance(assertion2, Assertion))
626        self.assert_(assertion2.issuer.value == assertion.issuer.value)
627        self.assert_(assertion2.issuer.format == assertion.issuer.format)
628        self.assert_(len(assertion2.attributeStatements)==1)
629        self.assert_(len(assertion2.attributeStatements[0].attributes) > 0)
630        self.assert_(assertion2.attributeStatements[0].attributes[0
631                     ].attributeValues[0
632                     ].value == assertion.attributeStatements[0].attributes[0
633                                ].attributeValues[0].value)
634       
635    def test13PickleAttributeQuery(self):
636        # Test pickling with __slots__
637        samlUtil = SAMLUtil()
638        samlUtil.firstName = ''
639        samlUtil.lastName = ''
640        samlUtil.emailAddress = ''
641        query = samlUtil.buildAttributeQuery(SAMLTestCase.ISSUER_DN,
642                                             SAMLTestCase.NAMEID_VALUE)
643       
644        jar = pickle.dumps(query)
645        query2 = pickle.loads(jar)
646
647        self.assert_(isinstance(query2, AttributeQuery))
648        self.assert_(query2.subject.nameID.value == query.subject.nameID.value)
649        self.assert_((query2.subject.nameID.format == 
650                      query.subject.nameID.format))
651        self.assert_(query2.issuer.value == query.issuer.value)
652        self.assert_(query2.issuer.format == query.issuer.format)
653        self.assert_(query2.issueInstant == query.issueInstant)
654        self.assert_(query2.id == query.id)
655        self.assert_(len(query2.attributes) == 3)
656        self.assert_(query2.attributes[0].name == "urn:esg:first:name")
657        self.assert_(query2.attributes[1].nameFormat == SAMLUtil.XSSTRING_NS)
658
659    def test14PickleAttributeQueryResponse(self):
660        response = self._createAttributeQueryResponse()
661       
662        jar = pickle.dumps(response)
663        response2 = pickle.loads(jar)
664       
665        self.assert_(isinstance(response2, Response))
666        self.assert_((response2.status.statusCode.value == 
667                      response.status.statusCode.value))
668        self.assert_((response2.status.statusMessage.value == 
669                      response.status.statusMessage.value))
670        self.assert_(len(response2.assertions) == 1)
671        self.assert_(response2.assertions[0].id == response.assertions[0].id)
672        self.assert_((response2.assertions[0].conditions.notBefore == 
673                      response.assertions[0].conditions.notBefore))
674        self.assert_((response2.assertions[0].conditions.notOnOrAfter == 
675                      response.assertions[0].conditions.notOnOrAfter))
676        self.assert_(len(response2.assertions[0].attributeStatements) == 1)
677        self.assert_(len(response2.assertions[0].attributeStatements[0
678                                                            ].attributes) == 9)
679        self.assert_(response2.assertions[0].attributeStatements[0].attributes[1
680                     ].attributeValues[0
681                     ].value == response.assertions[0].attributeStatements[0
682                                    ].attributes[1].attributeValues[0].value)
683             
684    def test15PickleAuthzDecisionQuery(self):
685        samlUtil = SAMLUtil()
686        query = samlUtil.buildAuthzDecisionQuery()
687             
688        jar = pickle.dumps(query)
689        query2 = pickle.loads(jar)
690       
691        self.assert_(isinstance(query2, AuthzDecisionQuery))
692        self.assert_(query.resource == query2.resource)
693        self.assert_(query.version == query2.version)
694        self.assert_(len(query2.actions) == 1)
695        self.assert_(query2.actions[0].value == Action.HTTP_GET_ACTION)
696        self.assert_(query2.actions[0].namespace == Action.GHPP_NS_URI)
697
698
699       
700    def test16PickleAuthzDecisionResponse(self):
701        response = self._createAuthzDecisionQueryResponse()
702       
703        jar = pickle.dumps(response)
704        response2 = pickle.loads(jar)
705       
706        self.assert_(isinstance(response2, Response))
707       
708        self.assert_(len(response.assertions) == 1)
709        self.assert_(len(response.assertions[0].authzDecisionStatements) == 1)
710         
711        self.assert_(response.assertions[0].authzDecisionStatements[0
712                        ].resource == response2.assertions[0
713                                        ].authzDecisionStatements[0].resource)
714       
715        self.assert_(len(response.assertions[0].authzDecisionStatements[0
716                        ].actions) == 1)
717        self.assert_(response.assertions[0].authzDecisionStatements[0
718                        ].actions[0].value == response2.assertions[0
719                                        ].authzDecisionStatements[0
720                                                ].actions[0].value)
721       
722        self.assert_(response2.assertions[0].authzDecisionStatements[0
723                        ].actions[0].namespace == Action.GHPP_NS_URI)       
724
725        self.assert_(response2.assertions[0].authzDecisionStatements[0
726                        ].decision == DecisionType.PERMIT)       
727       
728if __name__ == "__main__":
729    unittest.main()       
Note: See TracBrowser for help on using the repository browser.