source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/test_authz.py @ 7257

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/test_authz.py@7257
Revision 7257, 14.8 KB checked in by pjkersha, 10 years ago (diff)

Incomplete - task 2: XACML-Security Integration

  • cleaning out more old modules containing retired NDG2 security functionality
  • progress with ndg.security.test.unit.wsgi.authz.test_authz unit tests integrating SAML/XACML authorisation service to WSGI filter SAML PEP
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Unit tests for WSGI Authorization handler
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "21/05/09"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import logging
13logging.basicConfig(level=logging.DEBUG)
14
15import unittest
16import os
17from urlparse import urlunsplit
18
19from os import path
20from ConfigParser import SafeConfigParser
21
22import paste.fixture
23from paste.deploy import loadapp
24
25from ndg.security.test.unit import BaseTestCase
26from ndg.security.server.wsgi import NDGSecurityMiddlewareBase
27from ndg.security.server.wsgi.authz.result_handler.basic import \
28    PEPResultHandlerMiddleware
29from ndg.security.server.wsgi.authz.result_handler.redirect import \
30    HTTPRedirectPEPResultHandlerMiddleware
31from ndg.security.server.wsgi.authz.pep import SamlPepMiddlewareConfigError
32
33
34from uuid import uuid4
35from datetime import datetime, timedelta
36
37from ndg.saml.saml2.core import (SAMLVersion, Subject, NameID, Issuer, 
38                                 AuthzDecisionQuery, AuthzDecisionStatement, 
39                                 Status, StatusCode, StatusMessage, 
40                                 DecisionType, Action, Conditions, Assertion)
41from ndg.saml.xml.etree import (AuthzDecisionQueryElementTree, 
42                                ResponseElementTree)
43
44
class TestAuthorisationServiceMiddleware(object):
    """Stub WSGI middleware standing in for an Authorisation Service

    For every request it places an authorisation decision query callable
    into ``environ`` under the key named by the ``queryInterfaceKeyName``
    option and then passes the request on to the wrapped application.
    """
    QUERY_INTERFACE_KEYNAME_OPTNAME = 'queryInterfaceKeyName'
    RESOURCE_URI = 'http://localhost/dap/data/'
    ISSUER_DN = '/O=Test/OU=Authorisation/CN=Service Stub'

    def __init__(self, app, global_conf, **app_conf):
        """Store the wrapped application and the environ key name to use

        @param app: next WSGI application / middleware in the stack
        @param global_conf: global configuration settings (not used here)
        @param app_conf: local settings; must contain the
        ``queryInterfaceKeyName`` option
        @raise KeyError: if the ``queryInterfaceKeyName`` option is missing
        """
        cls = TestAuthorisationServiceMiddleware
        optName = cls.QUERY_INTERFACE_KEYNAME_OPTNAME
        self.queryInterfaceKeyName = app_conf[optName]
        self._app = app

    def __call__(self, environ, start_response):
        """Publish a fresh query callable in environ, then delegate to the
        wrapped application
        """
        queryCallable = self.authzDecisionQueryFactory()
        environ[self.queryInterfaceKeyName] = queryCallable
        return self._app(environ, start_response)

    def authzDecisionQueryFactory(self):
        """Create the callable which makes the authorisation decision

        @return: function taking ``(query, response)``; it fills in and
        returns ``response``
        """

        def authzDecisionQuery(query, response):
            """Authorisation Decision Query interface called by the next
            middleware in the stack, the SAML SOAP Query interface middleware
            instance
            (ndg.saml.saml2.binding.soap.server.wsgi.queryinterface.SOAPQueryInterfaceMiddleware)

            @param query: incoming authorisation decision query
            @param response: response object to populate
            @return: the populated response
            """
            issueTime = datetime.utcnow()

            # Response envelope - tie it back to the ID of the incoming query
            response.issueInstant = issueTime
            response.inResponseTo = query.id
            response.id = str(uuid4())
            response.version = SAMLVersion(SAMLVersion.VERSION_20)

            # This stub always reports that the response itself was created
            # successfully, whatever the access decision turns out to be
            response.status = Status()
            status = response.status
            status.statusCode = StatusCode()
            status.statusCode.value = StatusCode.SUCCESS_URI
            status.statusMessage = StatusMessage()
            status.statusMessage.value = "Response created successfully"

            assertion = Assertion()
            assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
            assertion.id = str(uuid4())
            assertion.issueInstant = issueTime

            statement = AuthzDecisionStatement()

            # Simple logic simulating a full access policy: only HTTP GET on
            # the one known resource is permitted, any other action on it is
            # denied and any other resource is indeterminate
            if query.resource == self.__class__.RESOURCE_URI:
                isGet = query.actions[0].value == Action.HTTP_GET_ACTION
                statement.decision = (DecisionType.PERMIT if isGet
                                      else DecisionType.DENY)
            else:
                statement.decision = DecisionType.INDETERMINATE

            statement.resource = query.resource

            # The statement always names HTTP GET as its action
            getAction = Action()
            statement.actions.append(getAction)
            getAction.namespace = Action.GHPP_NS_URI
            getAction.value = Action.HTTP_GET_ACTION
            assertion.authzDecisionStatements.append(statement)

            # Assertion is valid for 8 hours from the time of issue
            conditions = Conditions()
            assertion.conditions = conditions
            conditions.notBefore = issueTime
            conditions.notOnOrAfter = issueTime + timedelta(hours=8)

            # Subject is copied over from the query
            assertion.subject = Subject()
            subjectNameID = NameID()
            assertion.subject.nameID = subjectNameID
            subjectNameID.format = query.subject.nameID.format
            subjectNameID.value = query.subject.nameID.value

            issuer = Issuer()
            assertion.issuer = issuer
            issuer.format = Issuer.X509_SUBJECT
            issuer.value = TestAuthorisationServiceMiddleware.ISSUER_DN

            response.assertions.append(assertion)
            return response

        return authzDecisionQuery
126
127
class RedirectFollowingAccessDenied(PEPResultHandlerMiddleware):
    """PEP result handler used by the tests: when the rejected request
    carried an ``admin=1`` query argument, redirect to the same path with
    that argument removed; otherwise defer to the parent class handler
    """

    @NDGSecurityMiddlewareBase.initCall
    def __call__(self, environ, start_response):
        """Handle an access denied result

        @param environ: WSGI environment dictionary
        @param start_response: WSGI start response callable
        @return: result of ``self.redirect`` when an ``admin=1`` query
        argument was present, else the parent class's response
        """
        # Match on whole query arguments rather than a substring of the raw
        # query string.  A substring test would also fire for e.g.
        # 'admin=10' or 'superadmin=1'; nothing would then be stripped below
        # and the redirect would point back at the very same URI
        queryArgs = environ.get('QUERY_STRING', '').split('&')
        if 'admin=1' not in queryArgs:
            return super(RedirectFollowingAccessDenied, self).__call__(
                                                                environ,
                                                                start_response)

        # User has been rejected access to a URI requiring admin rights,
        # try redirect to the same URI minus the admin query arg, this
        # request will pass because admin rights aren't needed
        editedQuery = '&'.join([arg for arg in queryArgs if arg != 'admin=1'])

        # self.pathInfo and self.redirect are not defined in this class -
        # presumably supplied by the base class / initCall decorator
        redirectURI = urlunsplit(('', '', self.pathInfo, editedQuery, ''))
        return self.redirect(redirectURI)
147
148
class TestAuthZMiddleware(object):
    '''Test application for the Authorization handler to protect.  The HTTP
    status returned is selected by the request path; the body is always the
    same fixed text'''
    response = "Test Authorization application"

    # Status line for each path this application recognises.  Nb. for
    # /test_accessDeniedToSecuredURI the AuthZ middleware should intercept
    # the request and bypass this response
    _STATUS_FOR_PATH = {
        '/test_401': "401 Unauthorized",
        '/test_403': "403 Forbidden",
        '/test_200': "200 OK",
        '/test_accessDeniedToSecuredURI': "200 OK",
        '/test_accessGrantedToSecuredURI': "200 OK",
    }

    def __init__(self, app_conf, **local_conf):
        '''Configuration settings are accepted but not used'''

    def __call__(self, environ, start_response):
        '''Respond with the status mapped to PATH_INFO, or 404 for any
        path which isn't recognised

        @raise KeyError: if PATH_INFO is not set in environ
        '''
        body = TestAuthZMiddleware.response
        requestPath = environ['PATH_INFO']
        status = TestAuthZMiddleware._STATUS_FOR_PATH.get(requestPath,
                                                          "404 Not found")
        headers = [('Content-length', str(len(body))),
                   ('Content-type', 'text/plain')]
        start_response(status, headers)

        return [body]
183
184
class BeakerSessionStub(dict):
    """Plain dictionary standing in for a beaker.session session object in
    these unit tests
    """
    def save(self):
        """Do nothing - the stub has nowhere to persist its content"""
        return None
190 
191   
class SamlWSGIAuthZTestCase(BaseTestCase):
    """Tests for the SAML based authorisation WSGI filter, driven through
    the application stack configured in saml-test.ini
    """
    INI_FILE = 'saml-test.ini'
    THIS_DIR = path.dirname(path.abspath(__file__))
    SESSION_KEYNAME = 'beaker.session.ndg.security'

    def __init__(self, *args, **kwargs):
        """Load the WSGI application under test and start the supporting
        Attribute Authority and Authorisation services
        """
        BaseTestCase.__init__(self, *args, **kwargs)

        wsgiapp = loadapp('config:'+SamlWSGIAuthZTestCase.INI_FILE,
                          relative_to=SamlWSGIAuthZTestCase.THIS_DIR)
        self.app = paste.fixture.TestApp(wsgiapp)

        self.startSiteAAttributeAuthority(withSSL=True,
            port=SamlWSGIAuthZTestCase.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM)

        self.startAuthorisationService()

    def _anonymousEnviron(self):
        """Make environ settings simulating a beaker.session with no user
        logged in
        """
        return {self.__class__.SESSION_KEYNAME: BeakerSessionStub()}

    def _loggedInEnviron(self):
        """Make environ settings simulating a beaker.session for a logged in
        user, with REMOTE_USER set to the same identity
        """
        openidURI = self.__class__.OPENID_URI
        return {
            self.__class__.SESSION_KEYNAME:
                BeakerSessionStub(username=openidURI),
            'REMOTE_USER': openidURI
        }

    def test01CatchNoBeakerSessionFound(self):

        # SamlPepMiddlewareConfigError is raised if no beaker.session is set
        # in environ
        self.assertRaises(SamlPepMiddlewareConfigError, self.app.get,
                          '/test_200')

    def test02Ensure200WithNotLoggedInAndUnsecuredURI(self):

        # Check the authZ middleware leaves the response alone if the URI
        # is not matched in the policy.  No explicit status is given: the
        # test fails only if the request itself raises an error
        self.app.get('/test_200', extra_environ=self._anonymousEnviron())

    def test03Catch401WithLoggedIn(self):

        # Check that the application being secured can raise a HTTP 401
        # response and that this is respected by the Authorization
        # middleware even though a user is set in the session
        self.app.get('/test_401',
                     extra_environ=self._loggedInEnviron(),
                     status=401)

    def test04Catch403WithLoggedIn(self):

        # Check that the application being secured can raise a HTTP 403
        # response and that this is respected by the Authorization
        # middleware even though a user is set in the session
        self.app.get('/test_403',
                     extra_environ=self._loggedInEnviron(),
                     status=403)

    def test05Catch401WithNotLoggedInAndSecuredURI(self):

        # AuthZ middleware checks for username key in session set by AuthN
        # handler.  Here the session holds no user and the URI is a secured
        # one so a 401 response is expected
        self.app.get('/test_accessDeniedToSecuredURI',
                     extra_environ=self._anonymousEnviron(),
                     status=401)

    def test06AccessDeniedForSecuredURI(self):

        # User is logged in but doesn't have the required credentials for
        # access
        response = self.app.get('/test_accessDeniedToSecuredURI',
                                extra_environ=self._loggedInEnviron(),
                                status=403)
        print(response)
        self.assertTrue("Insufficient privileges to access the "
                        "resource" in response)

    def test07AccessGrantedForSecuredURI(self):

        # User is logged in and has credentials for access to a URI secured
        # by the policy file
        response = self.app.get('/test_accessGrantedToSecuredURI',
                                extra_environ=self._loggedInEnviron(),
                                status=200)
        self.assertTrue(TestAuthZMiddleware.response in response)
        print(response)

    def test08AccessDeniedForAdminQueryArg(self):

        # User is logged in but doesn't have the required credentials for
        # access
        extra_environ = self._loggedInEnviron()

        # Try this URI with the query arg admin=1.  This will be picked up
        # by the policy as a request requiring admin rights.  The request is
        # denied as the user doesn't have these rights but this then calls
        # into play the PEP result handler defined in this module,
        # RedirectFollowingAccessDenied.  This class reinvokes the request
        # but without the admin query argument.  Access is then granted for
        # the redirected request
        response = self.app.get('/test_accessGrantedToSecuredURI',
                                params={'admin': 1},
                                extra_environ=extra_environ,
                                status=302)

        # Follow the redirect and check the *redirected* response.  If the
        # followed request returns an error status, paste.fixture raises
        # AppError and the test fails with that error's details
        redirectResponse = response.follow(extra_environ=extra_environ)
        self.assertTrue(TestAuthZMiddleware.response in redirectResponse)
        print(redirectResponse)
325
326
class PEPResultHandlerTestCase(BaseTestCase):
    """Tests for the PEP result handler middleware, using the application
    stack configured in pep-result-handler-test.ini
    """
    INI_FILE = 'pep-result-handler-test.ini'
    THIS_DIR = path.dirname(path.abspath(__file__))
    INI_FILEPATH = path.join(THIS_DIR, INI_FILE)
    SESSION_KEYNAME = 'beaker.session.ndg.security'

    def __init__(self, *arg, **kw):
        """Load the WSGI application under test, read the expected redirect
        URI from the ini file and start the Attribute Authority service
        """
        BaseTestCase.__init__(self, *arg, **kw)

        wsgiapp = loadapp('config:'+self.__class__.INI_FILE,
                          relative_to=self.__class__.THIS_DIR)
        self.app = paste.fixture.TestApp(wsgiapp)

        # Read the redirect URI directly from the ini file.  'here' is given
        # as a default, presumably so that %(here)s substitutions in the
        # file resolve as they do under paste.deploy
        cfg = SafeConfigParser(dict(here=self.__class__.THIS_DIR))
        cfg.read(self.__class__.INI_FILEPATH)
        self.redirectURI = cfg.get('filter:AuthZFilter',
                                   'authz.pepResultHandler.redirectURI')

        # Port number is looked up via this class rather than via the
        # unrelated SamlWSGIAuthZTestCase
        self.startSiteAAttributeAuthority(withSSL=True,
            port=self.__class__.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM)

    def testRedirectPEPResultHandlerMiddleware(self):
        # User is logged in but doesn't have the required credentials for
        # access
        raise NotImplementedError('TODO: fix recursion error')

        # Intended test body, disabled until the recursion error is fixed:
        #
        # extra_environ = {
        #     self.__class__.SESSION_KEYNAME:
        #                 BeakerSessionStub(username=self.__class__.OPENID_URI)
        # }
        #
        # # Expecting redirect response to specified redirect URI
        # response = self.app.get('/test_accessDeniedToSecuredURI',
        #                         extra_environ=extra_environ,
        #                         status=302)
        # print(response)
        # self.assert_(response.header_dict.get('location') == self.redirectURI)
365       
# Run the tests in this module when the file is executed as a script
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.