source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/test_authz.py @ 7168

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/test_authz.py@7168
Revision 7168, 14.5 KB checked in by pjkersha, 9 years ago (diff)

Incomplete - task 2: XACML-Security Integration

  • testing ndg.security.server.wsgi.authz.pep.SamlPepMiddleware in ndg.security.test.unit.authz.test_authz
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Unit tests for WSGI Authorization handler
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "21/05/09"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import logging
13logging.basicConfig(level=logging.DEBUG)
14
15import unittest
16import os
17from urlparse import urlunsplit
18
19from os import path
20from ConfigParser import SafeConfigParser
21
22import paste.fixture
23from paste.deploy import loadapp
24
25from ndg.security.test.unit import BaseTestCase
26from ndg.security.server.wsgi import NDGSecurityMiddlewareBase
27from ndg.security.server.wsgi.authz.result_handler.basic import \
28    PEPResultHandlerMiddleware
29from ndg.security.server.wsgi.authz.result_handler.redirect import \
30    HTTPRedirectPEPResultHandlerMiddleware
31from ndg.security.server.wsgi.authz.pep import SamlPepMiddlewareConfigError
32
33
34from uuid import uuid4
35from datetime import datetime, timedelta
36
37from ndg.saml.saml2.core import (SAMLVersion, Subject, NameID, Issuer, 
38                                 AuthzDecisionQuery, AuthzDecisionStatement, 
39                                 Status, StatusCode, StatusMessage, 
40                                 DecisionType, Action, Conditions, Assertion)
41from ndg.saml.xml.etree import (AuthzDecisionQueryElementTree, 
42                                ResponseElementTree)
43
class TestAuthorisationServiceMiddleware(object):
    """Authorisation Service stub for use in the test WSGI stack.

    For each request a decision callable is stored in ``environ`` under the
    key named by the ``queryInterfaceKeyName`` configuration option and the
    request is then handed on to the wrapped application.
    """
    QUERY_INTERFACE_KEYNAME_OPTNAME = 'queryInterfaceKeyName'
    RESOURCE_URI = 'http://localhost/dap/data/'
    ISSUER_DN = '/O=Test/OU=Authorisation/CN=Service Stub'

    def __init__(self, app, global_conf, **app_conf):
        """Record the wrapped ``app`` and the environ key to publish under.

        ``global_conf`` is accepted but not used.  A ``KeyError`` results if
        the ``queryInterfaceKeyName`` option is missing from ``app_conf``.
        """
        optName = \
            TestAuthorisationServiceMiddleware.QUERY_INTERFACE_KEYNAME_OPTNAME
        self.queryInterfaceKeyName = app_conf[optName]
        self._app = app

    def __call__(self, environ, start_response):
        """Publish a fresh decision callable in ``environ`` and delegate"""
        queryInterface = self.authzDecisionQueryFactory()
        environ[self.queryInterfaceKeyName] = queryInterface
        return self._app(environ, start_response)

    def _decide(self, query):
        """Simulate a simple access policy for ``query``.

        HTTP GET on RESOURCE_URI is permitted, any other action on that
        resource is denied and any other resource is indeterminate.  Only
        the first action of the query is examined.
        """
        if query.resource != self.__class__.RESOURCE_URI:
            return DecisionType.INDETERMINATE

        if query.actions[0].value == Action.HTTP_GET_ACTION:
            return DecisionType.PERMIT

        return DecisionType.DENY

    def authzDecisionQueryFactory(self):
        """Return the callable which makes the authorisation decision"""

        def authzDecisionQuery(query, response):
            """Authorisation Decision Query interface, intended to be called
            by the SAML SOAP Query interface middleware next in the stack
            (ndg.saml.saml2.binding.soap.server.wsgi.queryinterface.SOAPQueryInterfaceMiddleware)

            ``response`` is populated in place with a single assertion
            carrying the decision and is also returned.
            """
            issueInstant = datetime.utcnow()

            # Response header fields - the response is tied back to the
            # ID of the query being answered
            response.issueInstant = issueInstant
            response.inResponseTo = query.id
            response.id = str(uuid4())
            response.version = SAMLVersion(SAMLVersion.VERSION_20)

            response.status = Status()
            response.status.statusCode = StatusCode()
            response.status.statusCode.value = StatusCode.SUCCESS_URI
            response.status.statusMessage = StatusMessage()
            response.status.statusMessage.value = \
                                                "Response created successfully"

            assertion = Assertion()
            assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
            assertion.id = str(uuid4())
            assertion.issueInstant = issueInstant

            # Decision statement - the action reported back is always
            # HTTP GET irrespective of the action in the query
            statement = AuthzDecisionStatement()
            statement.decision = self._decide(query)
            statement.resource = query.resource

            reportedAction = Action()
            statement.actions.append(reportedAction)
            statement.actions[-1].namespace = Action.GHPP_NS_URI
            statement.actions[-1].value = Action.HTTP_GET_ACTION
            assertion.authzDecisionStatements.append(statement)

            # Assertion is valid for 8 hours from now
            assertion.conditions = Conditions()
            assertion.conditions.notBefore = issueInstant
            assertion.conditions.notOnOrAfter = issueInstant + \
                                                timedelta(hours=8)

            # Subject is echoed back from the query
            assertion.subject = Subject()
            assertion.subject.nameID = NameID()
            assertion.subject.nameID.format = query.subject.nameID.format
            assertion.subject.nameID.value = query.subject.nameID.value

            assertion.issuer = Issuer()
            assertion.issuer.format = Issuer.X509_SUBJECT
            assertion.issuer.value = \
                                    TestAuthorisationServiceMiddleware.ISSUER_DN

            response.assertions.append(assertion)
            return response

        return authzDecisionQuery
125
class RedirectFollowingAccessDenied(PEPResultHandlerMiddleware):
    """PEP result handler which retries a denied admin request without the
    ``admin=1`` query argument.

    If the query string of the request contains the argument ``admin=1`` a
    redirect is issued to the same path with that argument removed.  Any
    other request is passed to PEPResultHandlerMiddleware.__call__
    unchanged.
    """

    @NDGSecurityMiddlewareBase.initCall
    def __call__(self, environ, start_response):
        """Redirect minus the admin query argument or defer to the base
        class handler.

        NOTE(review): self.pathInfo and self.redirect are not defined in
        this module - presumably provided by the base classes / initCall
        decorator; confirm against NDGSecurityMiddlewareBase.

        @param environ: WSGI environment; only QUERY_STRING is read here
        @param start_response: WSGI start response callable, passed on to
        the base class handler when no redirect is made
        @return: result of self.redirect or of the base class __call__
        """
        adminArg = 'admin=1'
        queryArgs = environ.get('QUERY_STRING', '').split('&')

        # Match whole arguments only.  A substring test on the raw query
        # string would also fire for e.g. 'admin=10' or 'superadmin=1';
        # nothing would then be stripped below and the redirect would point
        # back at an identical URI
        if adminArg not in queryArgs:
            return super(RedirectFollowingAccessDenied, self).__call__(
                                                            environ,
                                                            start_response)

        # User has been rejected access to a URI requiring admin rights,
        # try redirect to the same URI minus the admin query arg, this
        # request will pass because admin rights aren't needed
        editedQuery = '&'.join([arg for arg in queryArgs if arg != adminArg])
        redirectURI = urlunsplit(('', '', self.pathInfo, editedQuery, ''))
        return self.redirect(redirectURI)
145
146       
class TestAuthZMiddleware(object):
    '''Test application for the authorization middleware to protect.

    The HTTP status returned is selected by the request PATH_INFO; the body
    is always the fixed ``response`` text.
    '''
    response = "Test Authorization application"

    # PATH_INFO -> status line.  Nb. for /test_accessDeniedToSecuredURI the
    # AuthZ middleware should intercept the request and bypass this response
    _STATUS_BY_PATH = {
        '/test_401': "401 Unauthorized",
        '/test_403': "403 Forbidden",
        '/test_200': "200 OK",
        '/test_accessDeniedToSecuredURI': "200 OK",
        '/test_accessGrantedToSecuredURI': "200 OK",
    }

    def __init__(self, app_conf, **local_conf):
        """Configuration arguments are accepted but ignored"""
        pass

    def __call__(self, environ, start_response):
        """Answer with the status mapped to PATH_INFO, 404 if unrecognised"""
        body = TestAuthZMiddleware.response
        status = TestAuthZMiddleware._STATUS_BY_PATH.get(environ['PATH_INFO'],
                                                         "404 Not found")
        headers = [
            ('Content-length', str(len(body))),
            ('Content-type', 'text/plain'),
        ]
        start_response(status, headers)
        return [body]
180
181
class BeakerSessionStub(dict):
    """Dictionary based stand-in for a beaker.session session object, used
    by the unit tests in place of a real session
    """
    def save(self):
        """Do nothing - the stub has nothing to persist"""
        return None
187 
188   
class SamlWSGIAuthZTestCase(BaseTestCase):
    """Exercise the authorization middleware stack configured in
    saml-test.ini via a paste.fixture test application.

    A BeakerSessionStub is injected into the WSGI environ under
    SESSION_KEYNAME to simulate the session; a ``username`` entry in the
    stub simulates a logged in user.
    """
    INI_FILE = 'saml-test.ini'
    THIS_DIR = path.dirname(path.abspath(__file__))
    SESSION_KEYNAME = 'beaker.session.ndg.security'

    def __init__(self, *args, **kwargs):
        BaseTestCase.__init__(self, *args, **kwargs)

        wsgiapp = loadapp('config:'+SamlWSGIAuthZTestCase.INI_FILE,
                          relative_to=SamlWSGIAuthZTestCase.THIS_DIR)
        self.app = paste.fixture.TestApp(wsgiapp)

        # NOTE(review): the service start-up helpers and the port number /
        # OPENID_URI constants used in this class are not defined in this
        # module - presumably inherited from BaseTestCase
        self.startSiteAAttributeAuthority(withSSL=True,
            port=SamlWSGIAuthZTestCase.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM)

        self.startAuthorisationService()

    def test01CatchNoBeakerSessionFound(self):

        # SamlPepMiddlewareConfigError is expected if no beaker.session is
        # set in environ
        self.assertRaises(SamlPepMiddlewareConfigError, self.app.get,
                          '/test_200')

    def test02Ensure200WithNotLoggedInAndUnsecuredURI(self):

        # Check the authZ middleware leaves the response alone if the URI
        # is not matched in the policy

        # Simulate a beaker.session in the environ
        extra_environ = {self.__class__.SESSION_KEYNAME: BeakerSessionStub()}
        self.app.get('/test_200', extra_environ=extra_environ)

    def test03Catch401WithLoggedIn(self):

        # Check that the application being secured can raise a HTTP 401
        # response and that this is respected by the Authorization
        # middleware even though a user is set in the session

        extra_environ = {
            self.__class__.SESSION_KEYNAME:
                BeakerSessionStub(username=self.__class__.OPENID_URI),
            'REMOTE_USER': self.__class__.OPENID_URI
        }
        self.app.get('/test_401',
                     extra_environ=extra_environ,
                     status=401)

    def test04Catch403WithLoggedIn(self):

        # Check that the application being secured can raise a HTTP 403
        # response and that this is respected by the Authorization
        # middleware even though a user is set in the session

        extra_environ = {
            self.__class__.SESSION_KEYNAME:
                BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI)
        }
        self.app.get('/test_403',
                     extra_environ=extra_environ,
                     status=403)

    def test05Catch401WithNotLoggedInAndSecuredURI(self):

        # No username is set in the session stub i.e. the user is not logged
        # in, so a 401 is expected for this URI

        # AuthZ middleware checks for username key in session set by AuthN
        # handler
        extra_environ = {self.__class__.SESSION_KEYNAME: BeakerSessionStub()}
        self.app.get('/test_accessDeniedToSecuredURI',
                     extra_environ=extra_environ,
                     status=401)

    def test06AccessDeniedForSecuredURI(self):

        # User is logged in but doesn't have the required credentials for
        # access
        extra_environ = {
            self.__class__.SESSION_KEYNAME:
                BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI)
        }

        response = self.app.get('/test_accessDeniedToSecuredURI',
                                extra_environ=extra_environ,
                                status=403)
        print(response)
        self.assertTrue("Insufficient privileges to access the "
                        "resource" in response)

    def test07AccessGrantedForSecuredURI(self):

        # User is logged in and has credentials for access to a URI secured
        # by the policy file
        extra_environ = {
            self.__class__.SESSION_KEYNAME:
                BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI)
        }

        response = self.app.get('/test_accessGrantedToSecuredURI',
                                extra_environ=extra_environ,
                                status=200)
        self.assertTrue(TestAuthZMiddleware.response in response)
        print(response)

    def test08AccessDeniedForAdminQueryArg(self):

        # User is logged in but doesn't have the required credentials for
        # access
        extra_environ = {
            self.__class__.SESSION_KEYNAME:
                BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI)
        }

        # Try this URI with the query arg admin=1.  This will be picked up
        # by the policy as a request requiring admin rights.  The request is
        # denied as the user doesn't have these rights but this then calls
        # into play the PEP result handler defined in this module,
        # RedirectFollowingAccessDenied.  This class reinvokes the request
        # but without the admin query argument.  Access is then granted for
        # the redirected request
        response = self.app.get('/test_accessGrantedToSecuredURI',
                                params={'admin': 1},
                                extra_environ=extra_environ,
                                status=302)
        print(response)

        # Check the response to the *redirected* request.  An AppError from
        # follow() is left to propagate so that a rejected redirect shows up
        # as a test error rather than being masked
        redirectResponse = response.follow(extra_environ=extra_environ)
        self.assertTrue(TestAuthZMiddleware.response in redirectResponse)
321
322
class PEPResultHandlerTestCase(BaseTestCase):
    """Test the redirecting PEP result handler configured in
    pep-result-handler-test.ini.

    The expected redirect target is read from the
    ``authz.pepResultHandler.redirectURI`` option of the
    ``filter:AuthZFilter`` section of the same ini file.
    """
    INI_FILE = 'pep-result-handler-test.ini'
    THIS_DIR = os.path.dirname(os.path.abspath(__file__))
    INI_FILEPATH = path.join(THIS_DIR, INI_FILE)
    SESSION_KEYNAME = 'beaker.session.ndg.security'

    def __init__(self, *arg, **kw):
        BaseTestCase.__init__(self, *arg, **kw)

        wsgiapp = loadapp('config:'+self.__class__.INI_FILE,
                          relative_to=self.__class__.THIS_DIR)
        self.app = paste.fixture.TestApp(wsgiapp)

        # 'here' is supplied as a default so that %(here)s substitutions in
        # the ini file can be resolved when parsing it directly
        cfg = SafeConfigParser(dict(here=self.__class__.THIS_DIR))
        cfg.read(self.__class__.INI_FILEPATH)
        self.redirectURI = cfg.get('filter:AuthZFilter',
                                   'authz.pepResultHandler.redirectURI')

        # NOTE(review): port constant and start-up helper are not defined in
        # this module - presumably inherited from BaseTestCase.  Looked up on
        # this class so there is no dependency on SamlWSGIAuthZTestCase
        self.startSiteAAttributeAuthority(withSSL=True,
            port=self.__class__.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM)

    def testRedirectPEPResultHandlerMiddleware(self):
        # User is logged in but doesn't have the required credentials for
        # access
        extra_environ = {
            self.__class__.SESSION_KEYNAME:
                        BeakerSessionStub(username=self.__class__.OPENID_URI)
        }

        # Expecting redirect response to specified redirect URI
        response = self.app.get('/test_accessDeniedToSecuredURI',
                                extra_environ=extra_environ,
                                status=302)
        print(response)

        # assertEqual reports both values on failure
        self.assertEqual(response.header_dict.get('location'),
                         self.redirectURI)
360       
# Run the test cases defined in this module when executed as a script
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.