source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/serverapp.py @ 4680

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/serverapp.py@4680
Revision 4680, 7.8 KB checked in by pjkersha, 11 years ago (diff)

Global replace to fix copyright from STFC & NERC to STFC alone because it's not possible to have copyright held by two orgs.

Line 
1#!/usr/bin/env python
2"""NDG Security test harness for combined Session Manager and Attribute
3Authority services running under a single Paste instance.
4
5NERC Data Grid Project
6
7This software may be distributed under the terms of the Q Public License,
8version 1.0 or later.
9"""
10__author__ = "P J Kershaw"
11__date__ = "20/11/08"
12__copyright__ = "(C) 2008 STFC"
13__contact__ = "Philip.Kershaw@stfc.ac.uk"
14__revision__ = "$Id$"
15import os
16from authkit.permissions import UserIn
17from authkit.authorize import authorize
18
19from ndg.security.server.wsgi.utils.sessionmanagerclient import \
20    WSGISessionManagerClient
21from ndg.security.server.wsgi.utils.attributeauthorityclient import \
22    WSGIAttributeAuthorityClient
23
24
25class HTTPBasicAuthentication(object):
26    '''Enable Authkit based HTTP Basic Authentication for test methods'''
27    def __init__(self):
28        self._userIn = UserIn([])
29       
30    def __call__(self, environ, username, password):
31        """validation function"""
32        try:
33            client = WSGISessionManagerClient(environ=environ)
34            res = client.connect(username, passphrase=password)
35
36            if username not in self._userIn.users:
37                self._userIn.users += [username]
38           
39            # Keep a reference to the session ID for test purposes
40            environ[client.environKey+'.user'] = res[-1]
41               
42        except Exception, e:
43            return False
44        else:
45            return True
46
47class CombinedServicesWSGI(object):
48    method = {
49"/": 'default',
50"/test_localSessionManagerConnect": "test_localSessionManagerConnect",
51"/test_localSessionManagerGetSessionStatus": "test_localSessionManagerGetSessionStatus",
52"/test_localSessionManagerDisconnect": "test_localSessionManagerDisconnect",
53"/test_localSessionManagerGetAttCert": "test_localSessionManagerGetAttCert",
54"/test_localAttributeAuthorityGetHostInfo": "test_localAttributeAuthorityGetHostInfo",
55"/test_localAttributeAuthorityGetTrustedHostInfo": "test_localAttributeAuthorityGetTrustedHostInfo",
56"/test_localAttributeAuthorityGetAllHostsInfo": "test_localAttributeAuthorityGetAllHostsInfo",
57"/test_localAttributeAuthorityGetAttCert": "test_localAttributeAuthorityGetAttCert"
58    }
59    httpBasicAuthentication = HTTPBasicAuthentication()
60
61    def __init__(self, app, globalConfig, **localConfig):
62        self.app = app
63       
64    def __call__(self, environ, start_response):
65       
66        methodName = self.method.get(environ['PATH_INFO'], '').rstrip()
67        if methodName:
68            action = getattr(self, methodName)
69            return action(environ, start_response)
70        elif self.app is not None:
71            return self.app(environ, start_response)
72        else:
73            start_response('404 Not Found', [('Content-type', 'text/plain')])
74            return "NDG Security Combined Services Unit tests: invalid URI"
75           
76    def default(self, environ, start_response):
77        start_response('200 OK', [('Content-type', 'text/plain')])
78        return "NDG Security Combined Services Unit Tests"
79
80    @authorize(httpBasicAuthentication._userIn)
81    def test_localSessionManagerConnect(self, environ, start_response):
82        start_response('200 OK', [('Content-type', 'text/plain')])
83        return "test_localSessionManagerConnect succeeded"
84       
85    @authorize(httpBasicAuthentication._userIn)
86    def test_localSessionManagerGetSessionStatus(self, environ,start_response):
87        client = WSGISessionManagerClient(environ=environ)
88        stat=client.getSessionStatus(sessID=environ[client.environKey+'.user'])
89        start_response('200 OK', [('Content-type', 'text/xml')])
90        return ("test_localSessionManagerGetSessionStatus succeeded. Response "
91                "= %s" % stat)
92
93    @authorize(httpBasicAuthentication._userIn)
94    def test_localSessionManagerDisconnect(self, environ, start_response):
95        client = WSGISessionManagerClient(environ=environ)
96        client.disconnect(sessID=environ[client.environKey+'.user'])
97       
98        # Re-initialise user authentication
99        CombinedServicesWSGI.httpBasicAuthentication._userIn.users = []
100        start_response('200 OK', [('Content-type', 'text/plain')])
101        return "test_localSessionManagerDisconnect succeeded."
102
103    @authorize(httpBasicAuthentication._userIn)
104    def test_localSessionManagerGetAttCert(self, environ, start_response):
105        client = WSGISessionManagerClient(environ=environ)
106        attCert = client.getAttCert(sessID=environ[client.environKey+'.user'])
107        start_response('200 OK', [('Content-type', 'text/xml')])
108        return str(attCert)
109
110    def test_localAttributeAuthorityGetHostInfo(self, environ, start_response):
111        client = WSGIAttributeAuthorityClient(environ=environ)
112        hostInfo = client.getHostInfo()
113        start_response('200 OK', [('Content-type', 'text/html')])
114        return ("test_localAttributeAuthorityGetHostInfo succeeded. Response "
115                "= %s" % hostInfo)
116
117    def test_localAttributeAuthorityGetTrustedHostInfo(self, 
118                                                       environ, 
119                                                       start_response):
120        client = WSGIAttributeAuthorityClient(environ=environ)
121        role = environ.get('QUERY_STRING', '').split('=')[-1] or None
122        hostInfo = client.getTrustedHostInfo(role=role)
123        start_response('200 OK', [('Content-type', 'text/html')])
124        return ("test_localAttributeAuthorityGetTrustedHostInfo succeeded. "
125                "Response = %s" % hostInfo)
126
127    def test_localAttributeAuthorityGetAllHostsInfo(self, 
128                                                    environ, 
129                                                    start_response):
130        client = WSGIAttributeAuthorityClient(environ=environ)
131        hostInfo = client.getAllHostsInfo()
132        start_response('200 OK', [('Content-type', 'text/html')])
133        return ("test_localAttributeAuthorityGetAllHostsInfo succeeded. "
134                "Response = %s" % hostInfo)
135
136    @authorize(httpBasicAuthentication._userIn)
137    def test_localAttributeAuthorityGetAttCert(self, environ, start_response):
138       
139        client = WSGIAttributeAuthorityClient(environ=environ)
140        username=CombinedServicesWSGI.httpBasicAuthentication._userIn.users[-1]
141       
142        attCert = client.getAttCert(userId=username)
143        start_response('200 OK', [('Content-type', 'text/xml')])
144        return str(attCert)
145
146def app_factory(globalConfig, **localConfig):
147    return CombinedServicesWSGI(None, globalConfig, **localConfig)
148
149def filter_app_factory(app, globalConfig, **localConfig):
150    return CombinedServicesWSGI(app, globalConfig, **localConfig)
151
152# Initialize environment for unit tests
153if 'NDGSEC_COMBINED_SRVS_UNITTEST_DIR' not in os.environ:
154    os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'] = \
155                                    os.path.abspath(os.path.dirname(__file__))
156   
157# To start the Site A Attribute Authority run
158# $ paster serve site-a.ini or run this file as a script
159# $ ./siteAServerApp.py [port #]
160if __name__ == '__main__':
161    import sys
162    import logging
163    logging.basicConfig(level=logging.DEBUG)
164
165    if len(sys.argv) > 1:
166        port = int(sys.argv[1])
167    else:
168        port = 8000
169       
170    cfgFilePath = os.path.join(os.path.dirname(os.path.abspath(__file__)),
171                               'services.ini')
172       
173    from paste.httpserver import serve
174    from paste.deploy import loadapp
175   
176    from paste.urlparser import StaticURLParser
177    from paste.cascade import Cascade
178   
179    app = loadapp('config:%s' % cfgFilePath)
180    rootPath = os.path.join(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'], 
181                            'openidprovider')
182   
183    # Include to enable stylesheet and graphics
184    staticURLParser = StaticURLParser(rootPath)
185    app2 = Cascade([staticURLParser, app])
186    serve(app2, host='0.0.0.0', port=port)
Note: See TracBrowser for help on using the repository browser.