source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/saml/test_soapauthzdecisioninterface.py @ 6574

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/saml/test_soapauthzdecisioninterface.py@6574
Revision 6574, 8.5 KB checked in by pjkersha, 10 years ago (diff)

Changes for addition of AuthzDecisionQuery? WSGI interface (Authorisation service)

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Unit tests for WSGI SAML 2.0 SOAP Attribute Query Interface
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "21/08/09"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id: $'
12from uuid import uuid4
13from datetime import datetime
14from cStringIO import StringIO
15
16from saml.saml2.core import (Attribute, SAMLVersion, Subject, NameID, Issuer, 
17                             AttributeQuery, XSStringAttributeValue, 
18                             StatusCode)
19from saml.xml import XMLConstants
20from saml.xml.etree import AttributeQueryElementTree, ResponseElementTree
21
22from ndg.security.common.soap.etree import SOAPEnvelope
23from ndg.security.common.saml_utils.esg import EsgSamlNamespaces
24
25
26class SOAPAttributeInterfaceMiddlewareTestCase(unittest.TestCase):
27    CONFIG_FILENAME = 'authz-decision-interface.ini
28
29    def _createAttributeQuery(self, 
30                        issuer="/O=Site A/CN=Authorisation Service",
31                        subject="https://openid.localhost/philip.kershaw"):
32        attributeQuery = AttributeQuery()
33        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
34        attributeQuery.id = str(uuid4())
35        attributeQuery.issueInstant = datetime.utcnow()
36       
37        attributeQuery.issuer = Issuer()
38        attributeQuery.issuer.format = Issuer.X509_SUBJECT
39        attributeQuery.issuer.value = issuer
40                       
41        attributeQuery.subject = Subject() 
42        attributeQuery.subject.nameID = NameID()
43        attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
44        attributeQuery.subject.nameID.value = subject
45                                   
46       
47        # special case handling for 'FirstName' attribute
48        fnAttribute = Attribute()
49        fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME
50        fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
51        fnAttribute.friendlyName = "FirstName"
52
53        attributeQuery.attributes.append(fnAttribute)
54   
55        # special case handling for 'LastName' attribute
56        lnAttribute = Attribute()
57        lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME
58        lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
59        lnAttribute.friendlyName = "LastName"
60
61        attributeQuery.attributes.append(lnAttribute)
62   
63        # special case handling for 'LastName' attribute
64        emailAddressAttribute = Attribute()
65        emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME
66        emailAddressAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\
67                                    XSStringAttributeValue.TYPE_LOCAL_NAME
68        emailAddressAttribute.friendlyName = "emailAddress"
69
70        attributeQuery.attributes.append(emailAddressAttribute) 
71
72        return attributeQuery
73   
74    def _makeRequest(self, attributeQuery=None, **kw):
75        """Convenience method to construct queries for tests"""
76       
77        if attributeQuery is None:
78            attributeQuery = self._createAttributeQuery(**kw)
79           
80        elem = AttributeQueryElementTree.toXML(attributeQuery)
81        soapRequest = SOAPEnvelope()
82        soapRequest.create()
83        soapRequest.body.elem.append(elem)
84       
85        request = soapRequest.serialize()
86       
87        return request
88   
89    def _getSAMLResponse(self, responseBody):
90        """Deserialise response string into ElementTree element"""
91        soapResponse = SOAPEnvelope()
92       
93        responseStream = StringIO()
94        responseStream.write(responseBody)
95        responseStream.seek(0)
96       
97        soapResponse.parse(responseStream)
98       
99        print("Parsed response ...")
100        print(soapResponse.serialize())
101#        print(prettyPrint(soapResponse.elem))
102       
103        response = ResponseElementTree.fromXML(soapResponse.body.elem[0])
104       
105        return response
106   
107    def test01ValidQuery(self):
108        attributeQuery = self._createAttributeQuery()
109        request = self._makeRequest(attributeQuery=attributeQuery)
110       
111        header = {
112            'soapAction': "http://www.oasis-open.org/committees/security",
113            'Content-length': str(len(request)),
114            'Content-type': 'text/xml'
115        }
116        response = self.app.post('/attributeauthority/saml', 
117                                 params=request, 
118                                 headers=header, 
119                                 status=200)
120        print("Response status=%d" % response.status)
121        samlResponse = self._getSAMLResponse(response.body)
122
123        self.assert_(samlResponse.status.statusCode.value == \
124                     StatusCode.SUCCESS_URI)
125        self.assert_(samlResponse.inResponseTo == attributeQuery.id)
126        self.assert_(samlResponse.assertions[0].subject.nameID.value == \
127                     attributeQuery.subject.nameID.value)
128
129    def test02AttributeReleaseDenied(self):
130        request = self._makeRequest(issuer="/O=Site B/CN=Authorisation Service")
131       
132        header = {
133            'soapAction': "http://www.oasis-open.org/committees/security",
134            'Content-length': str(len(request)),
135            'Content-type': 'text/xml'
136        }
137       
138        response = self.app.post('/attributeauthority/saml', 
139                                 params=request, 
140                                 headers=header, 
141                                 status=200)
142       
143        print("Response status=%d" % response.status)
144       
145        samlResponse = self._getSAMLResponse(response.body)
146
147        self.assert_(samlResponse.status.statusCode.value == \
148                     StatusCode.INVALID_ATTR_NAME_VALUE_URI)
149
150    def test03InvalidAttributesRequested(self):
151        attributeQuery = self._createAttributeQuery()
152       
153        # Add an unsupported Attribute name
154        attribute = Attribute()
155        attribute.name = "urn:my:attribute"
156        attribute.nameFormat = XMLConstants.XSD_NS+"#"+\
157                                    XSStringAttributeValue.TYPE_LOCAL_NAME
158        attribute.friendlyName = "myAttribute"
159        attributeQuery.attributes.append(attribute)     
160       
161        request = self._makeRequest(attributeQuery=attributeQuery)
162           
163        header = {
164            'soapAction': "http://www.oasis-open.org/committees/security",
165            'Content-length': str(len(request)),
166            'Content-type': 'text/xml'
167        }
168       
169        response = self.app.post('/attributeauthority/saml', 
170                                 params=request, 
171                                 headers=header, 
172                                 status=200)
173       
174        print("Response status=%d" % response.status)
175       
176        samlResponse = self._getSAMLResponse(response.body)
177
178        self.assert_(samlResponse.status.statusCode.value == \
179                     StatusCode.INVALID_ATTR_NAME_VALUE_URI)
180       
181    def test04InvalidIssuer(self):
182        request = self._makeRequest(issuer="My Attribute Query Issuer")
183       
184        header = {
185            'soapAction': "http://www.oasis-open.org/committees/security",
186            'Content-length': str(len(request)),
187            'Content-type': 'text/xml'
188        }
189       
190        response = self.app.post('/attributeauthority/saml', 
191                                 params=request, 
192                                 headers=header, 
193                                 status=200)
194       
195        print("Response status=%d" % response.status)
196       
197        samlResponse = self._getSAMLResponse(response.body)
198
199        self.assert_(samlResponse.status.statusCode.value == \
200                     StatusCode.REQUEST_DENIED_URI)
201
202    def test05UnknownPrincipal(self):
203        request = self._makeRequest(subject="Joe.Bloggs")
204       
205        header = {
206            'soapAction': "http://www.oasis-open.org/committees/security",
207            'Content-length': str(len(request)),
208            'Content-type': 'text/xml'
209        }
210       
211        response = self.app.post('/attributeauthority/saml', 
212                                 params=request, 
213                                 headers=header, 
214                                 status=200)
215       
216        print("Response status=%d" % response.status)
217       
218        samlResponse = self._getSAMLResponse(response.body)
219
220        self.assert_(samlResponse.status.statusCode.value == \
221                     StatusCode.UNKNOWN_PRINCIPAL_URI)
222
223 
224if __name__ == "__main__":
225    unittest.main()
Note: See TracBrowser for help on using the repository browser.