source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/integration/authz/securityservicesapp.py @ 5037

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/integration/authz/securityservicesapp.py@5039
Revision 5037, 4.1 KB checked in by pjkersha, 11 years ago (diff)

ndg.security.server.wsgi.authn: AuthNMiddleware and AuthNRedirectMiddleware catch HTTP 401 responses from a WSGI application stack to be protected and redirect to OpenID Relying Party middleware running on an application server running NDG Security services.

Line 
1#!/usr/bin/env python
2"""NDG Security test harness for authorisation middleware
3
4NERC DataGrid Project
5
6"""
7__author__ = "P J Kershaw"
8__date__ = "20/11/08"
9__copyright__ = "(C) 2009 Science and Technology Facilities Council"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = "$Id$"
12import os
13from os.path import dirname, abspath, join
14
15class AuthZTestMiddleware(object):
16    method = {
17"/": 'default',
18"/test_401": "test_401",
19"/test_403": "test_403",
20"/test_securedURI": "test_securedURI"
21    }
22
23    def __init__(self, app, globalConfig, **localConfig):
24        self.app = app
25           
26    def __call__(self, environ, start_response):
27       
28        methodName = self.method.get(environ['PATH_INFO'], '').rstrip()
29        if methodName:
30            action = getattr(self, methodName)
31            return action(environ, start_response)
32        elif self.app is not None:
33            return self.app(environ, start_response)
34        else:
35            start_response('404 Not Found', [('Content-type', 'text/plain')])
36            return "Authorisation integration tests: invalid URI"
37           
38    def default(self, environ, start_response):
39        if 'REMOTE_USER' in environ:
40            response = """<html>
41    <head/>
42    <body>
43        <p>Authenticated!</p>
44        <p><a href="/logout">logout</a></p>
45    </body>
46</html>"""
47            start_response('200 OK', 
48                           [('Content-type', 'text/html'),
49                            ('Content-length', str(len(response)))])
50        else:
51            response = "Authorisation integration tests"
52            start_response('200 OK', 
53                           [('Content-type', 'text/html'),
54                            ('Content-length', str(len(response)))])
55        return response
56
57    def test_401(self, environ, start_response):
58        if 'REMOTE_USER' in environ:
59            response = """<html>
60    <head/>
61    <body>
62        <p>Authenticated!</p>
63        <p><a href="/logout">logout</a></p>
64    </body>
65</html>"""
66            start_response('200 OK', 
67                           [('Content-type', 'text/html'),
68                            ('Content-length', str(len(response)))])
69        else:
70            response = "Trigger OpenID Relying Party..."
71            start_response('401 Unauthorized', 
72                           [('Content-type', 'text/plain'),
73                            ('Content-length', str(len(response)))])
74        return response
75
76    def test_403(self, environ, start_response):
77        if 'REMOTE_USER' in environ:
78            response = """<html>
79    <head/>
80    <body>
81        <p>Authorised!</p>
82        <p><a href="/logout">logout</a></p>
83    </body>
84</html>"""
85            start_response('200 OK', 
86                           [('Content-type', 'text/html'),
87                            ('Content-length', str(len(response)))])
88        else:
89            response = "Trigger AuthZ..."
90            start_response('403 Forbidden', 
91                           [('Content-type', 'text/plain'),
92                            ('Content-length', str(len(response)))])
93        return response
94
95    def test_securedURI(self, environ, start_response):
96        response = "Access allowed"
97        start_response('200 OK', 
98                       [('Content-type', 'text/plain'),
99                        ('Content-length', str(len(response)))])
100        return response
101   
102def app_factory(globalConfig, **localConfig):
103    return AuthZTestMiddleware(None, globalConfig, **localConfig)
104
105def filter_app_factory(app, globalConfig, **localConfig):
106    return AuthZTestMiddleware(app, globalConfig, **localConfig)
107   
108# To start run
109# $ paster serve services.ini or run this file as a script
110# $ ./serverapp.py [port #]
111if __name__ == '__main__':
112    import sys
113    import logging
114    logging.basicConfig(level=logging.DEBUG)
115
116    if len(sys.argv) > 1:
117        port = int(sys.argv[1])
118    else:
119        port = 7443
120       
121    cfgFilePath = os.path.join(dirname(abspath(__file__)), 
122                               'securityservices.ini')
123       
124    from paste.httpserver import serve
125    from paste.deploy import loadapp
126   
127    app = loadapp('config:%s' % cfgFilePath)
128    serve(app, host='0.0.0.0', port=port)
Note: See TracBrowser for help on using the repository browser.