source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/integration/authz/securedapp.py @ 5329

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/integration/authz/securedapp.py@5329
Revision 5329, 8.1 KB checked in by pjkersha, 11 years ago (diff)
  • Added AuthorizationMiddleware? unit tests: ndg.security.test.unit.wsgi.authz
  • Added check for ndg.security.server.wsgi.authz.PEPFilter to catch beaker session not set in a custom PEPFilter exception type
Line 
1#!/usr/bin/env python
2"""NDG Security test harness for authorisation middleware used to secure an
3application
4
5NERC DataGrid Project
6"""
7__author__ = "P J Kershaw"
8__date__ = "20/11/08"
9__copyright__ = "(C) 2009 Science and Technology Facilities Council"
10__license__ = "BSD - See top-level directory for LICENSE file"
11__contact__ = "Philip.Kershaw@stfc.ac.uk"
12__revision__ = "$Id$"
13
14   
15def app_factory(globalConfig, **localConfig):
16    '''AuthZTestMiddleware factory for Paste app pattern'''
17    return AuthZTestMiddleware(None, globalConfig, **localConfig)
18
19def filter_app_factory(app, globalConfig, **localConfig):
20    '''AuthZTestMiddleware factory for Paste filter app pattern'''
21    return AuthZTestMiddleware(app, globalConfig, **localConfig)
22
23class AuthZTestMiddleware(object):
24    """This class simulates the application to be secured by the NDG Security
25    authorization middleware
26    """
27    method = {
28"/": 'default',
29"/test_401": "test_401",
30"/test_403": "test_403",
31"/test_securedURI": "test_securedURI",
32"/test_accessDeniedToSecuredURI": "test_accessDeniedToSecuredURI"
33    }
34
35    def __init__(self, app, globalConfig, **localConfig):
36        self.app = app
37           
38    def __call__(self, environ, start_response):
39       
40        methodName = self.method.get(environ['PATH_INFO'], '').rstrip()
41        if methodName:
42            action = getattr(self, methodName)
43            return action(environ, start_response)
44        elif environ['PATH_INFO'] == '/logout':
45            return self.default(environ, start_response)
46       
47        elif self.app is not None:
48            return self.app(environ, start_response)
49        else:
50            start_response('404 Not Found', [('Content-type', 'text/plain')])
51            return "Authorisation integration tests: invalid URI"
52           
53    def default(self, environ, start_response):
54        if 'REMOTE_USER' in environ:
55            response = """<html>
56    <head/>
57    <body>
58        <h1>Authorisation integration tests:</h1>
59        <ul>%s</ul>
60        <p>You are logged in with OpenID [%s].  <a href="/logout">Logout</a></p>
61    </body>
62</html>
63""" % ('\n'.join(['<li><a href="%s">%s</a></li>' % (link, name) 
64                 for link,name in self.method.items() if name != 'default']),
65       environ['REMOTE_USER'])
66       
67            start_response('200 OK', 
68                           [('Content-type', 'text/html'),
69                            ('Content-length', str(len(response)))])
70        else:
71            response = """<html>
72    <head/>
73    <body>
74        <h1>Authorisation integration tests:</h1>
75        <ul>%s</ul>
76    </body>
77</html>
78""" % '\n'.join(['<li><a href="%s">%s</a></li>' % (link, name) 
79                 for link,name in self.method.items() if name != 'default'])
80
81            start_response('200 OK', 
82                           [('Content-type', 'text/html'),
83                            ('Content-length', str(len(response)))])
84        return response
85
86    def test_401(self, environ, start_response):
87        if 'REMOTE_USER' in environ:
88            response = """<html>
89    <head/>
90    <body>
91        <h1>Authenticated!</h1>
92        <ul>%s</ul>
93        <p>You are logged in.  <a href="/logout">Logout</a></p>
94    </body>
95</html>
96""" % '\n'.join(['<li><a href="%s">%s</a></li>' % (link, name) 
97                 for link,name in self.method.items() if name != 'default'])
98
99            start_response('200 OK', 
100                           [('Content-type', 'text/html'),
101                            ('Content-length', str(len(response)))])
102        else:
103            response = "Trigger OpenID Relying Party..."
104            start_response('401 Unauthorized', 
105                           [('Content-type', 'text/plain'),
106                            ('Content-length', str(len(response)))])
107        return response
108
109    def test_403(self, environ, start_response):
110        """Trigger the Authorization middleware by returning a 403 Forbidden
111        HTTP status code from this URI"""
112       
113        if 'REMOTE_USER' in environ:
114            response = """<html>
115    <head/>
116    <body>
117        <h1>Authorised!</h1>
118        <ul>%s</ul>
119        <p>You are logged in with OpenID [%s].  <a href="/logout">Logout</a></p>
120    </body>
121</html>
122""" % ('\n'.join(['<li><a href="%s">%s</a></li>' % (link, name) 
123                 for link,name in self.method.items() if name != 'default']),
124       environ['REMOTE_USER'])
125
126            start_response('200 OK', 
127                           [('Content-type', 'text/html'),
128                            ('Content-length', str(len(response)))])
129        else:
130            response = ("Authorization middleware is triggered becuase this "
131                        "page returns a 403 Forbidden status.")
132            start_response('403 Forbidden', 
133                           [('Content-type', 'text/plain'),
134                            ('Content-length', str(len(response)))])
135        return response
136
137    def test_securedURI(self, environ, start_response):
138        """To be secured, the Authorization middleware must have this URI in
139        its policy"""
140        if 'REMOTE_USER' in environ:
141            response = """<html>
142    <head/>
143    <body>
144        <h1>Authorised for path [%s]!</h1>
145        <ul>%s</ul>
146        <p>You are logged in with OpenID [%s].  <a href="/logout">Logout</a></p>
147    </body>
148</html>
149""" % (environ['PATH_INFO'],
150       '\n'.join(['<li><a href="%s">%s</a></li>' % (link, name) 
151                 for link,name in self.method.items() if name != 'default']),
152       environ['REMOTE_USER'])
153
154
155            start_response('200 OK', 
156                           [('Content-type', 'text/html'),
157                            ('Content-length', str(len(response)))])
158        else:
159            response = ("Authorization middleware must have this URI in its "
160                        "policy in order to secure it!")
161            start_response('200 OK', 
162                           [('Content-type', 'text/plain'),
163                            ('Content-length', str(len(response)))])
164        return response
165
166
167    def test_accessDeniedToSecuredURI(self, environ, start_response):
168        """To be secured, the Authorization middleware must have this URI in
169        its policy and the user must not have the required role as specified
170        in the policy.  See ndg.security.test.config.attributeauthority.sitea
171        for user role settings retrieved from the attribute authority"""
172        if 'REMOTE_USER' in environ:
173            response = """<html>
174    <head/>
175    <body>
176        <h1>Authorised for path [%s]!</h1>
177        <ul>%s</ul>
178        <p>You are logged in with OpenID [%s].  <a href="/logout">Logout</a></p>
179    </body>
180</html>
181""" % (environ['PATH_INFO'],
182       '\n'.join(['<li><a href="%s">%s</a></li>' % (link, name) 
183                 for link,name in self.method.items() if name != 'default']),
184       environ['REMOTE_USER'])
185
186
187            start_response('200 OK', 
188                           [('Content-type', 'text/html'),
189                            ('Content-length', str(len(response)))])
190        else:
191            response = ("Authorization middleware must have this URI in its "
192                        "policy in order to secure it!")
193            start_response('200 OK', 
194                           [('Content-type', 'text/plain'),
195                            ('Content-length', str(len(response)))])
196        return response
197   
198    @classmethod
199    def app_factory(cls, globalConfig, **localConfig):
200        return cls(None, globalConfig, **localConfig)
201   
202    @classmethod
203    def filter_app_factory(cls, app, globalConfig, **localConfig):
204        return cls(app, globalConfig, **localConfig)
205   
206# To start run
207# $ paster serve services.ini or run this file as a script
208# $ ./securedapp.py [port #]
209if __name__ == '__main__':
210    import sys
211    import os
212    from os.path import dirname, abspath
213    import logging
214    logging.basicConfig(level=logging.DEBUG)
215
216    if len(sys.argv) > 1:
217        port = int(sys.argv[1])
218    else:
219        port = 7080
220       
221    cfgFilePath = os.path.join(dirname(abspath(__file__)), 'securedapp.ini')
222       
223    from paste.httpserver import serve
224    from paste.deploy import loadapp
225   
226    app = loadapp('config:%s' % cfgFilePath)
227    serve(app, host='0.0.0.0', port=port)
Note: See TracBrowser for help on using the repository browser.