source: TI12-security/trunk/ndg_saml/ndg/saml/test/binding/soap/test_soapauthzdecisioninterface.py @ 7138

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/ndg_saml/ndg/saml/test/binding/soap/test_soapauthzdecisioninterface.py@7138
Revision 7138, 9.9 KB checked in by pjkersha, 10 years ago (diff)

Incomplete - task 2: XACML-Security Integration

  • ported SAML WSGI middleware unit tests from ndg.security.
Line 
1#!/usr/bin/env python
2"""Unit tests for WSGI SAML 2.0 SOAP Authorisation Decision Query Interface
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/02/2010"
8__copyright__ = "(C) 2010 Science and Technology Facilities Council"
9__license__ = "http://www.apache.org/licenses/LICENSE-2.0"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id: $'
12import unittest
13from uuid import uuid4
14from datetime import datetime, timedelta
15from cStringIO import StringIO
16
17from ndg.saml.saml2.core import (SAMLVersion, Subject, NameID, Issuer, 
18                                 AuthzDecisionQuery, AuthzDecisionStatement, 
19                                 Status, StatusCode, StatusMessage, 
20                                 DecisionType, Action, Conditions, Assertion)
21from ndg.saml.xml.etree import (AuthzDecisionQueryElementTree, 
22                                ResponseElementTree)
23
24from ndg.soap.etree import SOAPEnvelope
25from ndg.security.common.saml_utils.esg import EsgSamlNamespaces
26from ndg.security.test.unit.wsgi.saml import SoapSamlInterfaceMiddlewareTestCase
27
28
29class TestAuthorisationServiceMiddleware(object):
30    """Test Authorisation Service interface stub"""
31    QUERY_INTERFACE_KEYNAME_OPTNAME = 'queryInterfaceKeyName'
32    RESOURCE_URI = 'http://localhost/dap/data/'
33    ISSUER_DN = '/O=Test/OU=Authorisation/CN=Service Stub'
34   
35    def __init__(self, app, global_conf, **app_conf):
36        self.queryInterfaceKeyName = app_conf[
37            TestAuthorisationServiceMiddleware.QUERY_INTERFACE_KEYNAME_OPTNAME]
38        self._app = app
39   
40    def __call__(self, environ, start_response):
41        environ[self.queryInterfaceKeyName] = self.authzDecisionQueryFactory()
42        return self._app(environ, start_response)
43   
44    def authzDecisionQueryFactory(self):
45        def authzDecisionQuery(query, response):
46            now = datetime.utcnow()
47            response.issueInstant = now
48           
49            # Make up a request ID that this response is responding to
50            response.inResponseTo = query.id
51            response.id = str(uuid4())
52            response.version = SAMLVersion(SAMLVersion.VERSION_20)
53           
54            response.status = Status()
55            response.status.statusCode = StatusCode()
56            response.status.statusCode.value = StatusCode.SUCCESS_URI
57            response.status.statusMessage = StatusMessage()       
58            response.status.statusMessage.value = \
59                                                "Response created successfully"
60               
61            assertion = Assertion()
62            assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
63            assertion.id = str(uuid4())
64            assertion.issueInstant = now
65           
66            authzDecisionStatement = AuthzDecisionStatement()
67            authzDecisionStatement.decision = DecisionType.PERMIT
68            authzDecisionStatement.resource = \
69                TestAuthorisationServiceMiddleware.RESOURCE_URI
70            authzDecisionStatement.actions.append(Action())
71            authzDecisionStatement.actions[-1].namespace = Action.GHPP_NS_URI
72            authzDecisionStatement.actions[-1].value = Action.HTTP_GET_ACTION
73            assertion.authzDecisionStatements.append(authzDecisionStatement)
74           
75            # Add a conditions statement for a validity of 8 hours
76            assertion.conditions = Conditions()
77            assertion.conditions.notBefore = now
78            assertion.conditions.notOnOrAfter = now + timedelta(seconds=60*60*8)
79                   
80            assertion.subject = Subject() 
81            assertion.subject.nameID = NameID()
82            assertion.subject.nameID.format = query.subject.nameID.format
83            assertion.subject.nameID.value = query.subject.nameID.value
84               
85            assertion.issuer = Issuer()
86            assertion.issuer.format = Issuer.X509_SUBJECT
87            assertion.issuer.value = \
88                                    TestAuthorisationServiceMiddleware.ISSUER_DN
89   
90            response.assertions.append(assertion)
91            return response
92       
93        return authzDecisionQuery
94   
95   
96class SOAPAuthzDecisionInterfaceMiddlewareTestCase(
97                                        SoapSamlInterfaceMiddlewareTestCase):
98    CONFIG_FILENAME = 'authz-decision-interface.ini'
99    RESOURCE_URI = TestAuthorisationServiceMiddleware.RESOURCE_URI
100   
101    def _createAuthzDecisionQuery(self, 
102                            issuer="/O=Site A/CN=PEP",
103                            subject="https://openid.localhost/philip.kershaw",
104                            resource=None,
105                            action=Action.HTTP_GET_ACTION,
106                            actionNs=Action.GHPP_NS_URI):
107        query = AuthzDecisionQuery()
108        query.version = SAMLVersion(SAMLVersion.VERSION_20)
109        query.id = str(uuid4())
110        query.issueInstant = datetime.utcnow()
111       
112        query.issuer = Issuer()
113        query.issuer.format = Issuer.X509_SUBJECT
114        query.issuer.value = issuer
115                       
116        query.subject = Subject() 
117        query.subject.nameID = NameID()
118        query.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
119        query.subject.nameID.value = subject
120                                 
121        if resource is None:
122            query.resource = self.__class__.RESOURCE_URI
123        else:   
124            query.resource = resource
125                 
126        query.actions.append(Action())
127        query.actions[0].namespace = actionNs
128        query.actions[0].value = action   
129
130        return query
131   
132    def _makeRequest(self, query=None, **kw):
133        """Convenience method to construct queries for tests"""
134       
135        if query is None:
136            query = self._createAuthzDecisionQuery(**kw)
137           
138        elem = AuthzDecisionQueryElementTree.toXML(query)
139        soapRequest = SOAPEnvelope()
140        soapRequest.create()
141        soapRequest.body.elem.append(elem)
142       
143        request = soapRequest.serialize()
144       
145        return request
146   
147    def _getSAMLResponse(self, responseBody):
148        """Deserialise response string into ElementTree element"""
149        soapResponse = SOAPEnvelope()
150       
151        responseStream = StringIO()
152        responseStream.write(responseBody)
153        responseStream.seek(0)
154       
155        soapResponse.parse(responseStream)
156       
157        print("Parsed response ...")
158        print(soapResponse.serialize())
159#        print(prettyPrint(soapResponse.elem))
160       
161        response = ResponseElementTree.fromXML(soapResponse.body.elem[0])
162       
163        return response
164   
165    def test01ValidQuery(self):
166        query = self._createAuthzDecisionQuery()
167        request = self._makeRequest(query=query)
168       
169        header = {
170            'soapAction': "http://www.oasis-open.org/committees/security",
171            'Content-length': str(len(request)),
172            'Content-type': 'text/xml'
173        }
174        response = self.app.post('/authorisationservice/', 
175                                 params=request, 
176                                 headers=header, 
177                                 status=200)
178        print("Response status=%d" % response.status)
179        samlResponse = self._getSAMLResponse(response.body)
180
181        self.assert_(samlResponse.status.statusCode.value == \
182                     StatusCode.SUCCESS_URI)
183        self.assert_(samlResponse.inResponseTo == query.id)
184        self.assert_(samlResponse.assertions[0].subject.nameID.value == \
185                     query.subject.nameID.value)
186        self.assert_(samlResponse.assertions[0])
187        self.assert_(samlResponse.assertions[0].authzDecisionStatements[0])
188        self.assert_(samlResponse.assertions[0].authzDecisionStatements[0
189                                            ].decision == DecisionType.PERMIT)
190
191       
192class SOAPAuthzServiceMiddlewareTestCase(
193                                SOAPAuthzDecisionInterfaceMiddlewareTestCase):
194    """Test the actual server side middleware
195    ndg.security.server.wsgi.authzservice.AuthzServiceMiddleware
196    rather than a test stub
197    """
198    CONFIG_FILENAME = 'authz-service.ini'
199    RESOURCE_URI = 'http://localhost/dap/data/my.nc.dods?time[0:1:0]&lat'
200    ACCESS_DENIED_RESOURCE_URI = \
201        'http://localhost/dap/data/test_accessDeniedToSecuredURI'
202   
203    def __init__(self, *arg, **kw):
204        """Extend base init to include SAML Attribute Authority required by
205        Authorisation Service"""
206        super(SOAPAuthzDecisionInterfaceMiddlewareTestCase, self).__init__(
207                                                                    *arg, **kw)
208        self.startSiteAAttributeAuthority(withSSL=True, port=5443)
209       
210    def test02AccessDenied(self):
211        cls = SOAPAuthzServiceMiddlewareTestCase
212        query = self._createAuthzDecisionQuery(
213                                        resource=cls.ACCESS_DENIED_RESOURCE_URI)
214        request = self._makeRequest(query=query)
215       
216        header = {
217            'soapAction': "http://www.oasis-open.org/committees/security",
218            'Content-length': str(len(request)),
219            'Content-type': 'text/xml'
220        }
221        response = self.app.post('/authorisationservice/', 
222                                 params=request, 
223                                 headers=header, 
224                                 status=200)
225        print("Response status=%d" % response.status)
226        samlResponse = self._getSAMLResponse(response.body)
227
228        self.assert_(samlResponse.status.statusCode.value == \
229                     StatusCode.SUCCESS_URI)
230        self.assert_(samlResponse.inResponseTo == query.id)
231        self.assert_(samlResponse.assertions[0].subject.nameID.value == \
232                     query.subject.nameID.value)
233        self.assert_(samlResponse.assertions[0])
234        self.assert_(samlResponse.assertions[0].authzDecisionStatements[0])
235        self.assert_(samlResponse.assertions[0].authzDecisionStatements[0
236                                            ].decision == DecisionType.DENY)   
237   
238   
239if __name__ == "__main__":
240    unittest.main()
Note: See TracBrowser for help on using the repository browser.