Ignore:
Timestamp:
10/02/10 13:40:54 (11 years ago)
Author:
pjkersha
Message:

Working ElementTree serialisation/deserialisation for AuthzDecisionQuery?

File:
1 edited

Legend:

Unmodified
Added
Removed
  • TI12-security/trunk/ndg_security_saml/saml/test/test_saml.py

    r6546 r6547  
    2121from xml.etree import ElementTree 
    2222 
    23 from saml.saml2.core import Action 
    2423from saml.saml2.core import (SAMLVersion, Attribute, AttributeStatement,  
    2524                             Assertion, AttributeQuery, Response, Issuer,  
    2625                             Subject, NameID, StatusCode,  
    2726                             StatusMessage, Status, Conditions,  
    28                              XSStringAttributeValue, 
     27                             XSStringAttributeValue, Action, 
    2928                             AuthzDecisionQuery) 
    3029 
    3130from saml.common.xml import SAMLConstants 
    3231from saml.xml.etree import (prettyPrint, AssertionElementTree,  
    33                             AttributeQueryElementTree, ResponseElementTree) 
     32                            AttributeQueryElementTree, ResponseElementTree, 
     33                            AuthzDecisionQueryElementTree) 
    3434 
    3535 
     
    3838    http://www.ci.uchicago.edu/wiki/bin/view/ESGProject/ESGSAMLAttributes#ESG_Attribute_Service 
    3939    """ 
     40    NAMEID_FORMAT = "urn:esg:openid" 
     41    NAMEID_VALUE = "https://openid.localhost/philip.kershaw" 
     42    ISSUER_DN = "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk" 
     43    UNCORRECTED_RESOURCE_URI = "http://LOCALHOST:80/My Secured URI" 
     44    RESOURCE_URI = "http://localhost/My%20Secured%20URI" 
    4045     
    4146    def __init__(self): 
     
    97102        attributeQuery.subject = Subject()   
    98103        attributeQuery.subject.nameID = NameID() 
    99         attributeQuery.subject.nameID.format = "urn:esg:openid" 
     104        attributeQuery.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT 
    100105        attributeQuery.subject.nameID.value = subjectNameID 
    101106                                     
     
    162167             
    163168        return attributes 
    164  
     169     
     170    def buildAuthzDecisionQuery(self,  
     171                                issuer=ISSUER_DN, 
     172                                issuerFormat=Issuer.X509_SUBJECT, 
     173                                subjectNameID=NAMEID_VALUE,  
     174                                subjectNameIdFormat=NAMEID_FORMAT, 
     175                                resource=UNCORRECTED_RESOURCE_URI, 
     176                                actions=((Action.HTTP_GET_ACTION,  
     177                                          Action.GHPP_NS_URI),)): 
     178        """Convenience utility to make an Authorisation decision query""" 
     179        authzDecisionQuery = AuthzDecisionQuery() 
     180 
     181        authzDecisionQuery.version = SAMLVersion(SAMLVersion.VERSION_20) 
     182        authzDecisionQuery.id = str(uuid4()) 
     183        authzDecisionQuery.issueInstant = datetime.utcnow() 
     184         
     185        authzDecisionQuery.issuer = Issuer() 
     186        authzDecisionQuery.issuer.format = issuerFormat 
     187        authzDecisionQuery.issuer.value = issuer 
     188         
     189        authzDecisionQuery.subject = Subject() 
     190        authzDecisionQuery.subject.nameID = NameID() 
     191        authzDecisionQuery.subject.nameID.format = subjectNameIdFormat 
     192        authzDecisionQuery.subject.nameID.value = subjectNameID 
     193         
     194        authzDecisionQuery.resource = resource 
     195         
     196        for action, namespace in actions: 
     197            authzDecisionQuery.actions.append(Action()) 
     198            authzDecisionQuery.actions[-1].value = action 
     199            authzDecisionQuery.actions[-1].namespace = namespace 
     200             
     201        return authzDecisionQuery 
     202             
    165203 
    166204class SAMLTestCase(unittest.TestCase): 
    167205    """Test SAML implementation for use with CMIP5 federation""" 
     206    NAMEID_FORMAT = SAMLUtil.NAMEID_FORMAT 
     207    NAMEID_VALUE = SAMLUtil.NAMEID_VALUE 
     208    ISSUER_DN = SAMLUtil.ISSUER_DN 
     209    UNCORRECTED_RESOURCE_URI = SAMLUtil.UNCORRECTED_RESOURCE_URI 
     210    RESOURCE_URI = SAMLUtil.RESOURCE_URI 
    168211     
    169212    def _createAssertionHelper(self): 
     
    240283        samlUtil.lastName = '' 
    241284        samlUtil.emailAddress = '' 
    242         attributeQuery = samlUtil.buildAttributeQuery( 
    243                         "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk", 
    244                         "https://openid.localhost/philip.kershaw") 
     285        attributeQuery = samlUtil.buildAttributeQuery(SAMLTestCase.ISSUER_DN, 
     286                                                      SAMLTestCase.NAMEID_VALUE) 
    245287         
    246288        elem = AttributeQueryElementTree.toXML(attributeQuery)         
     
    256298        samlUtil.lastName = '' 
    257299        samlUtil.emailAddress = '' 
    258         attributeQuery = samlUtil.buildAttributeQuery( 
    259                         "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk", 
    260                         "https://openid.localhost/philip.kershaw") 
     300        attributeQuery = samlUtil.buildAttributeQuery(SAMLTestCase.ISSUER_DN, 
     301                                                      SAMLTestCase.NAMEID_VALUE) 
    261302         
    262303        elem = AttributeQueryElementTree.toXML(attributeQuery)         
     
    298339        response.issuer.format = Issuer.X509_SUBJECT 
    299340        response.issuer.value = \ 
    300                         "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk" 
     341                        SAMLTestCase.ISSUER_DN 
    301342         
    302343        response.status = Status() 
     
    316357        assertion.subject = Subject()   
    317358        assertion.subject.nameID = NameID() 
    318         assertion.subject.nameID.format = "urn:esg:openid" 
    319         assertion.subject.nameID.value = \ 
    320                         "https://openid.localhost/philip.kershaw"     
     359        assertion.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT 
     360        assertion.subject.nameID.value = SAMLTestCase.NAMEID_VALUE     
    321361             
    322362        assertion.issuer = Issuer() 
    323363        assertion.issuer.format = Issuer.X509_SUBJECT 
    324         assertion.issuer.value = \ 
    325                         "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk" 
     364        assertion.issuer.value = SAMLTestCase.ISSUER_DN 
    326365 
    327366        response.assertions.append(assertion) 
     
    341380    def test06CreateAuthzDecisionQuery(self): 
    342381        authzDecisionQuery = AuthzDecisionQuery() 
     382 
     383        authzDecisionQuery.version = SAMLVersion(SAMLVersion.VERSION_20) 
     384        authzDecisionQuery.id = str(uuid4()) 
     385        authzDecisionQuery.issueInstant = datetime.utcnow() 
     386         
     387        authzDecisionQuery.issuer = Issuer() 
     388        authzDecisionQuery.issuer.format = Issuer.X509_SUBJECT 
     389        authzDecisionQuery.issuer.value = SAMLTestCase.ISSUER_DN 
     390         
     391        authzDecisionQuery.subject = Subject() 
     392        authzDecisionQuery.subject.nameID = NameID() 
     393        authzDecisionQuery.subject.nameID.format = SAMLTestCase.NAMEID_FORMAT 
     394        authzDecisionQuery.subject.nameID.value = SAMLTestCase.NAMEID_VALUE 
     395         
    343396        authzDecisionQuery.resource = "http://LOCALHOST:80/My Secured URI" 
    344397        self.assert_(":80" not in authzDecisionQuery.resource) 
     
    352405        self.assert_("somewhere.ac.uk" in authzDecisionQuery.resource) 
    353406        self.assert_("yes=True" in authzDecisionQuery.resource) 
     407         
     408        authzDecisionQuery.actions.append(Action()) 
     409        authzDecisionQuery.actions[0].value = Action.HTTP_GET_ACTION 
     410        authzDecisionQuery.actions[0].namespace = Action.GHPP_NS_URI 
     411         
     412        self.assert_( 
     413            authzDecisionQuery.actions[0].value == Action.HTTP_GET_ACTION) 
     414        self.assert_( 
     415            authzDecisionQuery.actions[0].namespace == Action.GHPP_NS_URI) 
     416         
     417    def test07SerializeAuthzDecisionQuery(self): 
     418        samlUtil = SAMLUtil() 
     419        authzDecisionQuery = samlUtil.buildAuthzDecisionQuery() 
     420         
     421        # Create ElementTree Assertion Element 
     422        authzDecisionQueryElem = AuthzDecisionQueryElementTree.toXML( 
     423                                                            authzDecisionQuery) 
     424         
     425        self.assert_(iselement(authzDecisionQueryElem)) 
     426         
     427        # Serialise to output  
     428        xmlOutput = prettyPrint(authzDecisionQueryElem)        
     429        self.assert_(len(xmlOutput)) 
     430         
     431        print("\n"+"_"*80) 
     432        print(xmlOutput) 
     433        print("_"*80) 
    354434    
     435    def test08DeserializeAuthzDecisionQuery(self): 
     436        samlUtil = SAMLUtil() 
     437        authzDecisionQuery = samlUtil.buildAuthzDecisionQuery() 
     438         
     439        # Create ElementTree Assertion Element 
     440        authzDecisionQueryElem = AuthzDecisionQueryElementTree.toXML( 
     441                                                            authzDecisionQuery) 
     442         
     443        self.assert_(iselement(authzDecisionQueryElem)) 
     444         
     445        # Serialise to output  
     446        xmlOutput = prettyPrint(authzDecisionQueryElem)        
     447        self.assert_(len(xmlOutput)) 
     448         
     449        authzDecisionQueryStream = StringIO() 
     450        authzDecisionQueryStream.write(xmlOutput) 
     451        authzDecisionQueryStream.seek(0) 
     452 
     453        tree = ElementTree.parse(authzDecisionQueryStream) 
     454        elem2 = tree.getroot() 
     455         
     456        authzDecisionQuery2 = AuthzDecisionQueryElementTree.fromXML(elem2) 
     457        self.assert_(authzDecisionQuery2) 
     458        self.assert_( 
     459        authzDecisionQuery2.subject.nameID.value == SAMLTestCase.NAMEID_VALUE) 
     460        self.assert_( 
     461        authzDecisionQuery2.subject.nameID.format == SAMLTestCase.NAMEID_FORMAT) 
     462        self.assert_( 
     463            authzDecisionQuery2.issuer.value == SAMLTestCase.ISSUER_DN) 
     464        self.assert_( 
     465            authzDecisionQuery2.resource == SAMLTestCase.RESOURCE_URI) 
     466        self.assert_(len(authzDecisionQuery2.actions) == 1) 
     467        self.assert_( 
     468            authzDecisionQuery2.actions[0].value == Action.HTTP_GET_ACTION) 
     469        self.assert_( 
     470            authzDecisionQuery2.actions[0].namespace == Action.GHPP_NS_URI) 
     471        self.assert_(authzDecisionQuery2.evidence is None) 
     472 
     473        
    355474if __name__ == "__main__": 
    356475    unittest.main()         
Note: See TracChangeset for help on using the changeset viewer.