source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/saml/test_samlinterface.py @ 6615

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/saml/test_samlinterface.py@6615
Revision 6615, 17.1 KB checked in by pjkersha, 10 years ago (diff)

AuthzService unit test with ndg.security.server.wsgi.authzservice.AuthzServiceMiddleware near complete. Fixes required to PIP callout to Attribute Authority.

Line 
1"""Attribute Authority SAML Interface unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "21/07/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id: $'
11import logging
12logging.basicConfig(level=logging.DEBUG)
13import unittest
14
15from datetime import datetime, timedelta
16import os
17from uuid import uuid4
18import paste.fixture
19from cStringIO import StringIO
20from xml.etree import ElementTree
21
22from ndg.saml.saml2.core import (Response, Assertion, Attribute, 
23                             AttributeStatement, SAMLVersion, Subject, NameID,
24                             Issuer, AttributeQuery, XSStringAttributeValue, 
25                             Conditions, Status, StatusCode)
26from ndg.saml.xml import XMLConstants
27from ndg.saml.xml.etree import AttributeQueryElementTree, ResponseElementTree
28
29from ndg.security.common.soap.client import (UrlLib2SOAPClient, 
30                                             UrlLib2SOAPRequest)
31from ndg.security.common.soap.etree import SOAPEnvelope
32from ndg.security.common.utils.etree import QName, prettyPrint
33from ndg.security.common.saml_utils.esg import (EsgSamlNamespaces, 
34                                          XSGroupRoleAttributeValue)
35from ndg.security.common.saml_utils.esg.xml.etree import (
36                                        XSGroupRoleAttributeValueElementTree)
37from ndg.security.test.unit import BaseTestCase
38
39
class SamlSoapBindingApp(object):
    """Stub WSGI application standing in for an Attribute Authority
    SAML/SOAP endpoint in the unit tests of this module.

    An AttributeQuery is read from the body of the incoming SOAP request and
    answered with a SAML Response containing a single Assertion.  Only the
    ESG first name, last name and e-mail address attributes are filled in,
    using the fixed values set in __init__; any other attribute named in the
    query is ignored.
    """

    # Issuer name written into every response
    ISSUER_NAME = "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"

    # Validity period of the assertion, counted from its issue instant
    ASSERTION_LIFETIME = timedelta(seconds=60*60*8)

    def __init__(self):
        """Set the fixed user details returned for every query"""
        self.firstName = "Philip"
        self.lastName = "Kershaw"
        self.emailAddress = "pkershaw@somewhere.ac.uk"

    def __call__(self, environ, start_response):
        """Answer a SOAP wrapped SAML AttributeQuery.

        environ: WSGI environment; the SOAP request is read from its
        'wsgi.input' stream
        start_response: WSGI start response callable
        Returns a one item list holding the serialised SOAP response
        """
        soapRequest = SOAPEnvelope()
        soapRequest.parse(environ['wsgi.input'])
        attributeQuery = AttributeQueryElementTree.fromXML(
                                                soapRequest.body.elem[0])

        # Single argument print calls behave the same with the print
        # statement and the print function
        print("Received request from client:\n")
        print(soapRequest.prettyPrint())

        samlResponse = self._createResponse(attributeQuery)

        # Add mapping for ESG Group/Role Attribute Value to enable ElementTree
        # Attribute Value factory to render the XML output
        toXMLTypeMap = {
            XSGroupRoleAttributeValue: XSGroupRoleAttributeValueElementTree
        }

        # Convert to ElementTree representation to enable attachment to SOAP
        # response body
        samlResponseElem = ResponseElementTree.toXML(samlResponse,
                                            customToXMLTypeMap=toXMLTypeMap)

        # Create SOAP response and attach the SAML Response payload
        soapResponse = SOAPEnvelope()
        soapResponse.create()
        soapResponse.body.elem.append(samlResponseElem)

        response = soapResponse.serialize()

        start_response("200 OK",
                       [('Content-length', str(len(response))),
                        ('Content-type', 'text/xml')])
        return [response]

    def _createResponse(self, attributeQuery):
        """Build the successful SAML Response for the given query"""
        samlResponse = Response()
        samlResponse.issueInstant = datetime.utcnow()
        samlResponse.id = str(uuid4())

        # SAML 2.0 spec says format must be omitted so only the value is set
        samlResponse.issuer = Issuer()
        samlResponse.issuer.value = self.ISSUER_NAME

        samlResponse.inResponseTo = attributeQuery.id

        samlResponse.assertions.append(
            self._createAssertion(attributeQuery, samlResponse.issueInstant))

        samlResponse.status = Status()
        samlResponse.status.statusCode = StatusCode()
        samlResponse.status.statusCode.value = StatusCode.SUCCESS_URI

        return samlResponse

    def _createAssertion(self, attributeQuery, issueInstant):
        """Build the Assertion echoing the query subject and carrying the
        attribute values known to this application"""
        assertion = Assertion()
        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
        assertion.id = str(uuid4())
        assertion.issueInstant = issueInstant

        assertion.conditions = Conditions()
        assertion.conditions.notBefore = assertion.issueInstant
        assertion.conditions.notOnOrAfter = (
            assertion.conditions.notBefore + self.ASSERTION_LIFETIME)

        # The subject is copied straight from the query
        assertion.subject = Subject()
        assertion.subject.nameID = NameID()
        assertion.subject.nameID.format = attributeQuery.subject.nameID.format
        assertion.subject.nameID.value = attributeQuery.subject.nameID.value

        # The statement is always present, even if nothing ends up in it
        assertion.attributeStatements.append(AttributeStatement())
        issuedAttributes = assertion.attributeStatements[0].attributes

        # Attribute names this application can answer and the value for each
        knownValues = {
            EsgSamlNamespaces.FIRSTNAME_ATTRNAME: self.firstName,
            EsgSamlNamespaces.LASTNAME_ATTRNAME: self.lastName,
            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME: self.emailAddress,
        }

        for queried in attributeQuery.attributes:
            if queried.name not in knownValues:
                continue

            # Mirror the naming details of the queried attribute
            attribute = Attribute()
            attribute.name = queried.name
            attribute.nameFormat = queried.nameFormat
            attribute.friendlyName = queried.friendlyName

            attributeValue = XSStringAttributeValue()
            attributeValue.value = knownValues[queried.name]
            attribute.attributeValues.append(attributeValue)

            issuedAttributes.append(attribute)

        return assertion
157
158       
class SamlAttributeAuthorityInterfaceTestCase(BaseTestCase):
    """Test the SAML Attribute Authority interface: attribute queries sent
    over the SOAP binding and parsing of the SAML responses returned.

    test01 runs against the in-process SamlSoapBindingApp stub, test02
    against an Attribute Authority started via
    startSiteAAttributeAuthority and test03 against a canned response
    """
    thisDir = os.path.dirname(os.path.abspath(__file__))

    def __init__(self, *args, **kwargs):
        """Wrap the stub SAML/SOAP application in a paste test fixture"""
        wsgiApp = SamlSoapBindingApp()
        self.app = paste.fixture.TestApp(wsgiApp)

        BaseTestCase.__init__(self, *args, **kwargs)

    def _createAttributeQuery(self, subjectNameIdValue):
        """Make an attribute query asking for the first name, last name and
        e-mail address of a subject.

        subjectNameIdValue: value for the NameID of the query subject
        Returns the populated AttributeQuery
        """
        attributeQuery = AttributeQuery()
        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
        attributeQuery.id = str(uuid4())
        attributeQuery.issueInstant = datetime.utcnow()

        attributeQuery.issuer = Issuer()
        attributeQuery.issuer.format = Issuer.X509_SUBJECT
        attributeQuery.issuer.value = \
                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"

        attributeQuery.subject = Subject()
        attributeQuery.subject.nameID = NameID()
        attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
        attributeQuery.subject.nameID.value = subjectNameIdValue

        # (name, name format, friendly name) for each attribute requested:
        # first name, last name and e-mail address in that order
        requestedAttributes = (
            (EsgSamlNamespaces.FIRSTNAME_ATTRNAME,
             "http://www.w3.org/2001/XMLSchema#string",
             "FirstName"),
            (EsgSamlNamespaces.LASTNAME_ATTRNAME,
             "http://www.w3.org/2001/XMLSchema#string",
             "LastName"),
            (EsgSamlNamespaces.EMAILADDRESS_ATTRNAME,
             XMLConstants.XSD_NS+"#"+XSStringAttributeValue.TYPE_LOCAL_NAME,
             "emailAddress"),
        )
        for name, nameFormat, friendlyName in requestedAttributes:
            attribute = Attribute()
            attribute.name = name
            attribute.nameFormat = nameFormat
            attribute.friendlyName = friendlyName

            attributeQuery.attributes.append(attribute)

        return attributeQuery

    def _parseSoapEnvelope(self, xmlText):
        """Parse serialised SOAP into an envelope, printing the result.

        xmlText: string containing the SOAP envelope XML
        Returns the parsed SOAPEnvelope
        """
        soapStream = StringIO()
        soapStream.write(xmlText)
        soapStream.seek(0)

        soapEnvelope = SOAPEnvelope()
        soapEnvelope.parse(soapStream)

        print("Parsed response ...")
        print(soapEnvelope.serialize())

        return soapEnvelope

    def test01AttributeQuery(self):
        """Post an attribute query to the stub application and check the
        response echoes the query ID and subject"""
        attributeQuery = self._createAttributeQuery(
                                    "https://openid.localhost/philip.kershaw")

        elem = AttributeQueryElementTree.toXML(attributeQuery)
        soapRequest = SOAPEnvelope()
        soapRequest.create()
        soapRequest.body.elem.append(elem)

        request = soapRequest.serialize()

        header = {
            'soapAction': "http://www.oasis-open.org/committees/security",
            'Content-length': str(len(request)),
            'Content-type': 'text/xml'
        }
        httpResponse = self.app.post('/attributeauthority',
                                     params=request,
                                     headers=header,
                                     status=200)
        print("Response status=%d" % httpResponse.status)

        soapResponse = self._parseSoapEnvelope(httpResponse.body)

        samlResponse = ResponseElementTree.fromXML(soapResponse.body.elem[0])
        self.assertEqual(samlResponse.status.statusCode.value,
                         StatusCode.SUCCESS_URI)
        self.assertEqual(samlResponse.inResponseTo, attributeQuery.id)
        self.assertEqual(samlResponse.assertions[0].subject.nameID.value,
                         attributeQuery.subject.nameID.value)

    def test02AttributeQueryWithSOAPClient(self):
        """Send an attribute query with the urllib2 based SOAP client to an
        Attribute Authority started for the test"""

        # Thread a separate attribute authority instance
        self.startSiteAAttributeAuthority()

        client = UrlLib2SOAPClient()

        # ElementTree based envelope class
        client.responseEnvelopeClass = SOAPEnvelope

        request = UrlLib2SOAPRequest()
        request.url = 'http://localhost:5000/AttributeAuthority/saml'
        request.envelope = SOAPEnvelope()
        request.envelope.create()

        # Make an attribute query
        attributeQuery = self._createAttributeQuery(
                            "https://esg.prototype.ucar.edu/myopenid/testUser")

        attributeQueryElem = AttributeQueryElementTree.toXML(attributeQuery)

        # Attach query to SOAP body
        request.envelope.body.elem.append(attributeQueryElem)

        from M2Crypto.m2urllib2 import HTTPSHandler
        from urllib2 import URLError

        client.openerDirector.add_handler(HTTPSHandler())
        try:
            soapResponse = client.send(request)
        except URLError as e:
            # Report the underlying cause rather than discarding it
            self.fail("Error calling Attribute Service: %s" % e)

        print("Response from server:\n\n%s" %
              soapResponse.envelope.serialize())

        self.assertEqual(len(soapResponse.envelope.body.elem), 1,
                         "Expecting single child element in SOAP body")

        self.assertEqual(
            QName.getLocalPart(soapResponse.envelope.body.elem[0].tag),
            'Response',
            'Expecting "Response" element in SOAP body')

        # Enable parsing of ESG Group/Role attribute values
        toSAMLTypeMap = [XSGroupRoleAttributeValueElementTree.factoryMatchFunc]
        samlResponse = ResponseElementTree.fromXML(
                                            soapResponse.envelope.body.elem[0],
                                            customToSAMLTypeMap=toSAMLTypeMap)
        self.assertTrue(samlResponse)

    def test03ParseResponse(self):
        """Parse a canned SOAP response which includes ESG Group/Role
        attribute values"""
        soapResponseXml = \
'''<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/">
   <SOAP-ENV:Body>
      <samlp:Response ID="05680cb2-4973-443d-9d31-7bc99bea87c1" InResponseTo="e3183380-ae82-4285-8827-8c40613842de" IssueInstant="2009-08-17T12:28:37.325Z" Version="2.0" xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol">
         <saml:Issuer Format="urn:esg:issuer" xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion">ESG-NCAR</saml:Issuer>
         <samlp:Status>
            <samlp:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:Success" />
         </samlp:Status>
         <saml:Assertion ID="192c67d9-f9cd-457a-9242-999e7b943166" IssueInstant="2009-08-17T12:28:37.347Z" Version="2.0" xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion">
            <saml:Issuer Format="urn:esg:issuer">ESG-NCAR</saml:Issuer>
            <saml:Subject>
               <saml:NameID Format="urn:esg:openid">https://esg.prototype.ucar.edu/myopenid/testUser</saml:NameID>
            </saml:Subject>
            <saml:Conditions NotBefore="2009-08-17T12:28:37.347Z" NotOnOrAfter="2009-08-18T12:28:37.347Z" />
            <saml:AttributeStatement>
               <saml:Attribute FriendlyName="FirstName" Name="urn:esg:first:name" NameFormat="http://www.w3.org/2001/XMLSchema#string">
                  <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">Test</saml:AttributeValue>
               </saml:Attribute>
               <saml:Attribute FriendlyName="LastName" Name="urn:esg:last:name" NameFormat="http://www.w3.org/2001/XMLSchema#string">
                  <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">User</saml:AttributeValue>
               </saml:Attribute>
               <saml:Attribute FriendlyName="EmailAddress" Name="urn:esg:first:email:address" NameFormat="http://www.w3.org/2001/XMLSchema#string">
                  <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">ejn@ucar.edu</saml:AttributeValue>
               </saml:Attribute>
               <saml:Attribute FriendlyName="GroupRole" Name="urn:esg:group:role" NameFormat="groupRole">
                  <saml:AttributeValue>
                     <esg:groupRole group="CCSM" role="default" xmlns:esg="http://www.esg.org" />
                  </saml:AttributeValue>
                  <saml:AttributeValue>
                     <esg:groupRole group="Dynamical Core" role="default" xmlns:esg="http://www.esg.org" />
                  </saml:AttributeValue>
                  <saml:AttributeValue>
                     <esg:groupRole group="NARCCAP" role="default" xmlns:esg="http://www.esg.org" />
                  </saml:AttributeValue>
               </saml:Attribute>
            </saml:AttributeStatement>
         </saml:Assertion>
      </samlp:Response>
   </SOAP-ENV:Body>
</SOAP-ENV:Envelope>'''

        soapResponse = self._parseSoapEnvelope(soapResponseXml)

        # Enable parsing of ESG Group/Role attribute values
        toSAMLTypeMap = [XSGroupRoleAttributeValueElementTree.factoryMatchFunc]
        samlResponse = ResponseElementTree.fromXML(
                                            soapResponse.body.elem[0],
                                            customToSAMLTypeMap=toSAMLTypeMap)
        self.assertTrue(samlResponse)
390
391           
# Run every test case in this module when it is executed as a script
if __name__ == "__main__":
    unittest.main()
394
Note: See TracBrowser for help on using the repository browser.