# source: TI12-security/trunk/ndg_saml/ndg/saml/test/test_saml.py @ 6901
#
# Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/ndg_saml/ndg/saml/test/test_saml.py@6901
# Revision 6901, 30.0 KB checked in by pjkersha, 10 years ago (diff)
#
# Incomplete - task 6: Put NDG SAML package on PyPI
#
#   * updating epydoc for 'core' module
#
1"""SAML unit test package
2
3NERC DataGrid Project
4
5This implementation is adapted from the Java OpenSAML implementation.  The
6copyright and licence information are included here:
7
8Copyright [2005] [University Corporation for Advanced Internet Development, Inc.]
9
10Licensed under the Apache License, Version 2.0 (the "License");
11you may not use this file except in compliance with the License.
12You may obtain a copy of the License at
13
14http://www.apache.org/licenses/LICENSE-2.0
15
16Unless required by applicable law or agreed to in writing, software
17distributed under the License is distributed on an "AS IS" BASIS,
18WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19See the License for the specific language governing permissions and
20limitations under the License.
21"""
22__author__ = "P J Kershaw"
23__date__ = "21/07/09"
24__copyright__ = "(C) 2009 Science and Technology Facilities Council"
25__license__ = "http://www.apache.org/licenses/LICENSE-2.0"
26__contact__ = "Philip.Kershaw@stfc.ac.uk"
27__revision__ = '$Id$'
28import logging
29logging.basicConfig(level=logging.DEBUG)

from datetime import datetime, timedelta
from uuid import uuid4
from cStringIO import StringIO

import unittest
import pickle

from xml.etree.ElementTree import iselement
from xml.etree import ElementTree

from ndg.saml.saml2.core import (SAMLVersion, Attribute, AttributeStatement,
                                 AuthzDecisionStatement, Assertion,
                                 AttributeQuery, Response, Issuer, Subject,
                                 NameID, StatusCode, StatusMessage, Status,
                                 Conditions, DecisionType,
                                 XSStringAttributeValue, Action,
                                 AuthzDecisionQuery)

from ndg.saml.common.xml import SAMLConstants
from ndg.saml.xml.etree import (prettyPrint, AssertionElementTree,
                                AttributeQueryElementTree, ResponseElementTree,
                                AuthzDecisionQueryElementTree)


class SAMLUtil(object):
    """SAML utility class based on ANL examples for Earth System Grid:
    http://www.ci.uchicago.edu/wiki/bin/view/ESGProject/ESGSAMLAttributes#ESG_Attribute_Service

    Builds the SAML assertion and query objects used as fixtures by the unit
    tests in this module.

    @cvar NAMEID_FORMAT: default subject name identifier format
    @cvar NAMEID_VALUE: default subject name identifier
    @cvar ISSUER_DN: default issuer name (X.509 subject name)
    @cvar UNCORRECTED_RESOURCE_URI: default resource URI passed to
    authorisation decision queries
    @cvar RESOURCE_URI: the form of UNCORRECTED_RESOURCE_URI the tests
    expect to read back from a query
    @cvar XSSTRING_NS: name format used for string valued attributes
    """
    NAMEID_FORMAT = "urn:esg:openid"
    NAMEID_VALUE = "https://openid.localhost/philip.kershaw"
    ISSUER_DN = "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"
    UNCORRECTED_RESOURCE_URI = "http://LOCALHOST:80/My Secured URI"
    RESOURCE_URI = "http://localhost/My%20Secured%20URI"
    XSSTRING_NS = "http://www.w3.org/2001/XMLSchema#string"

    def __init__(self):
        """Set-up ESG core attributes, Group/Role and miscellaneous
        attributes lists
        """
        # ESG core attributes: an attribute is only emitted by
        # createAttributes when its value is not None
        self.firstName = None
        self.lastName = None
        self.emailAddress = None

        # (name, value) pairs registered via addAttribute
        self.__miscAttrList = []

    def addAttribute(self, name, value):
        """Add a generic attribute
        @type name: basestring
        @param name: attribute name
        @type value: basestring
        @param value: attribute value
        """
        self.__miscAttrList.append((name, value))

    def buildAssertion(self):
        """Create a SAML Assertion containing ESG core attributes: First
        Name, Last Name, e-mail Address; ESG Group/Role type attributes
        and generic attributes
        @rtype: ndg.saml.saml2.core.Assertion
        @return: new SAML Assertion object
        """
        assertion = Assertion()
        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
        assertion.id = str(uuid4())
        assertion.issueInstant = datetime.utcnow()

        attributeStatement = AttributeStatement()
        for attribute in self.createAttributes():
            attributeStatement.attributes.append(attribute)

        assertion.attributeStatements.append(attributeStatement)

        return assertion

    def buildAttributeQuery(self, issuer, subjectNameID):
        """Make a SAML Attribute Query
        @type issuer: basestring
        @param issuer: attribute issuer name
        @type subjectNameID: basestring
        @param subjectNameID: identity to query attributes for
        @rtype: ndg.saml.saml2.core.AttributeQuery
        @return: new attribute query populated with the attributes from
        createAttributes
        """
        attributeQuery = AttributeQuery()
        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
        attributeQuery.id = str(uuid4())
        attributeQuery.issueInstant = datetime.utcnow()

        attributeQuery.issuer = Issuer()
        attributeQuery.issuer.format = Issuer.X509_SUBJECT
        attributeQuery.issuer.value = issuer

        attributeQuery.subject = Subject()
        attributeQuery.subject.nameID = NameID()
        # Use this class' own constant - the utility must not depend on the
        # test case class defined further down the module
        attributeQuery.subject.nameID.format = SAMLUtil.NAMEID_FORMAT
        attributeQuery.subject.nameID.value = subjectNameID

        attributeQuery.attributes = self.createAttributes()

        return attributeQuery

    @staticmethod
    def _createStringAttribute(name, nameFormat, value, friendlyName=None):
        """Make a SAML Attribute holding a single string value
        @type name: basestring
        @param name: attribute name
        @type nameFormat: basestring
        @param nameFormat: attribute name format
        @type value: basestring
        @param value: content for the single attribute value
        @type friendlyName: basestring / None
        @param friendlyName: friendly name for the attribute; left unset if
        None
        @rtype: ndg.saml.saml2.core.Attribute
        @return: new attribute
        """
        attribute = Attribute()
        attribute.name = name
        attribute.nameFormat = nameFormat
        if friendlyName is not None:
            attribute.friendlyName = friendlyName

        attributeValue = XSStringAttributeValue()
        attributeValue.value = value
        attribute.attributeValues.append(attributeValue)

        return attribute

    def createAttributes(self):
        """Create SAML Attributes for use in an Assertion or AttributeQuery
        @rtype: list
        @return: attributes for those of first name, last name and e-mail
        address which are not None, in that order, followed by the generic
        attributes in the order they were added
        """
        attributes = []

        if self.firstName is not None:
            # special case handling for 'FirstName' attribute
            attributes.append(SAMLUtil._createStringAttribute(
                                                "urn:esg:first:name",
                                                SAMLUtil.XSSTRING_NS,
                                                self.firstName,
                                                friendlyName="FirstName"))

        if self.lastName is not None:
            # special case handling for 'LastName' attribute
            attributes.append(SAMLUtil._createStringAttribute(
                                                "urn:esg:last:name",
                                                SAMLUtil.XSSTRING_NS,
                                                self.lastName,
                                                friendlyName="LastName"))

        if self.emailAddress is not None:
            # special case handling for 'emailAddress' attribute.  The name
            # format is assembled from the library constants rather than
            # taken from XSSTRING_NS
            emailNameFormat = (SAMLConstants.XSD_NS + "#" +
                               XSStringAttributeValue.TYPE_LOCAL_NAME)
            attributes.append(SAMLUtil._createStringAttribute(
                                                "urn:esg:email:address",
                                                emailNameFormat,
                                                self.emailAddress,
                                                friendlyName="emailAddress"))

        # Generic attributes carry no friendly name
        for name, value in self.__miscAttrList:
            attributes.append(SAMLUtil._createStringAttribute(
                                                name,
                                                SAMLUtil.XSSTRING_NS,
                                                value))

        return attributes

    def buildAuthzDecisionQuery(self,
                                issuer=ISSUER_DN,
                                issuerFormat=Issuer.X509_SUBJECT,
                                subjectNameID=NAMEID_VALUE,
                                subjectNameIdFormat=NAMEID_FORMAT,
                                resource=UNCORRECTED_RESOURCE_URI,
                                actions=((Action.HTTP_GET_ACTION,
                                          Action.GHPP_NS_URI),)):
        """Convenience utility to make an Authorisation decision query
        @type issuer: basestring
        @param issuer: query issuer name
        @type issuerFormat: basestring
        @param issuerFormat: format of the issuer name
        @type subjectNameID: basestring
        @param subjectNameID: identity the decision is requested for
        @type subjectNameIdFormat: basestring
        @param subjectNameIdFormat: format of the subject name identifier
        @type resource: basestring
        @param resource: URI of the resource access is requested to
        @type actions: iterable
        @param actions: (action value, action namespace) pairs
        @rtype: ndg.saml.saml2.core.AuthzDecisionQuery
        @return: new authorisation decision query
        """
        authzDecisionQuery = AuthzDecisionQuery()

        authzDecisionQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
        authzDecisionQuery.id = str(uuid4())
        authzDecisionQuery.issueInstant = datetime.utcnow()

        authzDecisionQuery.issuer = Issuer()
        authzDecisionQuery.issuer.format = issuerFormat
        authzDecisionQuery.issuer.value = issuer

        authzDecisionQuery.subject = Subject()
        authzDecisionQuery.subject.nameID = NameID()
        authzDecisionQuery.subject.nameID.format = subjectNameIdFormat
        authzDecisionQuery.subject.nameID.value = subjectNameID

        authzDecisionQuery.resource = resource

        for actionValue, namespace in actions:
            authzDecisionQuery.actions.append(Action())
            # Namespace is set ahead of the value, as in the tests which
            # exercise the restricted action vocabulary
            authzDecisionQuery.actions[-1].namespace = namespace
            authzDecisionQuery.actions[-1].value = actionValue

        return authzDecisionQuery


class SAMLTestCase(unittest.TestCase):
    """Test SAML implementation for use with CMIP5 federation

    Covers creation, ElementTree serialisation / parsing and pickling of
    assertions, attribute queries, authorisation decision queries and their
    responses.
    """
    NAMEID_FORMAT = SAMLUtil.NAMEID_FORMAT
    NAMEID_VALUE = SAMLUtil.NAMEID_VALUE
    ISSUER_DN = SAMLUtil.ISSUER_DN
    UNCORRECTED_RESOURCE_URI = SAMLUtil.UNCORRECTED_RESOURCE_URI
    RESOURCE_URI = SAMLUtil.RESOURCE_URI

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _printXML(xmlOutput):
        """Print serialised XML between two ruled lines
        @type xmlOutput: basestring
        @param xmlOutput: serialised XML
        """
        print("\n"+"_"*80)
        print(xmlOutput)
        print("_"*80)

    @staticmethod
    def _parseXML(xmlOutput):
        """Parse serialised XML via an in-memory stream
        @type xmlOutput: basestring
        @param xmlOutput: serialised XML
        @rtype: ElementTree element
        @return: root element of the parsed document
        """
        stream = StringIO()
        stream.write(xmlOutput)
        stream.seek(0)

        return ElementTree.parse(stream).getroot()

    def _createAttributeAssertionHelper(self):
        """Make an attribute assertion with the ESG core attributes and six
        BADC role attributes
        @rtype: ndg.saml.saml2.core.Assertion
        @return: new assertion
        """
        samlUtil = SAMLUtil()

        # ESG core attributes
        samlUtil.firstName = "Philip"
        samlUtil.lastName = "Kershaw"
        samlUtil.emailAddress = "p.j.k@somewhere"

        # BADC specific attributes
        badcRoleList = (
            'urn:badc:security:authz:1.0:attr:admin',
            'urn:badc:security:authz:1.0:attr:rapid',
            'urn:badc:security:authz:1.0:attr:coapec',
            'urn:badc:security:authz:1.0:attr:midas',
            'urn:badc:security:authz:1.0:attr:quest',
            'urn:badc:security:authz:1.0:attr:staff'
        )
        for role in badcRoleList:
            samlUtil.addAttribute("urn:badc:security:authz:1.0:attr", role)

        # Make an assertion object
        return samlUtil.buildAssertion()

    def _createAttributeQueryHelper(self):
        """Make an attribute query requesting the three ESG core attributes
        @rtype: ndg.saml.saml2.core.AttributeQuery
        @return: new attribute query
        """
        samlUtil = SAMLUtil()

        # Empty strings rather than None so that the attributes are included
        # in the query but carry no value
        samlUtil.firstName = ''
        samlUtil.lastName = ''
        samlUtil.emailAddress = ''

        return samlUtil.buildAttributeQuery(SAMLTestCase.ISSUER_DN,
                                            SAMLTestCase.NAMEID_VALUE)

    def _createSuccessResponse(self, issueInstant):
        """Make a Response with issuer and success status set but without
        any assertions
        @type issueInstant: datetime.datetime
        @param issueInstant: issue time for the response
        @rtype: ndg.saml.saml2.core.Response
        @return: new response
        """
        response = Response()
        response.issueInstant = issueInstant

        # Make up a request ID that this response is responding to
        response.inResponseTo = str(uuid4())
        response.id = str(uuid4())
        response.version = SAMLVersion(SAMLVersion.VERSION_20)

        response.issuer = Issuer()
        response.issuer.format = Issuer.X509_SUBJECT
        response.issuer.value = SAMLTestCase.ISSUER_DN

        response.status = Status()
        response.status.statusCode = StatusCode()
        response.status.statusCode.value = StatusCode.SUCCESS_URI
        response.status.statusMessage = StatusMessage()
        response.status.statusMessage.value = "Response created successfully"

        return response

    def _setSubjectAndIssuer(self, assertion):
        """Set the test subject and issuer on an assertion
        @type assertion: ndg.saml.saml2.core.Assertion
        @param assertion: assertion to update in place
        """
        assertion.subject = Subject()
        assertion.subject.nameID = NameID()
        assertion.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT
        assertion.subject.nameID.value = SAMLTestCase.NAMEID_VALUE

        assertion.issuer = Issuer()
        assertion.issuer.format = Issuer.X509_SUBJECT
        assertion.issuer.value = SAMLTestCase.ISSUER_DN

    def _createAttributeQueryResponse(self):
        """Helper method for Attribute Query Response
        @rtype: ndg.saml.saml2.core.Response
        @return: response holding a single attribute assertion
        """
        response = self._createSuccessResponse(datetime.utcnow())

        assertion = self._createAttributeAssertionHelper()

        # Add a conditions statement for a validity of 8 hours
        assertion.conditions = Conditions()
        assertion.conditions.notBefore = datetime.utcnow()
        assertion.conditions.notOnOrAfter = (assertion.conditions.notBefore +
                                             timedelta(seconds=60*60*8))

        self._setSubjectAndIssuer(assertion)

        response.assertions.append(assertion)

        return response

    def _createAuthzDecisionQuery(self):
        """Make an authorisation decision query without any actions
        @rtype: ndg.saml.saml2.core.AuthzDecisionQuery
        @return: new authorisation decision query
        """
        authzDecisionQuery = AuthzDecisionQuery()

        authzDecisionQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
        authzDecisionQuery.id = str(uuid4())
        authzDecisionQuery.issueInstant = datetime.utcnow()

        authzDecisionQuery.issuer = Issuer()
        authzDecisionQuery.issuer.format = Issuer.X509_SUBJECT
        authzDecisionQuery.issuer.value = SAMLTestCase.ISSUER_DN

        authzDecisionQuery.subject = Subject()
        authzDecisionQuery.subject.nameID = NameID()
        authzDecisionQuery.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT
        authzDecisionQuery.subject.nameID.value = SAMLTestCase.NAMEID_VALUE

        authzDecisionQuery.resource = SAMLTestCase.UNCORRECTED_RESOURCE_URI

        return authzDecisionQuery

    def _createAuthzDecisionQueryResponse(self):
        """Helper method for Authz Decision Response
        @rtype: ndg.saml.saml2.core.Response
        @return: response holding a single assertion with a Permit decision
        for an HTTP GET on the test resource
        """
        now = datetime.utcnow()
        response = self._createSuccessResponse(now)

        assertion = Assertion()
        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
        assertion.id = str(uuid4())
        assertion.issueInstant = now

        authzDecisionStatement = AuthzDecisionStatement()
        authzDecisionStatement.decision = DecisionType.PERMIT
        authzDecisionStatement.resource = SAMLTestCase.RESOURCE_URI
        authzDecisionStatement.actions.append(Action())
        authzDecisionStatement.actions[-1].namespace = Action.GHPP_NS_URI
        authzDecisionStatement.actions[-1].value = Action.HTTP_GET_ACTION
        assertion.authzDecisionStatements.append(authzDecisionStatement)

        # Add a conditions statement for a validity of 8 hours
        assertion.conditions = Conditions()
        assertion.conditions.notBefore = now
        assertion.conditions.notOnOrAfter = now + timedelta(seconds=60*60*8)

        self._setSubjectAndIssuer(assertion)

        response.assertions.append(assertion)

        return response

    def _serializeAuthzDecisionQueryResponse(self):
        """Serialise the authorisation decision response fixture
        @rtype: basestring
        @return: pretty printed XML for the response
        """
        response = self._createAuthzDecisionQueryResponse()

        # Create ElementTree Response Element
        responseElem = ResponseElementTree.toXML(response)
        self.assertTrue(iselement(responseElem))

        # Serialise to output
        return prettyPrint(responseElem)

    def _assertPermitGetDecision(self, response):
        """Check the first assertion of a response carries a Permit decision
        for an HTTP GET on the test resource
        @type response: ndg.saml.saml2.core.Response
        @param response: response to check
        """
        self.assertTrue(response.assertions[0])

        authzDecisionStatement = response.assertions[0
                                                ].authzDecisionStatements[0]
        self.assertTrue(authzDecisionStatement)
        self.assertEqual(authzDecisionStatement.decision,
                         DecisionType.PERMIT)
        self.assertEqual(authzDecisionStatement.resource,
                         SAMLTestCase.RESOURCE_URI)
        self.assertEqual(authzDecisionStatement.actions[-1].namespace,
                         Action.GHPP_NS_URI)
        self.assertEqual(authzDecisionStatement.actions[-1].value,
                         Action.HTTP_GET_ACTION)

    # ------------------------------------------------------------------
    # Assertion and attribute query tests
    # ------------------------------------------------------------------
    def test01CreateAssertion(self):
        """Serialise an attribute assertion to XML"""
        assertion = self._createAttributeAssertionHelper()

        # Create ElementTree Assertion Element
        assertionElem = AssertionElementTree.toXML(assertion)

        self.assertTrue(iselement(assertionElem))

        # Serialise to output
        xmlOutput = prettyPrint(assertionElem)
        self.assertTrue(len(xmlOutput) > 0)

        self._printXML(xmlOutput)

    def test02ParseAssertion(self):
        """Serialise an attribute assertion and parse it back"""
        assertion = self._createAttributeAssertionHelper()

        # Create ElementTree Assertion Element
        assertionElem = AssertionElementTree.toXML(assertion)

        self.assertTrue(iselement(assertionElem))

        # Serialise to output
        xmlOutput = prettyPrint(assertionElem)

        self._printXML(xmlOutput)

        elem2 = self._parseXML(xmlOutput)

        assertion2 = AssertionElementTree.fromXML(elem2)
        self.assertTrue(assertion2)

    def test03CreateAttributeQuery(self):
        """Serialise an attribute query to XML"""
        attributeQuery = self._createAttributeQueryHelper()

        elem = AttributeQueryElementTree.toXML(attributeQuery)
        xmlOutput = prettyPrint(elem)

        self._printXML(xmlOutput)

    def test04ParseAttributeQuery(self):
        """Serialise an attribute query, parse it back and compare"""
        attributeQuery = self._createAttributeQueryHelper()

        elem = AttributeQueryElementTree.toXML(attributeQuery)
        xmlOutput = prettyPrint(elem)
        print("\n"+"_"*80)
        print(xmlOutput)

        elem2 = self._parseXML(xmlOutput)

        attributeQuery2 = AttributeQueryElementTree.fromXML(elem2)
        self.assertEqual(attributeQuery2.id, attributeQuery.id)
        self.assertEqual(attributeQuery2.issuer.value,
                         attributeQuery.issuer.value)
        self.assertEqual(attributeQuery2.subject.nameID.value,
                         attributeQuery.subject.nameID.value)

        self.assertEqual(attributeQuery2.attributes[1].name,
                         attributeQuery.attributes[1].name)

        xmlOutput2 = prettyPrint(elem2)
        print("_"*80)
        print(xmlOutput2)
        print("_"*80)

    def test05CreateAttributeQueryResponse(self):
        """Serialise an attribute query response to XML"""
        response = self._createAttributeQueryResponse()

        # Create ElementTree Response Element
        responseElem = ResponseElementTree.toXML(response)

        self.assertTrue(iselement(responseElem))

        # Serialise to output
        xmlOutput = prettyPrint(responseElem)
        self.assertTrue(len(xmlOutput) > 0)
        self._printXML(xmlOutput)

    # ------------------------------------------------------------------
    # Authorisation decision query tests
    # ------------------------------------------------------------------
    def test06CreateAuthzDecisionQuery(self):
        """Check resource URI handling and the restricted action vocabulary
        of an authorisation decision query
        """
        samlUtil = SAMLUtil()
        authzDecisionQuery = samlUtil.buildAuthzDecisionQuery()

        # The query was built from UNCORRECTED_RESOURCE_URI; the value read
        # back is expected to have the default port dropped, the host name
        # lower-cased and the space escaped
        self.assertTrue(":80" not in authzDecisionQuery.resource)
        self.assertTrue("localhost" in authzDecisionQuery.resource)
        self.assertTrue(" " not in authzDecisionQuery.resource)

        authzDecisionQuery.resource = \
            "https://Somewhere.ac.uk:443/My Secured URI?blah=4&yes=True"

        self.assertTrue(":443" not in authzDecisionQuery.resource)
        self.assertTrue("somewhere.ac.uk" in authzDecisionQuery.resource)
        self.assertTrue("yes=True" in authzDecisionQuery.resource)

        # NOTE(review): buildAuthzDecisionQuery has already added an action
        # so this appends a second one while the checks below address the
        # first - kept as in the original test
        authzDecisionQuery.actions.append(Action())
        authzDecisionQuery.actions[0].namespace = Action.GHPP_NS_URI
        authzDecisionQuery.actions[0].value = Action.HTTP_GET_ACTION

        self.assertEqual(authzDecisionQuery.actions[0].value,
                         Action.HTTP_GET_ACTION)
        self.assertEqual(authzDecisionQuery.actions[0].namespace,
                         Action.GHPP_NS_URI)

        # Try out the restricted vocabulary: a value outside the action
        # types for the current namespace must be rejected
        self.assertRaises(AttributeError,
                          setattr,
                          authzDecisionQuery.actions[0],
                          'value',
                          "delete everything")

        authzDecisionQuery.actions[0].actionTypes = {'urn:malicious':
                                                     ("delete everything",)}

        # Try again now that the action types have been adjusted
        authzDecisionQuery.actions[0].namespace = 'urn:malicious'
        authzDecisionQuery.actions[0].value = "delete everything"

    def test07SerializeAuthzDecisionQuery(self):
        """Serialise an authorisation decision query to XML"""
        samlUtil = SAMLUtil()
        authzDecisionQuery = samlUtil.buildAuthzDecisionQuery()

        # Create ElementTree AuthzDecisionQuery Element
        authzDecisionQueryElem = AuthzDecisionQueryElementTree.toXML(
                                                            authzDecisionQuery)

        self.assertTrue(iselement(authzDecisionQueryElem))

        # Serialise to output
        xmlOutput = prettyPrint(authzDecisionQueryElem)
        self.assertTrue(len(xmlOutput) > 0)

        self._printXML(xmlOutput)

    def test08DeserializeAuthzDecisionQuery(self):
        """Serialise an authorisation decision query, parse it back and
        check its content
        """
        samlUtil = SAMLUtil()
        authzDecisionQuery = samlUtil.buildAuthzDecisionQuery()

        # Create ElementTree AuthzDecisionQuery Element
        authzDecisionQueryElem = AuthzDecisionQueryElementTree.toXML(
                                                            authzDecisionQuery)

        self.assertTrue(iselement(authzDecisionQueryElem))

        # Serialise to output
        xmlOutput = prettyPrint(authzDecisionQueryElem)
        self.assertTrue(len(xmlOutput) > 0)

        elem2 = self._parseXML(xmlOutput)

        authzDecisionQuery2 = AuthzDecisionQueryElementTree.fromXML(elem2)
        self.assertTrue(authzDecisionQuery2)
        self.assertEqual(authzDecisionQuery2.subject.nameID.value,
                         SAMLTestCase.NAMEID_VALUE)
        self.assertEqual(authzDecisionQuery2.subject.nameID.format,
                         SAMLTestCase.NAMEID_FORMAT)
        self.assertEqual(authzDecisionQuery2.issuer.value,
                         SAMLTestCase.ISSUER_DN)
        self.assertEqual(authzDecisionQuery2.resource,
                         SAMLTestCase.RESOURCE_URI)
        self.assertEqual(len(authzDecisionQuery2.actions), 1)
        self.assertEqual(authzDecisionQuery2.actions[0].value,
                         Action.HTTP_GET_ACTION)
        self.assertEqual(authzDecisionQuery2.actions[0].namespace,
                         Action.GHPP_NS_URI)
        self.assertTrue(authzDecisionQuery2.evidence is None)

    def test09CreateAuthzDecisionQueryResponse(self):
        """Check the content of the authorisation decision response
        fixture
        """
        response = self._createAuthzDecisionQueryResponse()
        self._assertPermitGetDecision(response)

    def test10SerializeAuthzDecisionQueryResponse(self):
        """Serialise an authorisation decision response to XML"""
        xmlOutput = self._serializeAuthzDecisionQueryResponse()
        self.assertTrue(len(xmlOutput) > 0)
        self._printXML(xmlOutput)

        self.assertTrue('AuthzDecisionStatement' in xmlOutput)
        self.assertTrue('GET' in xmlOutput)
        self.assertTrue('Permit' in xmlOutput)

    def test11DeserializeAuthzDecisionResponse(self):
        """Serialise an authorisation decision response, parse it back and
        check its content
        """
        xmlOutput = self._serializeAuthzDecisionQueryResponse()

        elem = self._parseXML(xmlOutput)
        response = ResponseElementTree.fromXML(elem)

        self._assertPermitGetDecision(response)

    # ------------------------------------------------------------------
    # Pickle tests
    # ------------------------------------------------------------------
    def test12PickleAssertion(self):
        """Round trip an attribute assertion through pickle"""
        # Test pickling with __slots__
        assertion = self._createAttributeAssertionHelper()
        assertion.issuer = Issuer()
        assertion.issuer.format = Issuer.X509_SUBJECT
        assertion.issuer.value = SAMLTestCase.ISSUER_DN

        jar = pickle.dumps(assertion)
        assertion2 = pickle.loads(jar)

        self.assertTrue(isinstance(assertion2, Assertion))
        self.assertEqual(assertion2.issuer.value, assertion.issuer.value)
        self.assertEqual(assertion2.issuer.format, assertion.issuer.format)
        self.assertEqual(len(assertion2.attributeStatements), 1)
        self.assertTrue(len(assertion2.attributeStatements[0].attributes) > 0)
        self.assertEqual(
            assertion2.attributeStatements[0].attributes[0
                ].attributeValues[0].value,
            assertion.attributeStatements[0].attributes[0
                ].attributeValues[0].value)

    def test13PickleAttributeQuery(self):
        """Round trip an attribute query through pickle"""
        # Test pickling with __slots__
        query = self._createAttributeQueryHelper()

        jar = pickle.dumps(query)
        query2 = pickle.loads(jar)

        self.assertTrue(isinstance(query2, AttributeQuery))
        self.assertEqual(query2.subject.nameID.value,
                         query.subject.nameID.value)
        self.assertEqual(query2.subject.nameID.format,
                         query.subject.nameID.format)
        self.assertEqual(query2.issuer.value, query.issuer.value)
        self.assertEqual(query2.issuer.format, query.issuer.format)
        self.assertEqual(query2.issueInstant, query.issueInstant)
        self.assertEqual(query2.id, query.id)
        self.assertEqual(len(query2.attributes), 3)
        self.assertEqual(query2.attributes[0].name, "urn:esg:first:name")
        self.assertEqual(query2.attributes[1].nameFormat,
                         SAMLUtil.XSSTRING_NS)

    def test14PickleAttributeQueryResponse(self):
        """Round trip an attribute query response through pickle"""
        response = self._createAttributeQueryResponse()

        jar = pickle.dumps(response)
        response2 = pickle.loads(jar)

        self.assertTrue(isinstance(response2, Response))
        self.assertEqual(response2.status.statusCode.value,
                         response.status.statusCode.value)
        self.assertEqual(response2.status.statusMessage.value,
                         response.status.statusMessage.value)
        self.assertEqual(len(response2.assertions), 1)

        assertion = response.assertions[0]
        assertion2 = response2.assertions[0]
        self.assertEqual(assertion2.id, assertion.id)
        self.assertEqual(assertion2.conditions.notBefore,
                         assertion.conditions.notBefore)
        self.assertEqual(assertion2.conditions.notOnOrAfter,
                         assertion.conditions.notOnOrAfter)
        self.assertEqual(len(assertion2.attributeStatements), 1)

        # Three ESG core attributes plus six BADC roles
        self.assertEqual(len(assertion2.attributeStatements[0].attributes),
                         9)
        self.assertEqual(
            assertion2.attributeStatements[0].attributes[1
                ].attributeValues[0].value,
            assertion.attributeStatements[0].attributes[1
                ].attributeValues[0].value)

    def test15PickleAuthzDecisionQuery(self):
        """Round trip an authorisation decision query through pickle"""
        samlUtil = SAMLUtil()
        query = samlUtil.buildAuthzDecisionQuery()

        jar = pickle.dumps(query)
        query2 = pickle.loads(jar)

        self.assertTrue(isinstance(query2, AuthzDecisionQuery))
        self.assertEqual(query.resource, query2.resource)
        self.assertEqual(query.version, query2.version)
        self.assertEqual(len(query2.actions), 1)
        self.assertEqual(query2.actions[0].value, Action.HTTP_GET_ACTION)
        self.assertEqual(query2.actions[0].namespace, Action.GHPP_NS_URI)

    def test16PickleAuthzDecisionResponse(self):
        """Round trip an authorisation decision response through pickle"""
        response = self._createAuthzDecisionQueryResponse()

        jar = pickle.dumps(response)
        response2 = pickle.loads(jar)

        self.assertTrue(isinstance(response2, Response))

        # Length checks are made against the unpickled copy: it is the
        # object under test, the original is only the reference
        self.assertEqual(len(response2.assertions), 1)
        self.assertEqual(
            len(response2.assertions[0].authzDecisionStatements), 1)

        statement = response.assertions[0].authzDecisionStatements[0]
        statement2 = response2.assertions[0].authzDecisionStatements[0]

        self.assertEqual(statement.resource, statement2.resource)

        self.assertEqual(len(statement2.actions), 1)
        self.assertEqual(statement.actions[0].value,
                         statement2.actions[0].value)

        self.assertEqual(statement2.actions[0].namespace,
                         Action.GHPP_NS_URI)

        self.assertEqual(statement2.decision, DecisionType.PERMIT)

# Run the test suite when this module is executed as a script
if __name__ == "__main__":
    unittest.main()
# Note: See TracBrowser for help on using the repository browser.