source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/test_authz.py @ 6284

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/test_authz.py@6284
Revision 6284, 16.9 KB checked in by pjkersha, 10 years ago (diff)

Preparing 1.4 release.

Line 
1#!/usr/bin/env python
2"""Unit tests for WSGI Authorization handler
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "21/05/09"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id: $'
12import logging
13
14
15import unittest
16import os
17from urlparse import urlunsplit
18
19from os.path import expandvars as xpdVars
20from os.path import join as jnPath
21mkPath = lambda file: jnPath(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'], 
22                             file)
23from ConfigParser import SafeConfigParser
24
25import paste.fixture
26from paste.deploy import loadapp
27
28from ndg.security.test.unit import BaseTestCase
29from ndg.security.server.wsgi import NDGSecurityMiddlewareBase
30from ndg.security.server.wsgi.authz.result_handler.basic import \
31    PEPResultHandlerMiddleware
32from ndg.security.server.wsgi.authz.result_handler.redirect import \
33    HTTPRedirectPEPResultHandlerMiddleware
34from ndg.security.server.wsgi.authz import (NdgPIPMiddlewareConfigError,
35                                            SamlPIPMiddlewareConfigError)
36from ndg.security.common.authz.msi import Response
37
38
39class RedirectFollowingAccessDenied(PEPResultHandlerMiddleware):
40   
41    @NDGSecurityMiddlewareBase.initCall
42    def __call__(self, environ, start_response):
43       
44        queryString = environ.get('QUERY_STRING', '')
45        if 'admin=1' in queryString:
46            # User has been rejected access to a URI requiring admin rights,
47            # try redirect to the same URI minus the admin query arg, this
48            # request will pass because admin rights aren't needed
49            queryArgs = queryString.split('&')
50            queryList = [arg for arg in queryArgs if arg != 'admin=1']
51            editedQuery = '&'.join(queryList)
52            redirectURI = urlunsplit(('', '', self.pathInfo, editedQuery, ''))
53            return self.redirect(redirectURI)
54        else:
55            return super(RedirectFollowingAccessDenied, self).__call__(
56                                                            environ,
57                                                            start_response)
58
59       
60class TestAuthZMiddleware(object):
61    '''Test Application for the Authentication handler to protect'''
62    response = "Test Authorization application"
63       
64    def __init__(self, app_conf, **local_conf):
65        pass
66   
67    def __call__(self, environ, start_response):
68       
69        if environ['PATH_INFO'] == '/test_401':
70            status = "401 Unauthorized"
71           
72        elif environ['PATH_INFO'] == '/test_403':
73            status = "403 Forbidden"
74           
75        elif environ['PATH_INFO'] == '/test_200':
76            status = "200 OK"
77           
78        elif environ['PATH_INFO'] == '/test_accessDeniedToSecuredURI':
79            # Nb. AuthZ middleware should intercept the request and bypass this
80            # response
81            status = "200 OK"
82           
83        elif environ['PATH_INFO'] == '/test_accessGrantedToSecuredURI':
84            status = "200 OK"
85        else:
86            status = "404 Not found"
87               
88        start_response(status,
89                       [('Content-length', 
90                         str(len(TestAuthZMiddleware.response))),
91                        ('Content-type', 'text/plain')])
92        return [TestAuthZMiddleware.response]
93
94
95class BeakerSessionStub(dict):
96    """Emulate beaker.session session object for purposes of the unit tests
97    """
98    def save(self):
99        pass
100 
101   
102class NdgWSGIAuthZTestCase(BaseTestCase):
103    INI_FILE = 'ndg-test.ini'
104    THIS_DIR = os.path.dirname(os.path.abspath(__file__))
105    def __init__(self, *args, **kwargs):
106        BaseTestCase.__init__(self, *args, **kwargs)
107       
108       
109        wsgiapp = loadapp('config:'+NdgWSGIAuthZTestCase.INI_FILE, 
110                          relative_to=NdgWSGIAuthZTestCase.THIS_DIR)
111        self.app = paste.fixture.TestApp(wsgiapp)
112       
113        self.startSiteAAttributeAuthority()
114       
115    def test01CatchNoBeakerSessionFound(self):
116       
117        # PEPFilterConfigError is raised if no beaker.session is set in
118        # environ
119        try:
120            response = self.app.get('/test_200')
121        except NdgPIPMiddlewareConfigError, e:
122            print("ok - expected: %s exception: %s" % (e.__class__, e))
123       
124    def test02Ensure200WithNotLoggedInAndUnsecuredURI(self):
125       
126        # Check the authZ middleware leaves the response alone if the URI
127        # is not matched in the policy
128       
129        # Simulate a beaker.session in the environ
130        extra_environ={'beaker.session.ndg.security':BeakerSessionStub()}
131        response = self.app.get('/test_200',
132                                extra_environ=extra_environ)
133
134    def test03Catch401WithLoggedIn(self):
135       
136        # Check that the application being secured can raise a HTTP 401
137        # response and that this respected by the Authorization middleware
138        # even though a user is set in the session
139       
140        extra_environ={'beaker.session.ndg.security':
141                       BeakerSessionStub(username='testuser')}
142        response = self.app.get('/test_401', 
143                                extra_environ=extra_environ,
144                                status=401)
145
146    def test04Catch403WithLoggedIn(self):
147       
148        # Check that the application being secured can raise a HTTP 403
149        # response and that this respected by the Authorization middleware
150        # even though a user is set in the session
151       
152        extra_environ={'beaker.session.ndg.security':
153                       BeakerSessionStub(username='testuser')}
154        response = self.app.get('/test_403', 
155                                extra_environ=extra_environ,
156                                status=403)
157
158    def test05Catch401WithNotLoggedInAndSecuredURI(self):
159       
160        # AuthZ middleware grants access because the URI requested is not
161        # targeted in the policy
162       
163        # AuthZ middleware checks for username key in session set by AuthN
164        # handler
165        extra_environ={'beaker.session.ndg.security':BeakerSessionStub()}       
166        response = self.app.get('/test_accessDeniedToSecuredURI',
167                                extra_environ=extra_environ,
168                                status=401)
169       
170    def test06AccessDeniedForSecuredURI(self):
171       
172        # User is logged in but doesn't have the required credentials for
173        # access
174        extra_environ={'beaker.session.ndg.security':
175                       BeakerSessionStub(username='testuser')}
176       
177        response = self.app.get('/test_accessDeniedToSecuredURI',
178                                extra_environ=extra_environ,
179                                status=403)
180        self.assert_("Insufficient privileges to access the "
181                     "resource" in response)
182        print response
183
184    def test07AccessGrantedForSecuredURI(self):
185       
186        # User is logged in and has credentials for access to a URI secured
187        # by the policy file
188        extra_environ={'beaker.session.ndg.security':
189                       BeakerSessionStub(username='testuser')}
190       
191        response = self.app.get('/test_accessGrantedToSecuredURI',
192                                extra_environ=extra_environ,
193                                status=200)
194        self.assert_(TestAuthZMiddleware.response in response)
195        print response
196
197    def test08AccessDeniedForAdminQueryArg(self):
198       
199        # User is logged in but doesn't have the required credentials for
200        # access
201        extra_environ={'beaker.session.ndg.security':
202                       BeakerSessionStub(username='testuser')}
203       
204        # Try this URI with the query arg admin=1.  This will be picked up
205        # by the policy as a request requiring admin rights.  The request is
206        # denied as the user doesn't have these rights but this then calls
207        # into play the PEP result handler defined in this module,
208        # RedirectFollowingAccessDenied.  This class reinvokes the request
209        # but without the admin query argument.  Access is then granted for
210        # the redirected request
211        response = self.app.get('/test_accessGrantedToSecuredURI',
212                                params={'admin': 1},
213                                extra_environ=extra_environ,
214                                status=302)
215        try:
216            redirectResponse = response.follow(extra_environ=extra_environ)
217        except paste.fixture.AppError, e:
218            self.failIf(TestAuthZMiddleware.response not in response)
219        print response
220
221       
222class TestAuthZMiddleware(object):
223    '''Test Application for the Authentication handler to protect'''
224    response = "Test Authorization application"
225       
226    def __init__(self, app_conf, **local_conf):
227        pass
228   
229    def __call__(self, environ, start_response):
230       
231        if environ['PATH_INFO'] == '/test_401':
232            status = "401 Unauthorized"
233           
234        elif environ['PATH_INFO'] == '/test_403':
235            status = "403 Forbidden"
236           
237        elif environ['PATH_INFO'] == '/test_200':
238            status = "200 OK"
239           
240        elif environ['PATH_INFO'] == '/test_accessDeniedToSecuredURI':
241            # Nb. AuthZ middleware should intercept the request and bypass this
242            # response
243            status = "200 OK"
244           
245        elif environ['PATH_INFO'] == '/test_accessGrantedToSecuredURI':
246            status = "200 OK"
247        else:
248            status = "404 Not found"
249               
250        start_response(status,
251                       [('Content-length', 
252                         str(len(TestAuthZMiddleware.response))),
253                        ('Content-type', 'text/plain')])
254        return [TestAuthZMiddleware.response]
255
256
257class BeakerSessionStub(dict):
258    """Emulate beaker.session session object for purposes of the unit tests
259    """
260    def save(self):
261        pass
262 
263   
264class SamlWSGIAuthZTestCase(BaseTestCase):
265    INI_FILE = 'saml-test.ini'
266    THIS_DIR = os.path.dirname(os.path.abspath(__file__))
267    def __init__(self, *args, **kwargs):       
268        BaseTestCase.__init__(self, *args, **kwargs)
269
270       
271        wsgiapp = loadapp('config:'+SamlWSGIAuthZTestCase.INI_FILE, 
272                          relative_to=SamlWSGIAuthZTestCase.THIS_DIR)
273        self.app = paste.fixture.TestApp(wsgiapp)
274       
275        self.startSiteAAttributeAuthority(withSSL=True,
276            port=SamlWSGIAuthZTestCase.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM)
277       
278
279    def test01CatchNoBeakerSessionFound(self):
280       
281        # PEPFilterConfigError is raised if no beaker.session is set in
282        # environ
283        try:
284            response = self.app.get('/test_200')
285        except SamlPIPMiddlewareConfigError, e:
286            print("ok - expected: %s exception: %s" % (e.__class__, e))
287       
288    def test02Ensure200WithNotLoggedInAndUnsecuredURI(self):
289       
290        # Check the authZ middleware leaves the response alone if the URI
291        # is not matched in the policy
292       
293        # Simulate a beaker.session in the environ
294        extra_environ={'beaker.session.ndg.security':BeakerSessionStub()}
295        response = self.app.get('/test_200',
296                                extra_environ=extra_environ)
297
298    def test03Catch401WithLoggedIn(self):
299       
300        # Check that the application being secured can raise a HTTP 401
301        # response and that this respected by the Authorization middleware
302        # even though a user is set in the session
303       
304        extra_environ = {
305            'beaker.session.ndg.security':
306                BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI)
307        }
308        response = self.app.get('/test_401', 
309                                extra_environ=extra_environ,
310                                status=401)
311
312    def test04Catch403WithLoggedIn(self):
313       
314        # Check that the application being secured can raise a HTTP 403
315        # response and that this respected by the Authorization middleware
316        # even though a user is set in the session
317       
318        extra_environ = {
319            'beaker.session.ndg.security':
320                BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI)
321        }
322        response = self.app.get('/test_403', 
323                                extra_environ=extra_environ,
324                                status=403)
325
326    def test05Catch401WithNotLoggedInAndSecuredURI(self):
327       
328        # AuthZ middleware grants access because the URI requested is not
329        # targeted in the policy
330       
331        # AuthZ middleware checks for username key in session set by AuthN
332        # handler
333        extra_environ={'beaker.session.ndg.security':BeakerSessionStub()}       
334        response = self.app.get('/test_accessDeniedToSecuredURI',
335                                extra_environ=extra_environ,
336                                status=401)
337       
338    def test06AccessDeniedForSecuredURI(self):
339       
340        # User is logged in but doesn't have the required credentials for
341        # access
342        extra_environ = {
343            'beaker.session.ndg.security':
344                BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI)
345        }
346       
347        response = self.app.get('/test_accessDeniedToSecuredURI',
348                                extra_environ=extra_environ,
349                                status=403)
350        self.assert_("Insufficient privileges to access the "
351                     "resource" in response)
352        print response
353
354    def test07AccessGrantedForSecuredURI(self):
355       
356        # User is logged in and has credentials for access to a URI secured
357        # by the policy file
358        extra_environ = {
359            'beaker.session.ndg.security':
360                BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI)
361        }
362       
363        response = self.app.get('/test_accessGrantedToSecuredURI',
364                                extra_environ=extra_environ,
365                                status=200)
366        self.assert_(TestAuthZMiddleware.response in response)
367        print response
368
369    def test08AccessDeniedForAdminQueryArg(self):
370       
371        # User is logged in but doesn't have the required credentials for
372        # access
373        extra_environ = {
374            'beaker.session.ndg.security':
375                BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI)
376        }
377       
378        # Try this URI with the query arg admin=1.  This will be picked up
379        # by the policy as a request requiring admin rights.  The request is
380        # denied as the user doesn't have these rights but this then calls
381        # into play the PEP result handler defined in this module,
382        # RedirectFollowingAccessDenied.  This class reinvokes the request
383        # but without the admin query argument.  Access is then granted for
384        # the redirected request
385        response = self.app.get('/test_accessGrantedToSecuredURI',
386                                params={'admin': 1},
387                                extra_environ=extra_environ,
388                                status=302)
389        try:
390            redirectResponse = response.follow(extra_environ=extra_environ)
391        except paste.fixture.AppError, e:
392            self.failIf(TestAuthZMiddleware.response not in response)
393        print response
394
395
396class PEPResultHandlerTestCase(BaseTestCase):
397    INI_FILE = 'pep-result-handler-test.ini'
398    THIS_DIR = os.path.dirname(os.path.abspath(__file__))
399    INI_FILEPATH = jnPath(THIS_DIR, INI_FILE)
400   
401    def __init__(self, *arg, **kw):
402        BaseTestCase.__init__(self, *arg, **kw)
403       
404        here_dir = os.path.dirname(os.path.abspath(__file__))
405        wsgiapp = loadapp('config:'+PEPResultHandlerTestCase.INI_FILE, 
406                          relative_to=PEPResultHandlerTestCase.THIS_DIR)
407        self.app = paste.fixture.TestApp(wsgiapp)
408       
409        cfg = SafeConfigParser(dict(here=PEPResultHandlerTestCase.THIS_DIR))
410        cfg.read(jnPath(PEPResultHandlerTestCase.INI_FILEPATH))
411        self.redirectURI = cfg.get('filter:AuthZFilter', 
412                                   'authz.pepResultHandler.redirectURI')
413       
414        self.startSiteAAttributeAuthority(withSSL=True,
415            port=SamlWSGIAuthZTestCase.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM)
416
417       
418    def testRedirectPEPResultHandlerMiddleware(self):
419        # User is logged in but doesn't have the required credentials for
420        # access
421        extra_environ = {
422            'beaker.session.ndg.security':
423                BeakerSessionStub(username=PEPResultHandlerTestCase.OPENID_URI)
424        }
425       
426        # Expecting redirect response to specified redirect URI
427        response = self.app.get('/test_accessDeniedToSecuredURI',
428                                extra_environ=extra_environ,
429                                status=302)
430        print(response)
431        self.assert_(response.header_dict.get('location') == self.redirectURI)
432       
433if __name__ == "__main__":
434    unittest.main()       
Note: See TracBrowser for help on using the repository browser.