source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/saml/test_soapauthzdecisioninterface.py @ 6575

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/saml/test_soapauthzdecisioninterface.py@6575
Revision 6575, 8.3 KB checked in by pjkersha, 11 years ago (diff)

Changes for addition of AuthzDecisionQuery? WSGI interface (Authorisation service)

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Unit tests for WSGI SAML 2.0 SOAP Attribute Query Interface
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "21/08/09"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id: $'
12from uuid import uuid4
13from datetime import datetime
14from cStringIO import StringIO
15
16from saml.saml2.core import (Attribute, SAMLVersion, Subject, NameID, Issuer, 
17                             AttributeQuery, XSStringAttributeValue, 
18                             StatusCode)
19from saml.xml import XMLConstants
20from saml.xml.etree import AttributeQueryElementTree, ResponseElementTree
21
22from ndg.security.common.soap.etree import SOAPEnvelope
23from ndg.security.common.saml_utils.esg import EsgSamlNamespaces
24from ndg.security.test.unit.wsgi.saml import SoapSamlInterfaceMiddlewareTestCase
25
26
27class SOAPAuthzDecisionInterfaceMiddlewareTestCase(unittest.TestCase):
28    CONFIG_FILENAME = 'authz-decision-interface.ini'
29
30    def _createAuthzDecisionQuery(self, 
31                        issuer="/O=Site A/CN=Authorisation Service",
32                        subject="https://openid.localhost/philip.kershaw"):
33        query = AttributeQuery()
34        query.version = SAMLVersion(SAMLVersion.VERSION_20)
35        query.id = str(uuid4())
36        query.issueInstant = datetime.utcnow()
37       
38        query.issuer = Issuer()
39        query.issuer.format = Issuer.X509_SUBJECT
40        query.issuer.value = issuer
41                       
42        query.subject = Subject() 
43        query.subject.nameID = NameID()
44        query.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
45        query.subject.nameID.value = subject
46                                   
47       
48        # special case handling for 'FirstName' attribute
49        fnAttribute = Attribute()
50        fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME
51        fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
52        fnAttribute.friendlyName = "FirstName"
53
54        query.attributes.append(fnAttribute)
55   
56        # special case handling for 'LastName' attribute
57        lnAttribute = Attribute()
58        lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME
59        lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
60        lnAttribute.friendlyName = "LastName"
61
62        query.attributes.append(lnAttribute)
63   
64        # special case handling for 'LastName' attribute
65        emailAddressAttribute = Attribute()
66        emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME
67        emailAddressAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\
68                                    XSStringAttributeValue.TYPE_LOCAL_NAME
69        emailAddressAttribute.friendlyName = "emailAddress"
70
71        query.attributes.append(emailAddressAttribute) 
72
73        return query
74   
75    def _makeRequest(self, query=None, **kw):
76        """Convenience method to construct queries for tests"""
77       
78        if query is None:
79            query = self._createAuthzDecisionQuery(**kw)
80           
81        elem = AuthzDecusionQueryElementTree.toXML(query)
82        soapRequest = SOAPEnvelope()
83        soapRequest.create()
84        soapRequest.body.elem.append(elem)
85       
86        request = soapRequest.serialize()
87       
88        return request
89   
90    def _getSAMLResponse(self, responseBody):
91        """Deserialise response string into ElementTree element"""
92        soapResponse = SOAPEnvelope()
93       
94        responseStream = StringIO()
95        responseStream.write(responseBody)
96        responseStream.seek(0)
97       
98        soapResponse.parse(responseStream)
99       
100        print("Parsed response ...")
101        print(soapResponse.serialize())
102#        print(prettyPrint(soapResponse.elem))
103       
104        response = ResponseElementTree.fromXML(soapResponse.body.elem[0])
105       
106        return response
107   
108    def test01ValidQuery(self):
109        query = self._createAuthzDecisionQuery()
110        request = self._makeRequest(query=query)
111       
112        header = {
113            'soapAction': "http://www.oasis-open.org/committees/security",
114            'Content-length': str(len(request)),
115            'Content-type': 'text/xml'
116        }
117        response = self.app.post('/attributeauthority/saml', 
118                                 params=request, 
119                                 headers=header, 
120                                 status=200)
121        print("Response status=%d" % response.status)
122        samlResponse = self._getSAMLResponse(response.body)
123
124        self.assert_(samlResponse.status.statusCode.value == \
125                     StatusCode.SUCCESS_URI)
126        self.assert_(samlResponse.inResponseTo == query.id)
127        self.assert_(samlResponse.assertions[0].subject.nameID.value == \
128                     query.subject.nameID.value)
129
130    def test02AttributeReleaseDenied(self):
131        request = self._makeRequest(issuer="/O=Site B/CN=Authorisation Service")
132       
133        header = {
134            'soapAction': "http://www.oasis-open.org/committees/security",
135            'Content-length': str(len(request)),
136            'Content-type': 'text/xml'
137        }
138       
139        response = self.app.post('/attributeauthority/saml', 
140                                 params=request, 
141                                 headers=header, 
142                                 status=200)
143       
144        print("Response status=%d" % response.status)
145       
146        samlResponse = self._getSAMLResponse(response.body)
147
148        self.assert_(samlResponse.status.statusCode.value == \
149                     StatusCode.INVALID_ATTR_NAME_VALUE_URI)
150
151    def test03InvalidAttributesRequested(self):
152        query = self._createAuthzDecisionQuery()
153       
154        # Add an unsupported Attribute name
155        attribute = Attribute()
156        attribute.name = "urn:my:attribute"
157        attribute.nameFormat = XMLConstants.XSD_NS+"#"+\
158                                    XSStringAttributeValue.TYPE_LOCAL_NAME
159        attribute.friendlyName = "myAttribute"
160        query.attributes.append(attribute)     
161       
162        request = self._makeRequest(query=query)
163           
164        header = {
165            'soapAction': "http://www.oasis-open.org/committees/security",
166            'Content-length': str(len(request)),
167            'Content-type': 'text/xml'
168        }
169       
170        response = self.app.post('/attributeauthority/saml', 
171                                 params=request, 
172                                 headers=header, 
173                                 status=200)
174       
175        print("Response status=%d" % response.status)
176       
177        samlResponse = self._getSAMLResponse(response.body)
178
179        self.assert_(samlResponse.status.statusCode.value == \
180                     StatusCode.INVALID_ATTR_NAME_VALUE_URI)
181       
182    def test04InvalidIssuer(self):
183        request = self._makeRequest(issuer="My Attribute Query Issuer")
184       
185        header = {
186            'soapAction': "http://www.oasis-open.org/committees/security",
187            'Content-length': str(len(request)),
188            'Content-type': 'text/xml'
189        }
190       
191        response = self.app.post('/attributeauthority/saml', 
192                                 params=request, 
193                                 headers=header, 
194                                 status=200)
195       
196        print("Response status=%d" % response.status)
197       
198        samlResponse = self._getSAMLResponse(response.body)
199
200        self.assert_(samlResponse.status.statusCode.value == \
201                     StatusCode.REQUEST_DENIED_URI)
202
203    def test05UnknownPrincipal(self):
204        request = self._makeRequest(subject="Joe.Bloggs")
205       
206        header = {
207            'soapAction': "http://www.oasis-open.org/committees/security",
208            'Content-length': str(len(request)),
209            'Content-type': 'text/xml'
210        }
211       
212        response = self.app.post('/attributeauthority/saml', 
213                                 params=request, 
214                                 headers=header, 
215                                 status=200)
216       
217        print("Response status=%d" % response.status)
218       
219        samlResponse = self._getSAMLResponse(response.body)
220
221        self.assert_(samlResponse.status.statusCode.value == \
222                     StatusCode.UNKNOWN_PRINCIPAL_URI)
223
224 
225if __name__ == "__main__":
226    unittest.main()
Note: See TracBrowser for help on using the repository browser.