source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/test_authz.py @ 7287

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/test_authz.py@7287
Revision 7287, 15.8 KB checked in by pjkersha, 10 years ago (diff)

Incomplete - task 2: XACML-Security Integration

  • Working WSGI Authorisation filter with connection to SAML/XACML based Authorisation Service - unit tests: ndg.security.test.unit.wsgi.authz.test_authz
  • It may need some optimisation to avoid too many WS callouts to the Authorisation Service - perhaps add a local PDP to the authorisation filter to filter out some requests going over the wire e.g. requests for web page CSS or graphics content.
  • The XACML policy file has some big additions to it to support the various test conditions in ndg.security.test.unit.wsgi.authz.test_authz. These should be ported back to the ndg_xacml package unit tests.
  • Next major task: remove the temporary fix in the XACML Context handler - instead of using hardwired roles for the user, alter it so that the PDP makes a request back to the PIP (Policy Information Point) to grab additional attributes. The PIP will call out to Attribute Service(s) to pull any additional attributes needed.
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Unit tests for WSGI Authorization handler
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "21/05/09"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import logging
13logging.basicConfig(level=logging.DEBUG)
14
15import unittest
16import os
17from urlparse import urlunsplit
18
19from os import path
20from ConfigParser import SafeConfigParser
21
22import paste.fixture
23from paste.deploy import loadapp
24
25from ndg.security.test.unit import BaseTestCase
26from ndg.security.server.wsgi import NDGSecurityMiddlewareBase
27from ndg.security.server.wsgi.authz.result_handler.basic import \
28    PEPResultHandlerMiddleware
29from ndg.security.server.wsgi.authz.result_handler.redirect import \
30    HTTPRedirectPEPResultHandlerMiddleware
31from ndg.security.server.wsgi.authz.pep import SamlPepFilterConfigError
32
33
34from uuid import uuid4
35from datetime import datetime, timedelta
36
37from ndg.saml.saml2.core import (SAMLVersion, Subject, NameID, Issuer, 
38                                 AuthzDecisionQuery, AuthzDecisionStatement, 
39                                 Status, StatusCode, StatusMessage, 
40                                 DecisionType, Action, Conditions, Assertion)
41from ndg.saml.xml.etree import (AuthzDecisionQueryElementTree, 
42                                ResponseElementTree)
43
44
class TestAuthorisationServiceMiddleware(object):
    """Stub WSGI middleware standing in for an Authorisation Service.

    On each request it stores a SAML authorisation decision query callback
    in ``environ`` under a configurable key and then hands the request on to
    the wrapped application.
    """
    QUERY_INTERFACE_KEYNAME_OPTNAME = 'queryInterfaceKeyName'
    RESOURCE_URI = 'http://localhost/dap/data/'
    ISSUER_DN = '/O=Test/OU=Authorisation/CN=Service Stub'

    def __init__(self, app, global_conf, **app_conf):
        """Record the wrapped application and the environ key to publish
        the query callback under.

        @param app: next WSGI application in the stack
        @param global_conf: global configuration settings; not used here
        @param app_conf: local settings; must contain the
        ``queryInterfaceKeyName`` option
        @raise KeyError: if ``queryInterfaceKeyName`` is missing from app_conf
        """
        optName = \
            TestAuthorisationServiceMiddleware.QUERY_INTERFACE_KEYNAME_OPTNAME
        self.queryInterfaceKeyName = app_conf[optName]
        self._app = app

    def __call__(self, environ, start_response):
        """Publish a fresh decision query callback in environ, then delegate
        to the wrapped application and return its response.
        """
        queryInterface = self.authzDecisionQueryFactory()
        environ[self.queryInterfaceKeyName] = queryInterface
        return self._app(environ, start_response)

    def authzDecisionQueryFactory(self):
        """Build the callback which makes the authorisation decision

        @return: function taking the SAML query and the response to populate
        """

        def authzDecisionQuery(query, response):
            """Authorisation Decision Query interface called by the next
            middleware in the stack the SAML SOAP Query interface middleware
            instance
            (ndg.saml.saml2.binding.soap.server.wsgi.queryinterface.SOAPQueryInterfaceMiddleware)

            Fills in ``response`` with a success status and a single
            assertion carrying the decision, and returns it.
            """
            timestamp = datetime.utcnow()
            response.issueInstant = timestamp

            # Tie the response back to the query it answers
            response.inResponseTo = query.id
            response.id = str(uuid4())
            response.version = SAMLVersion(SAMLVersion.VERSION_20)

            response.status = Status()
            response.status.statusCode = StatusCode()
            response.status.statusCode.value = StatusCode.SUCCESS_URI
            response.status.statusMessage = StatusMessage()
            response.status.statusMessage.value = (
                "Response created successfully")

            assertion = Assertion()
            assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
            assertion.id = str(uuid4())
            assertion.issueInstant = timestamp

            statement = AuthzDecisionStatement()

            # Toy policy: only HTTP GET on RESOURCE_URI is permitted, any
            # other action on it is denied and any other resource is
            # indeterminate
            if query.resource != self.__class__.RESOURCE_URI:
                statement.decision = DecisionType.INDETERMINATE
            elif query.actions[0].value == Action.HTTP_GET_ACTION:
                statement.decision = DecisionType.PERMIT
            else:
                statement.decision = DecisionType.DENY

            statement.resource = query.resource

            # The statement always reports the HTTP GET action; namespace is
            # set before value, as in the original ordering
            action = Action()
            statement.actions.append(action)
            action.namespace = Action.GHPP_NS_URI
            action.value = Action.HTTP_GET_ACTION
            assertion.authzDecisionStatements.append(statement)

            # Assertion is valid for 8 hours from now
            assertion.conditions = Conditions()
            assertion.conditions.notBefore = timestamp
            assertion.conditions.notOnOrAfter = timestamp + timedelta(hours=8)

            # Echo the subject of the query back in the assertion
            assertion.subject = Subject()
            assertion.subject.nameID = NameID()
            assertion.subject.nameID.format = query.subject.nameID.format
            assertion.subject.nameID.value = query.subject.nameID.value

            assertion.issuer = Issuer()
            assertion.issuer.format = Issuer.X509_SUBJECT
            assertion.issuer.value = \
                TestAuthorisationServiceMiddleware.ISSUER_DN

            response.assertions.append(assertion)
            return response

        return authzDecisionQuery
126
127
class RedirectFollowingAccessDenied(PEPResultHandlerMiddleware):
    """PEP result handler which retries a denied admin request without its
    admin query argument.

    When the query string carries the argument ``admin=1`` the handler
    responds with a redirect to the same path with that argument removed.
    Any other request is passed to the parent result handler.
    """

    @NDGSecurityMiddlewareBase.initCall
    def __call__(self, environ, start_response):
        """Redirect away from an ``admin=1`` request, otherwise defer to the
        parent class.

        @param environ: WSGI environment; QUERY_STRING is read from it
        @param start_response: WSGI start response callable
        @return: the redirect response, or the parent handler's response
        """
        adminArg = 'admin=1'
        queryArgs = environ.get('QUERY_STRING', '').split('&')

        # Match the argument exactly.  A substring test on the raw query
        # string would also fire for e.g. 'superadmin=1' or 'admin=10',
        # nothing would then be stripped and the redirect would point straight
        # back at the same URI.
        if adminArg not in queryArgs:
            return super(RedirectFollowingAccessDenied, self).__call__(
                                                                environ,
                                                                start_response)

        # User has been rejected access to a URI requiring admin rights,
        # try redirect to the same URI minus the admin query arg, this
        # request will pass because admin rights aren't needed
        editedQuery = '&'.join([arg for arg in queryArgs if arg != adminArg])

        # NOTE(review): self.pathInfo is presumably populated by the initCall
        # decorator - confirm against NDGSecurityMiddlewareBase
        redirectURI = urlunsplit(('', '', self.pathInfo, editedQuery, ''))
        return self.redirect(redirectURI)
147
148
class TestAuthZMiddleware(object):
    '''Test application for the Authorization handler to protect.

    The response status is chosen from the request's PATH_INFO so that the
    tests can exercise different outcomes; unknown paths give a 404.
    '''
    RESPONSE = "Test Authorization application"

    # Status line to return for each recognised request path
    _STATUS_BY_PATH = {
        '/test_401': "401 Unauthorized",
        '/test_403': "403 Forbidden",
        '/test_200': "200 OK",
        # Nb. AuthZ middleware should intercept the request and bypass this
        # response
        '/test_accessDeniedToSecuredURI': "200 OK",
        '/test_accessGrantedToSecuredURI': "200 OK",
    }
    _DEFAULT_STATUS = "404 Not found"

    def __init__(self, app_conf, **local_conf):
        """Accept the configuration arguments; none of them are used.

        @param app_conf: application configuration (ignored)
        @param local_conf: local configuration options (ignored)
        """
        pass

    def __call__(self, environ, start_response):
        """Return a plain text response whose status depends on PATH_INFO.

        @param environ: WSGI environment; must contain PATH_INFO
        @param start_response: WSGI start response callable
        @return: single item list holding the response body
        @raise KeyError: if PATH_INFO is not set in environ
        """
        status = self._STATUS_BY_PATH.get(environ['PATH_INFO'],
                                          self._DEFAULT_STATUS)

        # Build the body first so that Content-length describes what is
        # actually sent rather than just the RESPONSE prefix
        body = TestAuthZMiddleware.RESPONSE + ' returned: ' + status

        start_response(status,
                       [('Content-length', str(len(body))),
                        ('Content-type', 'text/plain')])

        return [body]
183
184
class BeakerSessionStub(dict):
    """Dictionary based stand-in for a beaker.session session object, for
    use by the unit tests in this module
    """

    def save(self):
        """Do nothing - the stub has no backing store to write to"""
        return None
190
191
class BaseAuthzFilterTestCase(BaseTestCase):
    """Common fixture for the NDG Security WSGI authorisation filter tests.

    Derived classes select a different Paste Deploy configuration by
    overriding INI_FILE.
    """
    INI_FILE = 'saml-test.ini'
    THIS_DIR = path.dirname(path.abspath(__file__))

    # Filled in by __init__ so that a derived class's INI_FILE is honoured
    INI_FILEPATH = None
    SESSION_KEYNAME = 'beaker.session.ndg.security'

    def __init__(self, *args, **kwargs):
        """Load the WSGI application under test into a Paste fixture and
        start the Attribute Authority and Authorisation Service which the
        authorisation decisions rely on
        """
        BaseTestCase.__init__(self, *args, **kwargs)

        cls = self.__class__

        # The ini file is resolved relative to this module's directory
        configURI = 'config:' + cls.INI_FILE
        wsgiApp = loadapp(configURI, relative_to=cls.THIS_DIR)
        self.app = paste.fixture.TestApp(wsgiApp)

        cls.INI_FILEPATH = os.path.join(cls.THIS_DIR, cls.INI_FILE)

        # NOTE(review): the two start* helpers and the port constant are
        # presumably provided by BaseTestCase - they are not defined here
        attributeAuthorityPort = cls.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM
        self.startSiteAAttributeAuthority(withSSL=True,
                                          port=attributeAuthorityPort)

        self.startAuthorisationService()
218         
219         
class SamlPepFilterTestCase(BaseAuthzFilterTestCase):
    """Test SAML based Policy Enforcement Filter.  This has a SAML authorisation
    decision query interface to call to a remote authorisation service"""

    def _anonymousEnviron(self):
        """Return environ settings simulating a beaker.session with no user
        logged in
        """
        return {self.__class__.SESSION_KEYNAME: BeakerSessionStub()}

    def _loggedInEnviron(self):
        """Return environ settings simulating a session for a logged in user.

        NOTE(review): OPENID_URI is presumably inherited from BaseTestCase -
        it is not defined in this module
        """
        cls = self.__class__
        return {
            cls.SESSION_KEYNAME: BeakerSessionStub(username=cls.OPENID_URI),
            'REMOTE_USER': cls.OPENID_URI
        }

    def test01CatchNoBeakerSessionFound(self):

        # SamlPepFilterConfigError is raised if no beaker.session is set in
        # environ
        self.assertRaises(SamlPepFilterConfigError, self.app.get,
                          '/test_200')

    def test02Ensure200WithNotLoggedInAndUnsecuredURI(self):

        # Check the authZ middleware leaves the response alone if the URI
        # is not matched in the policy.  The expected status is stated
        # explicitly so that the test checks what its name promises
        response = self.app.get('/test_200',
                                extra_environ=self._anonymousEnviron(),
                                status=200)
        print(response)

    def test03Catch401WithLoggedIn(self):

        # Check that the application being secured can raise a HTTP 401
        # response and that this is respected by the Authorization middleware
        # even though a user is set in the session
        response = self.app.get('/test_401',
                                extra_environ=self._loggedInEnviron(),
                                status=401)
        print(response)

    def test04Catch403WithLoggedIn(self):

        # Check that the application being secured can raise a HTTP 403
        # response and that this is respected by the Authorization middleware
        # even though a user is set in the session
        response = self.app.get('/test_403',
                                extra_environ=self._loggedInEnviron(),
                                status=403)
        print(response)

    def test05Catch401WithNotLoggedInAndSecuredURI(self):

        # AuthZ middleware checks for username key in session set by AuthN
        # handler.  None is set here and the URI is secured, so a 401
        # response is expected
        response = self.app.get('/test_accessDeniedToSecuredURI',
                                extra_environ=self._anonymousEnviron(),
                                status=401)
        print(response)

    def test06AccessDeniedForSecuredURI(self):

        # User is logged in but doesn't have the required credentials for
        # access
        response = self.app.get('/test_accessDeniedToSecuredURI',
                                extra_environ=self._loggedInEnviron(),
                                status=403)
        print(response)

    def test07AccessGrantedForSecuredURI(self):

        # User is logged in and has credentials for access to a URI secured
        # by the policy file
        response = self.app.get('/test_accessGrantedToSecuredURI',
                                extra_environ=self._loggedInEnviron(),
                                status=200)
        self.assertTrue(TestAuthZMiddleware.RESPONSE in response)
        print(response)
315
316
class PEPResultHandlerTestCase(BaseAuthzFilterTestCase):
    """Test Authorisation Filter - this contains the PEP filter and a result
    handler which enables customisation of behaviour on 403 Forbidden responses
    """
    INI_FILE = 'pep-result-handler-test.ini'
    AUTHZ_FILTER_SECTION = 'filter:AuthZFilter'
    AUTHZ_RESULT_HANDLER_REDIRECT_URI_OPTNAME = 'authz.resultHandler.redirectURI'

    def __init__(self, *arg, **kw):
        """Set up the base fixture, then read the redirect URI expected by
        the tests from the authorisation filter section of the ini file
        """
        BaseAuthzFilterTestCase.__init__(self, *arg, **kw)

        cfgParser = SafeConfigParser()
        cfgParser.read(self.__class__.INI_FILEPATH)

        self.redirectURI = cfgParser.get(self.__class__.AUTHZ_FILTER_SECTION,
                    self.__class__.AUTHZ_RESULT_HANDLER_REDIRECT_URI_OPTNAME)

    def _loggedInEnviron(self):
        """Return environ settings simulating a session for a logged in user.

        NOTE(review): OPENID_URI is presumably inherited from BaseTestCase -
        it is not defined in this module
        """
        cls = self.__class__
        return {
            cls.SESSION_KEYNAME: BeakerSessionStub(username=cls.OPENID_URI),
            'REMOTE_USER': cls.OPENID_URI
        }

    def test01RedirectPEPResultHandlerMiddleware(self):
        # User is logged in but doesn't have the required credentials for
        # access
        extra_environ = self._loggedInEnviron()

        # Expecting result handler to be invoked overriding the 403 response
        response = self.app.get('/test_accessDeniedToSecuredURI',
                                extra_environ=extra_environ,
                                status=302)
        print("Result handler has intercepted the 403 Forbidden response "
              "from the PEP and set this redirect response instead: %s" %
              response)

        # assertEqual reports both values on failure
        self.assertEqual(response.header_dict.get('location'),
                         self.redirectURI)

    def test02RedirectFollowingAccessDeniedForAdminQueryArg(self):

        # User is logged in but doesn't have the required credentials for
        # access
        extra_environ = self._loggedInEnviron()

        # Try this URI with the query arg admin=1.  This will be picked up
        # by the policy as a request requiring admin rights.  The request is
        # denied as the user doesn't have these rights but this then calls
        # into play the PEP result handler defined in this module,
        # RedirectFollowingAccessDenied.  This class reinvokes the request
        # but without the admin query argument (see the ini file for what this
        # location is).  Access is then granted because the user has access
        # rights for the new location.
        response = self.app.get('/test_accessGrantedToSecuredURI',
                                params={'admin': 1},
                                extra_environ=extra_environ,
                                status=302)

        print("Redirect Handler has interrupted the 403 Denied response and "
              "added this redirect response instead: %s" % response)

        # Follow the redirect - the policy should allow access to the new
        # location
        redirectResponse = response.follow(extra_environ=extra_environ,
                                           status=200)
        print("Following the redirect to location %r gives this response: %s" %
              (response.header_dict.get('location'), redirectResponse))
383       
384       
# Run the test cases in this module when it is executed as a script
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.