source: TI12-security/trunk/ndg_saml/ndg/saml/test/test_saml.py @ 7634

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/ndg_saml/ndg/saml/test/test_saml.py@7634
Revision 7634, 30.4 KB checked in by pjkersha, 10 years ago (diff)

0.5.1 Release - fixes:

  • Property svn:keywords set to Id
Line 
1"""SAML unit test package
2
3NERC DataGrid Project
4
5This implementation is adapted from the Java OpenSAML implementation.  The
6copyright and licence information are included here:
7
8Copyright [2005] [University Corporation for Advanced Internet Development, Inc.]
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21"""
22__author__ = "P J Kershaw"
23__date__ = "21/07/09"
24__copyright__ = "(C) 2009 Science and Technology Facilities Council"
25__license__ = "http://www.apache.org/licenses/LICENSE-2.0"
26__contact__ = "Philip.Kershaw@stfc.ac.uk"
27__revision__ = '$Id$'
28import logging
29logging.basicConfig(level=logging.DEBUG)
30   
31from datetime import datetime, timedelta
32from uuid import uuid4
33from cStringIO import StringIO
34
35import unittest
36import pickle
37
38from xml.etree.ElementTree import iselement
39from xml.etree import ElementTree
40
41from ndg.saml.utils import SAMLDateTime
42from ndg.saml.saml2.core import (SAMLVersion, Attribute, AttributeStatement, 
43                                 AuthzDecisionStatement, Assertion, 
44                                 AttributeQuery, Response, Issuer, Subject, 
45                                 NameID, StatusCode, StatusMessage, Status, 
46                                 Conditions, DecisionType, 
47                                 XSStringAttributeValue, Action, 
48                                 AuthzDecisionQuery)
49
50from ndg.saml.common.xml import SAMLConstants
51from ndg.saml.xml.etree import (prettyPrint, AssertionElementTree, 
52                            AttributeQueryElementTree, ResponseElementTree,
53                            AuthzDecisionQueryElementTree)
54
55
class SAMLUtil(object):
    """SAML utility class based on ANL examples for Earth System Grid:
    http://www.ci.uchicago.edu/wiki/bin/view/ESGProject/ESGSAMLAttributes#ESG_Attribute_Service

    Builds SAML Assertion, AttributeQuery and AuthzDecisionQuery objects
    populated with fixed test values.  The class is self-contained: it only
    uses its own constants so that it can be used independently of any test
    case class defined in this module.
    """
    NAMEID_FORMAT = "urn:esg:openid"
    NAMEID_VALUE = "https://openid.localhost/philip.kershaw"
    ISSUER_DN = "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"
    UNCORRECTED_RESOURCE_URI = "http://LOCALHOST:80/My Secured URI"
    RESOURCE_URI = "http://localhost/My%20Secured%20URI"
    XSSTRING_NS = "http://www.w3.org/2001/XMLSchema#string"

    def __init__(self):
        """Set-up ESG core attributes and the miscellaneous attributes list.
        Core attributes left as None are omitted by createAttributes()
        """
        self.firstName = None
        self.lastName = None
        self.emailAddress = None

        # (name, value) pairs registered via addAttribute()
        self.__miscAttrList = []

    def addAttribute(self, name, value):
        """Add a generic attribute
        @type name: basestring
        @param name: attribute name
        @type value: basestring
        @param value: attribute value
        """
        self.__miscAttrList.append((name, value))

    def buildAssertion(self):
        """Create a SAML Assertion containing ESG core attributes: First
        Name, Last Name, e-mail Address and the generic attributes added
        with addAttribute()
        @rtype: ndg.saml.saml2.core.Assertion
        @return: new SAML Assertion object holding a single attribute
        statement
        """
        assertion = Assertion()
        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
        assertion.id = str(uuid4())
        assertion.issueInstant = datetime.utcnow()

        attributeStatement = AttributeStatement()
        for attribute in self.createAttributes():
            attributeStatement.attributes.append(attribute)

        assertion.attributeStatements.append(attributeStatement)

        return assertion

    def buildAttributeQuery(self, issuer, subjectNameID):
        """Make a SAML Attribute Query
        @type issuer: basestring
        @param issuer: attribute issuer name
        @type subjectNameID: basestring
        @param subjectNameID: identity to query attributes for
        @rtype: ndg.saml.saml2.core.AttributeQuery
        @return: new attribute query requesting the attributes returned by
        createAttributes()
        """
        attributeQuery = AttributeQuery()
        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
        attributeQuery.id = str(uuid4())
        attributeQuery.issueInstant = datetime.utcnow()

        attributeQuery.issuer = Issuer()
        attributeQuery.issuer.format = Issuer.X509_SUBJECT
        attributeQuery.issuer.value = issuer

        attributeQuery.subject = Subject()
        attributeQuery.subject.nameID = NameID()
        # Use this class' own constant rather than reaching into the test
        # case class defined later in the module (the values are identical)
        attributeQuery.subject.nameID.format = SAMLUtil.NAMEID_FORMAT
        attributeQuery.subject.nameID.value = subjectNameID

        attributeQuery.attributes = self.createAttributes()

        return attributeQuery

    @staticmethod
    def _createStringAttribute(name, nameFormat, value, friendlyName=None):
        """Make a SAML Attribute holding a single xs:string value
        @type name: basestring
        @param name: attribute name
        @type nameFormat: basestring
        @param nameFormat: attribute name format
        @type value: basestring
        @param value: content for the single attribute value
        @type friendlyName: basestring / None
        @param friendlyName: friendly name; left unset when None
        @rtype: ndg.saml.saml2.core.Attribute
        @return: new attribute
        """
        attribute = Attribute()
        attribute.name = name
        attribute.nameFormat = nameFormat
        if friendlyName is not None:
            attribute.friendlyName = friendlyName

        attributeValue = XSStringAttributeValue()
        attributeValue.value = value
        attribute.attributeValues.append(attributeValue)

        return attribute

    def createAttributes(self):
        """Create SAML Attributes for use in an Assertion or AttributeQuery
        @rtype: list
        @return: core attributes which are not None (first name, last name,
        e-mail address - in that order) followed by the generic attributes
        in the order they were added
        """
        attributes = []

        if self.firstName is not None:
            # special case handling for 'FirstName' attribute
            attributes.append(self._createStringAttribute(
                                                    "urn:esg:first:name",
                                                    SAMLUtil.XSSTRING_NS,
                                                    self.firstName,
                                                    friendlyName="FirstName"))

        if self.lastName is not None:
            # special case handling for 'LastName' attribute
            attributes.append(self._createStringAttribute(
                                                    "urn:esg:last:name",
                                                    SAMLUtil.XSSTRING_NS,
                                                    self.lastName,
                                                    friendlyName="LastName"))

        if self.emailAddress is not None:
            # special case handling for 'emailAddress' attribute.  The name
            # format is assembled from the library constants here instead of
            # using XSSTRING_NS
            emailNameFormat = (SAMLConstants.XSD_NS + "#" +
                               XSStringAttributeValue.TYPE_LOCAL_NAME)
            attributes.append(self._createStringAttribute(
                                                "urn:esg:email:address",
                                                emailNameFormat,
                                                self.emailAddress,
                                                friendlyName="emailAddress"))

        # Generic attributes carry no friendly name
        for name, value in self.__miscAttrList:
            attributes.append(self._createStringAttribute(
                                                        name,
                                                        SAMLUtil.XSSTRING_NS,
                                                        value))

        return attributes

    def buildAuthzDecisionQuery(self,
                                issuer=ISSUER_DN,
                                issuerFormat=Issuer.X509_SUBJECT,
                                subjectNameID=NAMEID_VALUE,
                                subjectNameIdFormat=NAMEID_FORMAT,
                                resource=UNCORRECTED_RESOURCE_URI,
                                actions=((Action.HTTP_GET_ACTION,
                                          Action.GHPP_NS_URI),)):
        """Convenience utility to make an Authorisation decision query
        @type issuer: basestring
        @param issuer: query issuer name
        @type issuerFormat: basestring
        @param issuerFormat: format identifier for the issuer name
        @type subjectNameID: basestring
        @param subjectNameID: identity the decision is requested for
        @type subjectNameIdFormat: basestring
        @param subjectNameIdFormat: format identifier for the subject name
        @type resource: basestring
        @param resource: URI of the resource to be accessed
        @type actions: iterable
        @param actions: (action value, action namespace) pairs
        @rtype: ndg.saml.saml2.core.AuthzDecisionQuery
        @return: new authorisation decision query
        """
        authzDecisionQuery = AuthzDecisionQuery()

        authzDecisionQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
        authzDecisionQuery.id = str(uuid4())
        authzDecisionQuery.issueInstant = datetime.utcnow()

        authzDecisionQuery.issuer = Issuer()
        authzDecisionQuery.issuer.format = issuerFormat
        authzDecisionQuery.issuer.value = issuer

        authzDecisionQuery.subject = Subject()
        authzDecisionQuery.subject.nameID = NameID()
        authzDecisionQuery.subject.nameID.format = subjectNameIdFormat
        authzDecisionQuery.subject.nameID.value = subjectNameID

        authzDecisionQuery.resource = resource

        for actionValue, namespace in actions:
            action = Action()
            # Set the namespace before the value, as the original code does
            action.namespace = namespace
            action.value = actionValue
            authzDecisionQuery.actions.append(action)

        return authzDecisionQuery
223           
224
class SAMLTestCase(unittest.TestCase):
    """Test SAML implementation for use with CMIP5 federation

    Covers creation, ElementTree serialisation / parsing and pickling of
    assertions, attribute queries, authorisation decision queries and their
    responses.
    """
    NAMEID_FORMAT = SAMLUtil.NAMEID_FORMAT
    NAMEID_VALUE = SAMLUtil.NAMEID_VALUE
    ISSUER_DN = SAMLUtil.ISSUER_DN
    UNCORRECTED_RESOURCE_URI = SAMLUtil.UNCORRECTED_RESOURCE_URI
    RESOURCE_URI = SAMLUtil.RESOURCE_URI

    # Validity period given to the conditions of test assertions: 8 hours
    ASSERTION_LIFETIME = timedelta(seconds=60*60*8)

    @staticmethod
    def _printXML(xmlOutput):
        """Print serialised XML framed by horizontal rules
        @type xmlOutput: basestring
        @param xmlOutput: XML text to display
        """
        print("\n"+"_"*80)
        print(xmlOutput)
        print("_"*80)

    @staticmethod
    def _parseXML(xmlOutput):
        """Parse serialised XML back into an ElementTree element
        @type xmlOutput: basestring
        @param xmlOutput: XML text to parse
        @return: root element of the parsed document
        """
        stream = StringIO()
        stream.write(xmlOutput)
        stream.seek(0)

        return ElementTree.parse(stream).getroot()

    @staticmethod
    def _createIssuer():
        """Make an X.509 subject format issuer set to the test issuer DN"""
        issuer = Issuer()
        issuer.format = Issuer.X509_SUBJECT
        issuer.value = SAMLTestCase.ISSUER_DN
        return issuer

    @staticmethod
    def _createSubject():
        """Make a subject whose name ID is the test OpenID"""
        subject = Subject()
        subject.nameID = NameID()
        subject.nameID.format = SAMLTestCase.NAMEID_FORMAT
        subject.nameID.value = SAMLTestCase.NAMEID_VALUE
        return subject

    @staticmethod
    def _createResponseShell(issueInstant):
        """Make a success Response without any assertions
        @type issueInstant: datetime.datetime
        @param issueInstant: time to stamp the response with
        @rtype: ndg.saml.saml2.core.Response
        @return: new response with ids, version, issuer and status set
        """
        response = Response()
        response.issueInstant = issueInstant

        # Make up a request ID that this response is responding to
        response.inResponseTo = str(uuid4())
        response.id = str(uuid4())
        response.version = SAMLVersion(SAMLVersion.VERSION_20)

        response.issuer = SAMLTestCase._createIssuer()

        response.status = Status()
        response.status.statusCode = StatusCode()
        response.status.statusCode.value = StatusCode.SUCCESS_URI
        response.status.statusMessage = StatusMessage()
        response.status.statusMessage.value = "Response created successfully"

        return response

    def _createAttributeAssertionHelper(self):
        """Make an assertion with the three ESG core attributes and six
        BADC role attributes
        """
        samlUtil = SAMLUtil()

        # ESG core attributes
        samlUtil.firstName = "Philip"
        samlUtil.lastName = "Kershaw"
        samlUtil.emailAddress = "p.j.k@somewhere"

        # BADC specific attributes
        badcRoleList = (
            'urn:badc:security:authz:1.0:attr:admin',
            'urn:badc:security:authz:1.0:attr:rapid',
            'urn:badc:security:authz:1.0:attr:coapec',
            'urn:badc:security:authz:1.0:attr:midas',
            'urn:badc:security:authz:1.0:attr:quest',
            'urn:badc:security:authz:1.0:attr:staff'
        )
        for role in badcRoleList:
            samlUtil.addAttribute("urn:badc:security:authz:1.0:attr", role)

        # Make an assertion object
        return samlUtil.buildAssertion()

    @staticmethod
    def _createBlankAttributeQuery():
        """Make an attribute query asking for the three ESG core attributes
        with empty values
        """
        samlUtil = SAMLUtil()
        samlUtil.firstName = ''
        samlUtil.lastName = ''
        samlUtil.emailAddress = ''
        return samlUtil.buildAttributeQuery(SAMLTestCase.ISSUER_DN,
                                            SAMLTestCase.NAMEID_VALUE)

    def test01CreateAssertion(self):
        assertion = self._createAttributeAssertionHelper()

        # Create ElementTree Assertion Element
        assertionElem = AssertionElementTree.toXML(assertion)

        self.assertTrue(iselement(assertionElem))

        # Serialise to output
        xmlOutput = prettyPrint(assertionElem)
        self.assertTrue(len(xmlOutput))

        self._printXML(xmlOutput)

    def test02ParseAssertion(self):
        assertion = self._createAttributeAssertionHelper()

        # Create ElementTree Assertion Element
        assertionElem = AssertionElementTree.toXML(assertion)

        self.assertTrue(iselement(assertionElem))

        # Serialise to output
        xmlOutput = prettyPrint(assertionElem)
        self._printXML(xmlOutput)

        # Round trip: parse the serialised text back into an assertion
        elem2 = self._parseXML(xmlOutput)

        assertion2 = AssertionElementTree.fromXML(elem2)
        self.assertTrue(assertion2)

    def test03CreateAttributeQuery(self):
        attributeQuery = self._createBlankAttributeQuery()

        elem = AttributeQueryElementTree.toXML(attributeQuery)
        xmlOutput = prettyPrint(elem)

        self._printXML(xmlOutput)

    def test04ParseAttributeQuery(self):
        attributeQuery = self._createBlankAttributeQuery()

        elem = AttributeQueryElementTree.toXML(attributeQuery)
        xmlOutput = prettyPrint(elem)
        print("\n"+"_"*80)
        print(xmlOutput)

        elem2 = self._parseXML(xmlOutput)

        attributeQuery2 = AttributeQueryElementTree.fromXML(elem2)
        self.assertEqual(attributeQuery2.id, attributeQuery.id)
        self.assertEqual(attributeQuery2.issuer.value,
                         attributeQuery.issuer.value)
        self.assertEqual(attributeQuery2.subject.nameID.value,
                         attributeQuery.subject.nameID.value)

        self.assertEqual(attributeQuery2.attributes[1].name,
                         attributeQuery.attributes[1].name)

        xmlOutput2 = prettyPrint(elem2)
        print("_"*80)
        print(xmlOutput2)
        print("_"*80)

    def _createAttributeQueryResponse(self):
        """Helper method for Attribute Query Response: a success response
        holding one attribute assertion
        """
        response = self._createResponseShell(datetime.utcnow())

        assertion = self._createAttributeAssertionHelper()

        # Add a conditions statement for a validity of 8 hours
        assertion.conditions = Conditions()
        assertion.conditions.notBefore = datetime.utcnow()
        assertion.conditions.notOnOrAfter = (assertion.conditions.notBefore +
                                             self.ASSERTION_LIFETIME)

        assertion.subject = self._createSubject()
        assertion.issuer = self._createIssuer()

        response.assertions.append(assertion)

        return response

    def test05CreateAttributeQueryResponse(self):
        response = self._createAttributeQueryResponse()

        # Create ElementTree Response Element
        responseElem = ResponseElementTree.toXML(response)

        self.assertTrue(iselement(responseElem))

        # Serialise to output
        xmlOutput = prettyPrint(responseElem)
        self.assertTrue(len(xmlOutput))
        self._printXML(xmlOutput)

    def _createAuthzDecisionQuery(self):
        """Make an authorisation decision query with no actions set.  None
        of the tests in this class call it: they use
        SAMLUtil.buildAuthzDecisionQuery instead
        """
        authzDecisionQuery = AuthzDecisionQuery()

        authzDecisionQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
        authzDecisionQuery.id = str(uuid4())
        authzDecisionQuery.issueInstant = datetime.utcnow()

        authzDecisionQuery.issuer = self._createIssuer()
        authzDecisionQuery.subject = self._createSubject()

        authzDecisionQuery.resource = SAMLTestCase.UNCORRECTED_RESOURCE_URI

        return authzDecisionQuery

    def test06CreateAuthzDecisionQuery(self):
        samlUtil = SAMLUtil()
        authzDecisionQuery = samlUtil.buildAuthzDecisionQuery()

        # The query was built from UNCORRECTED_RESOURCE_URI; the value read
        # back is expected to have the default port removed, the host name
        # lower-cased and the spaces escaped
        self.assertTrue(":80" not in authzDecisionQuery.resource)
        self.assertTrue("localhost" in authzDecisionQuery.resource)
        self.assertTrue(" " not in authzDecisionQuery.resource)

        authzDecisionQuery.resource = \
            "https://Somewhere.ac.uk:443/My Secured URI?blah=4&yes=True"

        self.assertTrue(":443" not in authzDecisionQuery.resource)
        self.assertTrue("somewhere.ac.uk" in authzDecisionQuery.resource)
        self.assertTrue("yes=True" in authzDecisionQuery.resource)

        # NOTE(review): buildAuthzDecisionQuery() has already added a GET
        # action by default so the Action appended here stays unset at
        # index 1; index 0 is the one exercised below.  Kept as in the
        # original test.
        authzDecisionQuery.actions.append(Action())
        authzDecisionQuery.actions[0].namespace = Action.GHPP_NS_URI
        authzDecisionQuery.actions[0].value = Action.HTTP_GET_ACTION

        self.assertEqual(authzDecisionQuery.actions[0].value,
                         Action.HTTP_GET_ACTION)
        self.assertEqual(authzDecisionQuery.actions[0].namespace,
                         Action.GHPP_NS_URI)

        # Try out the restricted vocabulary
        try:
            authzDecisionQuery.actions[0].value = "delete everything"
        except AttributeError as e:
            print("Caught incorrect action type setting: %s" % e)
        else:
            self.fail("Expecting AttributeError raised for incorrect action "
                      "setting.")

        authzDecisionQuery.actions[0].actionTypes = {'urn:malicious':
                                                     ("delete everything",)}

        # Try again now that the action types have been adjusted
        authzDecisionQuery.actions[0].namespace = 'urn:malicious'
        authzDecisionQuery.actions[0].value = "delete everything"

    def test07SerializeAuthzDecisionQuery(self):
        samlUtil = SAMLUtil()
        authzDecisionQuery = samlUtil.buildAuthzDecisionQuery()

        # Create ElementTree AuthzDecisionQuery Element
        authzDecisionQueryElem = AuthzDecisionQueryElementTree.toXML(
                                                            authzDecisionQuery)

        self.assertTrue(iselement(authzDecisionQueryElem))

        # Serialise to output
        xmlOutput = prettyPrint(authzDecisionQueryElem)
        self.assertTrue(len(xmlOutput))

        self._printXML(xmlOutput)

    def test08DeserializeAuthzDecisionQuery(self):
        samlUtil = SAMLUtil()
        authzDecisionQuery = samlUtil.buildAuthzDecisionQuery()

        # Create ElementTree AuthzDecisionQuery Element
        authzDecisionQueryElem = AuthzDecisionQueryElementTree.toXML(
                                                            authzDecisionQuery)

        self.assertTrue(iselement(authzDecisionQueryElem))

        # Serialise to output
        xmlOutput = prettyPrint(authzDecisionQueryElem)
        self.assertTrue(len(xmlOutput))

        elem2 = self._parseXML(xmlOutput)

        authzDecisionQuery2 = AuthzDecisionQueryElementTree.fromXML(elem2)
        self.assertTrue(authzDecisionQuery2)
        self.assertEqual(authzDecisionQuery2.subject.nameID.value,
                         SAMLTestCase.NAMEID_VALUE)
        self.assertEqual(authzDecisionQuery2.subject.nameID.format,
                         SAMLTestCase.NAMEID_FORMAT)
        self.assertEqual(authzDecisionQuery2.issuer.value,
                         SAMLTestCase.ISSUER_DN)
        self.assertEqual(authzDecisionQuery2.resource,
                         SAMLTestCase.RESOURCE_URI)
        self.assertEqual(len(authzDecisionQuery2.actions), 1)
        self.assertEqual(authzDecisionQuery2.actions[0].value,
                         Action.HTTP_GET_ACTION)
        self.assertEqual(authzDecisionQuery2.actions[0].namespace,
                         Action.GHPP_NS_URI)
        self.assertTrue(authzDecisionQuery2.evidence is None)

    def _createAuthzDecisionQueryResponse(self):
        """Helper method for Authz Decision Response: a success response
        holding one assertion with a single permit decision statement
        """
        now = datetime.utcnow()
        response = self._createResponseShell(now)

        assertion = Assertion()
        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
        assertion.id = str(uuid4())
        assertion.issueInstant = now

        authzDecisionStatement = AuthzDecisionStatement()
        authzDecisionStatement.decision = DecisionType.PERMIT
        authzDecisionStatement.resource = SAMLTestCase.RESOURCE_URI
        authzDecisionStatement.actions.append(Action())
        authzDecisionStatement.actions[-1].namespace = Action.GHPP_NS_URI
        authzDecisionStatement.actions[-1].value = Action.HTTP_GET_ACTION
        assertion.authzDecisionStatements.append(authzDecisionStatement)

        # Add a conditions statement for a validity of 8 hours
        assertion.conditions = Conditions()
        assertion.conditions.notBefore = now
        assertion.conditions.notOnOrAfter = now + self.ASSERTION_LIFETIME

        assertion.subject = self._createSubject()
        assertion.issuer = self._createIssuer()

        response.assertions.append(assertion)

        return response

    def _checkAuthzDecisionResponse(self, response):
        """Check a response carries the permit decision statement set up by
        _createAuthzDecisionQueryResponse
        @type response: ndg.saml.saml2.core.Response
        @param response: response to check
        """
        self.assertTrue(response.assertions[0])

        statement = response.assertions[0].authzDecisionStatements[0]
        self.assertTrue(statement)
        self.assertEqual(statement.decision, DecisionType.PERMIT)
        self.assertEqual(statement.resource, SAMLTestCase.RESOURCE_URI)
        self.assertEqual(statement.actions[-1].namespace, Action.GHPP_NS_URI)
        self.assertEqual(statement.actions[-1].value, Action.HTTP_GET_ACTION)

    def test09CreateAuthzDecisionQueryResponse(self):
        response = self._createAuthzDecisionQueryResponse()
        self._checkAuthzDecisionResponse(response)

    def _serializeAuthzDecisionQueryResponse(self):
        """Serialise an Authz Decision Response to pretty printed XML text
        """
        response = self._createAuthzDecisionQueryResponse()

        # Create ElementTree Response Element
        responseElem = ResponseElementTree.toXML(response)
        self.assertTrue(iselement(responseElem))

        # Serialise to output
        return prettyPrint(responseElem)

    def test10SerializeAuthzDecisionQueryResponse(self):
        xmlOutput = self._serializeAuthzDecisionQueryResponse()
        self.assertTrue(len(xmlOutput))
        self._printXML(xmlOutput)

        self.assertTrue('AuthzDecisionStatement' in xmlOutput)
        self.assertTrue('GET' in xmlOutput)
        self.assertTrue('Permit' in xmlOutput)

    def test11DeserializeAuthzDecisionResponse(self):
        xmlOutput = self._serializeAuthzDecisionQueryResponse()

        elem = self._parseXML(xmlOutput)
        response = ResponseElementTree.fromXML(elem)

        self._checkAuthzDecisionResponse(response)

    def test12PickleAssertion(self):
        # Test pickling with __slots__
        assertion = self._createAttributeAssertionHelper()
        assertion.issuer = self._createIssuer()

        jar = pickle.dumps(assertion)
        assertion2 = pickle.loads(jar)
        self.assertTrue(isinstance(assertion2, Assertion))
        self.assertEqual(assertion2.issuer.value, assertion.issuer.value)
        self.assertEqual(assertion2.issuer.format, assertion.issuer.format)
        self.assertEqual(len(assertion2.attributeStatements), 1)
        self.assertTrue(len(assertion2.attributeStatements[0].attributes) > 0)

        firstAttribute = assertion.attributeStatements[0].attributes[0]
        firstAttribute2 = assertion2.attributeStatements[0].attributes[0]
        self.assertEqual(firstAttribute2.attributeValues[0].value,
                         firstAttribute.attributeValues[0].value)

    def test13PickleAttributeQuery(self):
        # Test pickling with __slots__
        query = self._createBlankAttributeQuery()

        jar = pickle.dumps(query)
        query2 = pickle.loads(jar)

        self.assertTrue(isinstance(query2, AttributeQuery))
        self.assertEqual(query2.subject.nameID.value,
                         query.subject.nameID.value)
        self.assertEqual(query2.subject.nameID.format,
                         query.subject.nameID.format)
        self.assertEqual(query2.issuer.value, query.issuer.value)
        self.assertEqual(query2.issuer.format, query.issuer.format)
        self.assertEqual(query2.issueInstant, query.issueInstant)
        self.assertEqual(query2.id, query.id)
        self.assertEqual(len(query2.attributes), 3)
        self.assertEqual(query2.attributes[0].name, "urn:esg:first:name")
        self.assertEqual(query2.attributes[1].nameFormat,
                         SAMLUtil.XSSTRING_NS)

    def test14PickleAttributeQueryResponse(self):
        response = self._createAttributeQueryResponse()

        jar = pickle.dumps(response)
        response2 = pickle.loads(jar)

        self.assertTrue(isinstance(response2, Response))
        self.assertEqual(response2.status.statusCode.value,
                         response.status.statusCode.value)
        self.assertEqual(response2.status.statusMessage.value,
                         response.status.statusMessage.value)
        self.assertEqual(len(response2.assertions), 1)

        assertion = response.assertions[0]
        assertion2 = response2.assertions[0]
        self.assertEqual(assertion2.id, assertion.id)
        self.assertEqual(assertion2.conditions.notBefore,
                         assertion.conditions.notBefore)
        self.assertEqual(assertion2.conditions.notOnOrAfter,
                         assertion.conditions.notOnOrAfter)
        self.assertEqual(len(assertion2.attributeStatements), 1)

        # Three ESG core attributes plus the six BADC roles
        self.assertEqual(len(assertion2.attributeStatements[0].attributes), 9)
        self.assertEqual(
            assertion2.attributeStatements[0].attributes[1
                ].attributeValues[0].value,
            assertion.attributeStatements[0].attributes[1
                ].attributeValues[0].value)

    def test15PickleAuthzDecisionQuery(self):
        samlUtil = SAMLUtil()
        query = samlUtil.buildAuthzDecisionQuery()

        jar = pickle.dumps(query)
        query2 = pickle.loads(jar)

        self.assertTrue(isinstance(query2, AuthzDecisionQuery))
        self.assertEqual(query.resource, query2.resource)
        self.assertEqual(query.version, query2.version)
        self.assertEqual(len(query2.actions), 1)
        self.assertEqual(query2.actions[0].value, Action.HTTP_GET_ACTION)
        self.assertEqual(query2.actions[0].namespace, Action.GHPP_NS_URI)

    def test16PickleAuthzDecisionResponse(self):
        response = self._createAuthzDecisionQueryResponse()

        jar = pickle.dumps(response)
        response2 = pickle.loads(jar)

        self.assertTrue(isinstance(response2, Response))

        # Check the sizes on the unpickled copy: checking the original
        # response here would not exercise pickling at all
        self.assertEqual(len(response2.assertions), 1)
        self.assertEqual(len(response2.assertions[0].authzDecisionStatements),
                         1)

        statement = response.assertions[0].authzDecisionStatements[0]
        statement2 = response2.assertions[0].authzDecisionStatements[0]

        self.assertEqual(statement.resource, statement2.resource)

        self.assertEqual(len(statement2.actions), 1)
        self.assertEqual(statement.actions[0].value,
                         statement2.actions[0].value)

        self.assertEqual(statement2.actions[0].namespace, Action.GHPP_NS_URI)

        self.assertEqual(statement2.decision, DecisionType.PERMIT)

    def test17SAMLDatetime(self):
        # Test parsing of Datetimes following
        # http://www.w3.org/TR/xmlschema-2/#dateTime

        # No seconds fraction
        self.assertTrue(SAMLDateTime.fromString('2010-10-20T14:49:50Z'))

        self.assertRaises(TypeError, SAMLDateTime.fromString,
                          None)
755       
# Run every test case defined in this module when executed as a script
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.