source: TI12-security/trunk/ndg_saml/ndg/saml/test/test_saml.py @ 6900

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/ndg_saml/ndg/saml/test/test_saml.py@6900
Revision 6900, 30.0 KB checked in by pjkersha, 9 years ago (diff)

Incomplete - task 6: Put NDG SAML package on PyPI

  • add documentation folder and Makefile for epydoc
  • updating epydoc for all modules and packages.
Line 
1"""SAML unit test package
2
3NERC DataGrid Project
4
5This implementation is adapted from the Java OpenSAML implementation.  The
6copyright and licence information are included here:
7
8Copyright [2005] [University Corporation for Advanced Internet Development, Inc.]
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21"""
22__author__ = "P J Kershaw"
23__date__ = "21/07/09"
24__copyright__ = "(C) 2009 Science and Technology Facilities Council"
25__license__ = "http://www.apache.org/licenses/LICENSE-2.0"
26__contact__ = "Philip.Kershaw@stfc.ac.uk"
27__revision__ = '$Id$'
28import logging
29logging.basicConfig(level=logging.DEBUG)
30   
31from datetime import datetime, timedelta
32from uuid import uuid4
33from cStringIO import StringIO
34
35import unittest
36import pickle
37
38from xml.etree.ElementTree import iselement
39from xml.etree import ElementTree
40
41from ndg.saml.saml2.core import (SAMLVersion, Attribute, AttributeStatement, 
42                             AuthzDecisionStatement, Assertion, AttributeQuery, 
43                             Response, Issuer, Subject, NameID, StatusCode, 
44                             StatusMessage, Status, Conditions, DecisionType,
45                             XSStringAttributeValue, Action, 
46                             AuthzDecisionQuery)
47
48from ndg.saml.common.xml import SAMLConstants
49from ndg.saml.xml.etree import (prettyPrint, AssertionElementTree, 
50                            AttributeQueryElementTree, ResponseElementTree,
51                            AuthzDecisionQueryElementTree)
52
53
54class SAMLUtil(object):
55    """SAML utility class based on ANL examples for Earth System Grid:
56    http://www.ci.uchicago.edu/wiki/bin/view/ESGProject/ESGSAMLAttributes#ESG_Attribute_Service
57    """
58    NAMEID_FORMAT = "urn:esg:openid"
59    NAMEID_VALUE = "https://openid.localhost/philip.kershaw"
60    ISSUER_DN = "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"
61    UNCORRECTED_RESOURCE_URI = "http://LOCALHOST:80/My Secured URI"
62    RESOURCE_URI = "http://localhost/My%20Secured%20URI"
63    XSSTRING_NS = "http://www.w3.org/2001/XMLSchema#string"
64   
65    def __init__(self):
66        """Set-up ESG core attributes, Group/Role and miscellaneous
67        attributes lists
68        """
69        self.firstName = None
70        self.lastName = None
71        self.emailAddress = None
72       
73        self.__miscAttrList = []
74   
75    def addAttribute(self, name, value):
76        """Add a generic attribute
77        @type name: basestring
78        @param name: attribute name
79        @type value: basestring
80        @param value: attribute value
81        """
82        self.__miscAttrList.append((name, value))
83
84    def buildAssertion(self):
85        """Create a SAML Assertion containing ESG core attributes: First
86        Name, Last Name, e-mail Address; ESG Group/Role type attributes
87        and generic attributes
88        @rtype: ndg.security.common.saml.Assertion
89        @return: new SAML Assertion object
90        """
91       
92        assertion = Assertion()
93        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
94        assertion.id = str(uuid4())
95        assertion.issueInstant = datetime.utcnow()
96        attributeStatement = AttributeStatement()
97       
98        for attribute in self.createAttributes():
99            attributeStatement.attributes.append(attribute)
100           
101        assertion.attributeStatements.append(attributeStatement)
102       
103        return assertion
104
105    def buildAttributeQuery(self, issuer, subjectNameID):
106        """Make a SAML Attribute Query
107        @type issuer: basestring
108        @param issuer: attribute issuer name
109        @type subjectNameID: basestring
110        @param subjectNameID: identity to query attributes for
111        """
112        attributeQuery = AttributeQuery()
113        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
114        attributeQuery.id = str(uuid4())
115        attributeQuery.issueInstant = datetime.utcnow()
116       
117        attributeQuery.issuer = Issuer()
118        attributeQuery.issuer.format = Issuer.X509_SUBJECT
119        attributeQuery.issuer.value = issuer
120                       
121        attributeQuery.subject = Subject() 
122        attributeQuery.subject.nameID = NameID()
123        attributeQuery.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT
124        attributeQuery.subject.nameID.value = subjectNameID
125                                   
126        attributeQuery.attributes = self.createAttributes()
127       
128        return attributeQuery
129   
130    def createAttributes(self):
131        """Create SAML Attributes for use in an Assertion or AttributeQuery"""
132       
133        attributes = []
134        if self.firstName is not None:   
135            # special case handling for 'FirstName' attribute
136            fnAttribute = Attribute()
137            fnAttribute.name = "urn:esg:first:name"
138            fnAttribute.nameFormat = SAMLUtil.XSSTRING_NS
139            fnAttribute.friendlyName = "FirstName"
140
141            firstName = XSStringAttributeValue()
142            firstName.value = self.firstName
143            fnAttribute.attributeValues.append(firstName)
144
145            attributes.append(fnAttribute)
146       
147
148        if self.lastName is not None:
149            # special case handling for 'LastName' attribute
150            lnAttribute = Attribute()
151            lnAttribute.name = "urn:esg:last:name"
152            lnAttribute.nameFormat = SAMLUtil.XSSTRING_NS
153            lnAttribute.friendlyName = "LastName"
154
155            lastName = XSStringAttributeValue()
156            lastName.value = self.lastName
157            lnAttribute.attributeValues.append(lastName)
158
159            attributes.append(lnAttribute)
160       
161
162        if self.emailAddress is not None:
163            # special case handling for 'LastName' attribute
164            emailAddressAttribute = Attribute()
165            emailAddressAttribute.name = "urn:esg:email:address"
166            emailAddressAttribute.nameFormat = SAMLConstants.XSD_NS+"#"+\
167                                        XSStringAttributeValue.TYPE_LOCAL_NAME
168            emailAddressAttribute.friendlyName = "emailAddress"
169
170            emailAddress = XSStringAttributeValue()
171            emailAddress.value = self.emailAddress
172            emailAddressAttribute.attributeValues.append(emailAddress)
173
174            attributes.append(emailAddressAttribute)
175       
176        for name, value in self.__miscAttrList:
177            attribute = Attribute()
178            attribute.name = name
179            attribute.nameFormat = SAMLUtil.XSSTRING_NS
180
181            stringAttributeValue = XSStringAttributeValue()
182            stringAttributeValue.value = value
183            attribute.attributeValues.append(stringAttributeValue)
184
185            attributes.append(attribute)
186           
187        return attributes
188   
189    def buildAuthzDecisionQuery(self, 
190                                issuer=ISSUER_DN,
191                                issuerFormat=Issuer.X509_SUBJECT,
192                                subjectNameID=NAMEID_VALUE, 
193                                subjectNameIdFormat=NAMEID_FORMAT,
194                                resource=UNCORRECTED_RESOURCE_URI,
195                                actions=((Action.HTTP_GET_ACTION, 
196                                          Action.GHPP_NS_URI),)):
197        """Convenience utility to make an Authorisation decision query"""
198        authzDecisionQuery = AuthzDecisionQuery()
199
200        authzDecisionQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
201        authzDecisionQuery.id = str(uuid4())
202        authzDecisionQuery.issueInstant = datetime.utcnow()
203       
204        authzDecisionQuery.issuer = Issuer()
205        authzDecisionQuery.issuer.format = issuerFormat
206        authzDecisionQuery.issuer.value = issuer
207       
208        authzDecisionQuery.subject = Subject()
209        authzDecisionQuery.subject.nameID = NameID()
210        authzDecisionQuery.subject.nameID.format = subjectNameIdFormat
211        authzDecisionQuery.subject.nameID.value = subjectNameID
212       
213        authzDecisionQuery.resource = resource
214       
215        for action, namespace in actions:
216            authzDecisionQuery.actions.append(Action())
217            authzDecisionQuery.actions[-1].namespace = namespace
218            authzDecisionQuery.actions[-1].value = action
219           
220        return authzDecisionQuery
221           
222
223class SAMLTestCase(unittest.TestCase):
224    """Test SAML implementation for use with CMIP5 federation"""
225    NAMEID_FORMAT = SAMLUtil.NAMEID_FORMAT
226    NAMEID_VALUE = SAMLUtil.NAMEID_VALUE
227    ISSUER_DN = SAMLUtil.ISSUER_DN
228    UNCORRECTED_RESOURCE_URI = SAMLUtil.UNCORRECTED_RESOURCE_URI
229    RESOURCE_URI = SAMLUtil.RESOURCE_URI
230   
231    def _createAttributeAssertionHelper(self):
232        samlUtil = SAMLUtil()
233       
234        # ESG core attributes
235        samlUtil.firstName = "Philip"
236        samlUtil.lastName = "Kershaw"
237        samlUtil.emailAddress = "p.j.k@somewhere"
238       
239        # BADC specific attributes
240        badcRoleList = (
241            'urn:badc:security:authz:1.0:attr:admin', 
242            'urn:badc:security:authz:1.0:attr:rapid', 
243            'urn:badc:security:authz:1.0:attr:coapec', 
244            'urn:badc:security:authz:1.0:attr:midas', 
245            'urn:badc:security:authz:1.0:attr:quest', 
246            'urn:badc:security:authz:1.0:attr:staff'
247        )
248        for role in badcRoleList:
249            samlUtil.addAttribute("urn:badc:security:authz:1.0:attr", role)
250       
251        # Make an assertion object
252        assertion = samlUtil.buildAssertion()
253       
254        return assertion
255       
256    def test01CreateAssertion(self):
257         
258        assertion = self._createAttributeAssertionHelper()
259
260       
261        # Create ElementTree Assertion Element
262        assertionElem = AssertionElementTree.toXML(assertion)
263       
264        self.assert_(iselement(assertionElem))
265       
266        # Serialise to output
267        xmlOutput = prettyPrint(assertionElem)       
268        self.assert_(len(xmlOutput))
269       
270        print("\n"+"_"*80)
271        print(xmlOutput)
272        print("_"*80)
273
274    def test02ParseAssertion(self):
275        assertion = self._createAttributeAssertionHelper()
276       
277        # Create ElementTree Assertion Element
278        assertionElem = AssertionElementTree.toXML(assertion)
279       
280        self.assert_(iselement(assertionElem))
281       
282        # Serialise to output
283        xmlOutput = prettyPrint(assertionElem)       
284           
285        print("\n"+"_"*80)
286        print(xmlOutput)
287        print("_"*80)
288               
289        assertionStream = StringIO()
290        assertionStream.write(xmlOutput)
291        assertionStream.seek(0)
292
293        tree = ElementTree.parse(assertionStream)
294        elem2 = tree.getroot()
295       
296        assertion2 = AssertionElementTree.fromXML(elem2)
297        self.assert_(assertion2)
298       
299    def test03CreateAttributeQuery(self):
300        samlUtil = SAMLUtil()
301        samlUtil.firstName = ''
302        samlUtil.lastName = ''
303        samlUtil.emailAddress = ''
304        attributeQuery = samlUtil.buildAttributeQuery(SAMLTestCase.ISSUER_DN,
305                                                      SAMLTestCase.NAMEID_VALUE)
306       
307        elem = AttributeQueryElementTree.toXML(attributeQuery)
308        xmlOutput = prettyPrint(elem)
309           
310        print("\n"+"_"*80)
311        print(xmlOutput)
312        print("_"*80)
313
314    def test04ParseAttributeQuery(self):
315        samlUtil = SAMLUtil()
316        samlUtil.firstName = ''
317        samlUtil.lastName = ''
318        samlUtil.emailAddress = ''
319        attributeQuery = samlUtil.buildAttributeQuery(SAMLTestCase.ISSUER_DN,
320                                                      SAMLTestCase.NAMEID_VALUE)
321       
322        elem = AttributeQueryElementTree.toXML(attributeQuery)       
323        xmlOutput = prettyPrint(elem)       
324        print("\n"+"_"*80)
325        print(xmlOutput)
326               
327        attributeQueryStream = StringIO()
328        attributeQueryStream.write(xmlOutput)
329        attributeQueryStream.seek(0)
330
331        tree = ElementTree.parse(attributeQueryStream)
332        elem2 = tree.getroot()
333       
334        attributeQuery2 = AttributeQueryElementTree.fromXML(elem2)
335        self.assert_(attributeQuery2.id == attributeQuery.id)
336        self.assert_(attributeQuery2.issuer.value==attributeQuery.issuer.value)
337        self.assert_(attributeQuery2.subject.nameID.value == \
338                     attributeQuery.subject.nameID.value)
339       
340        self.assert_(attributeQuery2.attributes[1].name == \
341                     attributeQuery.attributes[1].name)
342       
343        xmlOutput2 = prettyPrint(elem2)       
344        print("_"*80)
345        print(xmlOutput2)
346        print("_"*80)
347
348    def _createAttributeQueryResponse(self):
349        response = Response()
350        response.issueInstant = datetime.utcnow()
351       
352        # Make up a request ID that this response is responding to
353        response.inResponseTo = str(uuid4())
354        response.id = str(uuid4())
355        response.version = SAMLVersion(SAMLVersion.VERSION_20)
356           
357        response.issuer = Issuer()
358        response.issuer.format = Issuer.X509_SUBJECT
359        response.issuer.value = \
360                        SAMLTestCase.ISSUER_DN
361       
362        response.status = Status()
363        response.status.statusCode = StatusCode()
364        response.status.statusCode.value = StatusCode.SUCCESS_URI
365        response.status.statusMessage = StatusMessage()       
366        response.status.statusMessage.value = "Response created successfully"
367           
368        assertion = self._createAttributeAssertionHelper()
369       
370        # Add a conditions statement for a validity of 8 hours
371        assertion.conditions = Conditions()
372        assertion.conditions.notBefore = datetime.utcnow()
373        assertion.conditions.notOnOrAfter = (assertion.conditions.notBefore + 
374                                             timedelta(seconds=60*60*8))
375       
376        assertion.subject = Subject() 
377        assertion.subject.nameID = NameID()
378        assertion.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT
379        assertion.subject.nameID.value = SAMLTestCase.NAMEID_VALUE   
380           
381        assertion.issuer = Issuer()
382        assertion.issuer.format = Issuer.X509_SUBJECT
383        assertion.issuer.value = SAMLTestCase.ISSUER_DN
384
385        response.assertions.append(assertion)
386       
387        return response
388       
389    def test05CreateAttributeQueryResponse(self):
390        response = self._createAttributeQueryResponse()
391       
392        # Create ElementTree Assertion Element
393        responseElem = ResponseElementTree.toXML(response)
394       
395        self.assert_(iselement(responseElem))
396       
397        # Serialise to output       
398        xmlOutput = prettyPrint(responseElem)       
399        self.assert_(len(xmlOutput))
400        print("\n"+"_"*80)
401        print(xmlOutput)
402        print("_"*80)
403       
404    def _createAuthzDecisionQuery(self):
405        authzDecisionQuery = AuthzDecisionQuery()
406
407        authzDecisionQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
408        authzDecisionQuery.id = str(uuid4())
409        authzDecisionQuery.issueInstant = datetime.utcnow()
410       
411        authzDecisionQuery.issuer = Issuer()
412        authzDecisionQuery.issuer.format = Issuer.X509_SUBJECT
413        authzDecisionQuery.issuer.value = SAMLTestCase.ISSUER_DN
414       
415        authzDecisionQuery.subject = Subject()
416        authzDecisionQuery.subject.nameID = NameID()
417        authzDecisionQuery.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT
418        authzDecisionQuery.subject.nameID.value = SAMLTestCase.NAMEID_VALUE
419       
420        authzDecisionQuery.resource = "http://LOCALHOST:80/My Secured URI"
421       
422        return authzDecisionQuery
423   
424    def test06CreateAuthzDecisionQuery(self):
425        samlUtil = SAMLUtil()
426        authzDecisionQuery = samlUtil.buildAuthzDecisionQuery()
427       
428        self.assert_(":80" not in authzDecisionQuery.resource)
429        self.assert_("localhost" in authzDecisionQuery.resource)
430        self.assert_(" " not in authzDecisionQuery.resource)
431       
432        authzDecisionQuery.resource = \
433            "https://Somewhere.ac.uk:443/My Secured URI?blah=4&yes=True"
434           
435        self.assert_(":443" not in authzDecisionQuery.resource)
436        self.assert_("somewhere.ac.uk" in authzDecisionQuery.resource)
437        self.assert_("yes=True" in authzDecisionQuery.resource)
438       
439        authzDecisionQuery.actions.append(Action())
440        authzDecisionQuery.actions[0].namespace = Action.GHPP_NS_URI
441        authzDecisionQuery.actions[0].value = Action.HTTP_GET_ACTION
442       
443        self.assert_(
444            authzDecisionQuery.actions[0].value == Action.HTTP_GET_ACTION)
445        self.assert_(
446            authzDecisionQuery.actions[0].namespace == Action.GHPP_NS_URI)
447       
448        # Try out the restricted vocabulary
449        try:
450            authzDecisionQuery.actions[0].value = "delete everything"
451            self.fail("Expecting AttributeError raised for incorrect action "
452                      "setting.")
453        except AttributeError, e:
454            print("Caught incorrect action type setting: %s" % e)
455       
456        authzDecisionQuery.actions[0].actionTypes = {'urn:malicious': 
457                                                     ("delete everything",)}
458       
459        # Try again now that the actipn types have been adjusted
460        authzDecisionQuery.actions[0].namespace = 'urn:malicious'
461        authzDecisionQuery.actions[0].value = "delete everything"
462       
463    def test07SerializeAuthzDecisionQuery(self):
464        samlUtil = SAMLUtil()
465        authzDecisionQuery = samlUtil.buildAuthzDecisionQuery()
466       
467        # Create ElementTree Assertion Element
468        authzDecisionQueryElem = AuthzDecisionQueryElementTree.toXML(
469                                                            authzDecisionQuery)
470       
471        self.assert_(iselement(authzDecisionQueryElem))
472       
473        # Serialise to output
474        xmlOutput = prettyPrint(authzDecisionQueryElem)       
475        self.assert_(len(xmlOutput))
476       
477        print("\n"+"_"*80)
478        print(xmlOutput)
479        print("_"*80)
480   
481    def test08DeserializeAuthzDecisionQuery(self):
482        samlUtil = SAMLUtil()
483        authzDecisionQuery = samlUtil.buildAuthzDecisionQuery()
484       
485        # Create ElementTree Assertion Element
486        authzDecisionQueryElem = AuthzDecisionQueryElementTree.toXML(
487                                                            authzDecisionQuery)
488       
489        self.assert_(iselement(authzDecisionQueryElem))
490       
491        # Serialise to output
492        xmlOutput = prettyPrint(authzDecisionQueryElem)       
493        self.assert_(len(xmlOutput))
494       
495        authzDecisionQueryStream = StringIO()
496        authzDecisionQueryStream.write(xmlOutput)
497        authzDecisionQueryStream.seek(0)
498
499        tree = ElementTree.parse(authzDecisionQueryStream)
500        elem2 = tree.getroot()
501       
502        authzDecisionQuery2 = AuthzDecisionQueryElementTree.fromXML(elem2)
503        self.assert_(authzDecisionQuery2)
504        self.assert_(
505        authzDecisionQuery2.subject.nameID.value == SAMLTestCase.NAMEID_VALUE)
506        self.assert_(
507        authzDecisionQuery2.subject.nameID.format == SAMLTestCase.NAMEID_FORMAT)
508        self.assert_(
509            authzDecisionQuery2.issuer.value == SAMLTestCase.ISSUER_DN)
510        self.assert_(
511            authzDecisionQuery2.resource == SAMLTestCase.RESOURCE_URI)
512        self.assert_(len(authzDecisionQuery2.actions) == 1)
513        self.assert_(
514            authzDecisionQuery2.actions[0].value == Action.HTTP_GET_ACTION)
515        self.assert_(
516            authzDecisionQuery2.actions[0].namespace == Action.GHPP_NS_URI)
517        self.assert_(authzDecisionQuery2.evidence is None)
518
519    def _createAuthzDecisionQueryResponse(self):
520        """Helper method for Authz Decision Response"""
521        response = Response()
522        now = datetime.utcnow()
523        response.issueInstant = now
524       
525        # Make up a request ID that this response is responding to
526        response.inResponseTo = str(uuid4())
527        response.id = str(uuid4())
528        response.version = SAMLVersion(SAMLVersion.VERSION_20)
529           
530        response.issuer = Issuer()
531        response.issuer.format = Issuer.X509_SUBJECT
532        response.issuer.value = SAMLTestCase.ISSUER_DN
533       
534        response.status = Status()
535        response.status.statusCode = StatusCode()
536        response.status.statusCode.value = StatusCode.SUCCESS_URI
537        response.status.statusMessage = StatusMessage()       
538        response.status.statusMessage.value = "Response created successfully"
539           
540        assertion = Assertion()
541        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
542        assertion.id = str(uuid4())
543        assertion.issueInstant = now
544       
545        authzDecisionStatement = AuthzDecisionStatement()
546        authzDecisionStatement.decision = DecisionType.PERMIT
547        authzDecisionStatement.resource = SAMLTestCase.RESOURCE_URI
548        authzDecisionStatement.actions.append(Action())
549        authzDecisionStatement.actions[-1].namespace = Action.GHPP_NS_URI
550        authzDecisionStatement.actions[-1].value = Action.HTTP_GET_ACTION
551        assertion.authzDecisionStatements.append(authzDecisionStatement)
552       
553        # Add a conditions statement for a validity of 8 hours
554        assertion.conditions = Conditions()
555        assertion.conditions.notBefore = now
556        assertion.conditions.notOnOrAfter = now + timedelta(seconds=60*60*8)
557               
558        assertion.subject = Subject() 
559        assertion.subject.nameID = NameID()
560        assertion.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT
561        assertion.subject.nameID.value = SAMLTestCase.NAMEID_VALUE   
562           
563        assertion.issuer = Issuer()
564        assertion.issuer.format = Issuer.X509_SUBJECT
565        assertion.issuer.value = SAMLTestCase.ISSUER_DN
566
567        response.assertions.append(assertion)
568       
569        return response
570       
571    def test09CreateAuthzDecisionQueryResponse(self):
572        response = self._createAuthzDecisionQueryResponse()
573        self.assert_(response.assertions[0])
574        self.assert_(response.assertions[0].authzDecisionStatements[0])
575        self.assert_(response.assertions[0].authzDecisionStatements[0
576            ].decision == DecisionType.PERMIT)
577        self.assert_(response.assertions[0].authzDecisionStatements[0
578            ].resource == SAMLTestCase.RESOURCE_URI)
579        self.assert_(response.assertions[0].authzDecisionStatements[0
580            ].decision == DecisionType.PERMIT)
581        self.assert_(response.assertions[0].authzDecisionStatements[0
582            ].actions[-1].namespace == Action.GHPP_NS_URI)
583        self.assert_(response.assertions[0].authzDecisionStatements[0
584            ].actions[-1].value == Action.HTTP_GET_ACTION)
585     
586    def _serializeAuthzDecisionQueryResponse(self):
587        response = self._createAuthzDecisionQueryResponse()
588       
589        # Create ElementTree Assertion Element
590        responseElem = ResponseElementTree.toXML(response)
591        self.assert_(iselement(responseElem))
592       
593        # Serialise to output       
594        xmlOutput = prettyPrint(responseElem)
595        return xmlOutput
596   
597    def test10SerializeAuthzDecisionQueryResponse(self):
598        xmlOutput = self._serializeAuthzDecisionQueryResponse()
599        self.assert_(len(xmlOutput))
600        print("\n"+"_"*80)
601        print(xmlOutput)
602        print("_"*80)
603       
604        self.assert_('AuthzDecisionStatement' in xmlOutput)
605        self.assert_('GET' in xmlOutput)
606        self.assert_('Permit' in xmlOutput)
607
608    def test11DeserializeAuthzDecisionResponse(self):
609        xmlOutput = self._serializeAuthzDecisionQueryResponse()
610       
611        authzDecisionResponseStream = StringIO()
612        authzDecisionResponseStream.write(xmlOutput)
613        authzDecisionResponseStream.seek(0)
614
615        tree = ElementTree.parse(authzDecisionResponseStream)
616        elem = tree.getroot()
617        response = ResponseElementTree.fromXML(elem)
618       
619        self.assert_(response.assertions[0])
620        self.assert_(response.assertions[0].authzDecisionStatements[0])
621        self.assert_(response.assertions[0].authzDecisionStatements[0
622            ].decision == DecisionType.PERMIT)
623        self.assert_(response.assertions[0].authzDecisionStatements[0
624            ].resource == SAMLTestCase.RESOURCE_URI)
625        self.assert_(response.assertions[0].authzDecisionStatements[0
626            ].decision == DecisionType.PERMIT)
627        self.assert_(response.assertions[0].authzDecisionStatements[0
628            ].actions[-1].namespace == Action.GHPP_NS_URI)
629        self.assert_(response.assertions[0].authzDecisionStatements[0
630            ].actions[-1].value == Action.HTTP_GET_ACTION)
631       
632 
633    def test12PickleAssertion(self):
634        # Test pickling with __slots__
635        assertion = self._createAttributeAssertionHelper()
636        assertion.issuer = Issuer()
637        assertion.issuer.format = Issuer.X509_SUBJECT
638        assertion.issuer.value = SAMLTestCase.ISSUER_DN
639       
640        jar = pickle.dumps(assertion)
641        assertion2 = pickle.loads(jar)
642        self.assert_(isinstance(assertion2, Assertion))
643        self.assert_(assertion2.issuer.value == assertion.issuer.value)
644        self.assert_(assertion2.issuer.format == assertion.issuer.format)
645        self.assert_(len(assertion2.attributeStatements)==1)
646        self.assert_(len(assertion2.attributeStatements[0].attributes) > 0)
647        self.assert_(assertion2.attributeStatements[0].attributes[0
648                     ].attributeValues[0
649                     ].value == assertion.attributeStatements[0].attributes[0
650                                ].attributeValues[0].value)
651       
652    def test13PickleAttributeQuery(self):
653        # Test pickling with __slots__
654        samlUtil = SAMLUtil()
655        samlUtil.firstName = ''
656        samlUtil.lastName = ''
657        samlUtil.emailAddress = ''
658        query = samlUtil.buildAttributeQuery(SAMLTestCase.ISSUER_DN,
659                                             SAMLTestCase.NAMEID_VALUE)
660       
661        jar = pickle.dumps(query)
662        query2 = pickle.loads(jar)
663
664        self.assert_(isinstance(query2, AttributeQuery))
665        self.assert_(query2.subject.nameID.value == query.subject.nameID.value)
666        self.assert_((query2.subject.nameID.format == 
667                      query.subject.nameID.format))
668        self.assert_(query2.issuer.value == query.issuer.value)
669        self.assert_(query2.issuer.format == query.issuer.format)
670        self.assert_(query2.issueInstant == query.issueInstant)
671        self.assert_(query2.id == query.id)
672        self.assert_(len(query2.attributes) == 3)
673        self.assert_(query2.attributes[0].name == "urn:esg:first:name")
674        self.assert_(query2.attributes[1].nameFormat == SAMLUtil.XSSTRING_NS)
675
676    def test14PickleAttributeQueryResponse(self):
677        response = self._createAttributeQueryResponse()
678       
679        jar = pickle.dumps(response)
680        response2 = pickle.loads(jar)
681       
682        self.assert_(isinstance(response2, Response))
683        self.assert_((response2.status.statusCode.value == 
684                      response.status.statusCode.value))
685        self.assert_((response2.status.statusMessage.value == 
686                      response.status.statusMessage.value))
687        self.assert_(len(response2.assertions) == 1)
688        self.assert_(response2.assertions[0].id == response.assertions[0].id)
689        self.assert_((response2.assertions[0].conditions.notBefore == 
690                      response.assertions[0].conditions.notBefore))
691        self.assert_((response2.assertions[0].conditions.notOnOrAfter == 
692                      response.assertions[0].conditions.notOnOrAfter))
693        self.assert_(len(response2.assertions[0].attributeStatements) == 1)
694        self.assert_(len(response2.assertions[0].attributeStatements[0
695                                                            ].attributes) == 9)
696        self.assert_(response2.assertions[0].attributeStatements[0].attributes[1
697                     ].attributeValues[0
698                     ].value == response.assertions[0].attributeStatements[0
699                                    ].attributes[1].attributeValues[0].value)
700             
701    def test15PickleAuthzDecisionQuery(self):
702        samlUtil = SAMLUtil()
703        query = samlUtil.buildAuthzDecisionQuery()
704             
705        jar = pickle.dumps(query)
706        query2 = pickle.loads(jar)
707       
708        self.assert_(isinstance(query2, AuthzDecisionQuery))
709        self.assert_(query.resource == query2.resource)
710        self.assert_(query.version == query2.version)
711        self.assert_(len(query2.actions) == 1)
712        self.assert_(query2.actions[0].value == Action.HTTP_GET_ACTION)
713        self.assert_(query2.actions[0].namespace == Action.GHPP_NS_URI)
714
715
716       
717    def test16PickleAuthzDecisionResponse(self):
718        response = self._createAuthzDecisionQueryResponse()
719       
720        jar = pickle.dumps(response)
721        response2 = pickle.loads(jar)
722       
723        self.assert_(isinstance(response2, Response))
724       
725        self.assert_(len(response.assertions) == 1)
726        self.assert_(len(response.assertions[0].authzDecisionStatements) == 1)
727         
728        self.assert_(response.assertions[0].authzDecisionStatements[0
729                        ].resource == response2.assertions[0
730                                        ].authzDecisionStatements[0].resource)
731       
732        self.assert_(len(response.assertions[0].authzDecisionStatements[0
733                        ].actions) == 1)
734        self.assert_(response.assertions[0].authzDecisionStatements[0
735                        ].actions[0].value == response2.assertions[0
736                                        ].authzDecisionStatements[0
737                                                ].actions[0].value)
738       
739        self.assert_(response2.assertions[0].authzDecisionStatements[0
740                        ].actions[0].namespace == Action.GHPP_NS_URI)       
741
742        self.assert_(response2.assertions[0].authzDecisionStatements[0
743                        ].decision == DecisionType.PERMIT)       
744       
745if __name__ == "__main__":
746    unittest.main()       
Note: See TracBrowser for help on using the repository browser.