source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/serverapp.py @ 4739

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/serverapp.py@4739
Revision 4739, 9.0 KB checked in by pjkersha, 11 years ago (diff)

Refactored x509, xmlsec, XMLSecDoc and combinedservices unit tests separating out test files into the config dir.

Line 
1#!/usr/bin/env python
2"""NDG Security test harness for combined Session Manager and Attribute
3Authority services running under a single Paste instance.
4
5NERC Data Grid Project
6
7This software may be distributed under the terms of the Q Public License,
8version 1.0 or later.
9"""
10__author__ = "P J Kershaw"
11__date__ = "20/11/08"
12__copyright__ = "(C) 2008 STFC"
13__contact__ = "Philip.Kershaw@stfc.ac.uk"
14__revision__ = "$Id$"
15import os
16from os.path import dirname, abspath, join
17from authkit.permissions import UserIn
18from authkit.authorize import authorize
19
20from ndg.security.server.wsgi.utils.sessionmanagerclient import \
21    WSGISessionManagerClient
22from ndg.security.server.wsgi.utils.attributeauthorityclient import \
23    WSGIAttributeAuthorityClient
24
25
26class HTTPBasicAuthentication(object):
27    '''Enable Authkit based HTTP Basic Authentication for test methods'''
28    def __init__(self):
29        self._userIn = UserIn([])
30       
31    def __call__(self, environ, username, password):
32        """validation function"""
33        try:
34            client = WSGISessionManagerClient(environ=environ,
35                                        environKey=self.sessionManagerFilterID)
36            res = client.connect(username, passphrase=password)
37
38            if username not in self._userIn.users:
39                self._userIn.users += [username]
40           
41            # Keep a reference to the session ID for test purposes
42            environ[client.environKey+'.user'] = res[-1]
43               
44        except Exception, e:
45            return False
46        else:
47            return True
48
49class CombinedServicesWSGI(object):
50    method = {
51"/": 'default',
52"/test_localSessionManagerConnect": "test_localSessionManagerConnect",
53"/test_localSessionManagerGetSessionStatus": "test_localSessionManagerGetSessionStatus",
54"/test_localSessionManagerDisconnect": "test_localSessionManagerDisconnect",
55"/test_localSessionManagerGetAttCert": "test_localSessionManagerGetAttCert",
56"/test_localAttributeAuthorityGetHostInfo": "test_localAttributeAuthorityGetHostInfo",
57"/test_localAttributeAuthorityGetTrustedHostInfo": "test_localAttributeAuthorityGetTrustedHostInfo",
58"/test_localAttributeAuthorityGetAllHostsInfo": "test_localAttributeAuthorityGetAllHostsInfo",
59"/test_localAttributeAuthorityGetAttCert": "test_localAttributeAuthorityGetAttCert"
60    }
61    httpBasicAuthentication = HTTPBasicAuthentication()
62
63    def __init__(self, app, globalConfig, **localConfig):
64        self.app = app
65        self.sessionManagerFilterID = localConfig.get('sessionManagerFilterID')
66        self.attributeAuthorityFilterID = \
67                                localConfig.get('attributeAuthorityFilterID')
68                               
69        CombinedServicesWSGI.httpBasicAuthentication.sessionManagerFilterID = \
70            self.sessionManagerFilterID
71           
72    def __call__(self, environ, start_response):
73       
74        methodName = self.method.get(environ['PATH_INFO'], '').rstrip()
75        if methodName:
76            action = getattr(self, methodName)
77            return action(environ, start_response)
78        elif self.app is not None:
79            return self.app(environ, start_response)
80        else:
81            start_response('404 Not Found', [('Content-type', 'text/plain')])
82            return "NDG Security Combined Services Unit tests: invalid URI"
83           
84    def default(self, environ, start_response):
85        start_response('200 OK', [('Content-type', 'text/plain')])
86        return "NDG Security Combined Services Unit Tests"
87
88    @authorize(httpBasicAuthentication._userIn)
89    def test_localSessionManagerConnect(self, environ, start_response):
90        start_response('200 OK', [('Content-type', 'text/plain')])
91        return "test_localSessionManagerConnect succeeded"
92       
93    @authorize(httpBasicAuthentication._userIn)
94    def test_localSessionManagerGetSessionStatus(self, environ,start_response):
95        client = WSGISessionManagerClient(environ=environ,
96                                        environKey=self.sessionManagerFilterID)
97        stat=client.getSessionStatus(sessID=environ[client.environKey+'.user'])
98        start_response('200 OK', [('Content-type', 'text/xml')])
99        return ("test_localSessionManagerGetSessionStatus succeeded. Response "
100                "= %s" % stat)
101
102    @authorize(httpBasicAuthentication._userIn)
103    def test_localSessionManagerDisconnect(self, environ, start_response):
104        client = WSGISessionManagerClient(environ=environ,
105                                        environKey=self.sessionManagerFilterID)
106        client.disconnect(sessID=environ[client.environKey+'.user'])
107       
108        # Re-initialise user authentication
109        CombinedServicesWSGI.httpBasicAuthentication._userIn.users = []
110        start_response('200 OK', [('Content-type', 'text/plain')])
111        return "test_localSessionManagerDisconnect succeeded."
112
113    @authorize(httpBasicAuthentication._userIn)
114    def test_localSessionManagerGetAttCert(self, environ, start_response):
115        client = WSGISessionManagerClient(environ=environ,
116                environKey=self.sessionManagerFilterID,
117                attributeAuthorityEnvironKey=self.attributeAuthorityFilterID)
118
119        attCert = client.getAttCert(sessID=environ[client.environKey+'.user'])
120        start_response('200 OK', [('Content-type', 'text/xml')])
121        return str(attCert)
122
123    def test_localAttributeAuthorityGetHostInfo(self, environ, start_response):
124        client = WSGIAttributeAuthorityClient(environ=environ,
125                                    environKey=self.attributeAuthorityFilterID)
126        hostInfo = client.getHostInfo()
127        start_response('200 OK', [('Content-type', 'text/html')])
128        return ("test_localAttributeAuthorityGetHostInfo succeeded. Response "
129                "= %s" % hostInfo)
130
131    def test_localAttributeAuthorityGetTrustedHostInfo(self, 
132                                                       environ, 
133                                                       start_response):
134        client = WSGIAttributeAuthorityClient(environ=environ,
135                                    environKey=self.attributeAuthorityFilterID)
136        role = environ.get('QUERY_STRING', '').split('=')[-1] or None
137        hostInfo = client.getTrustedHostInfo(role=role)
138        start_response('200 OK', [('Content-type', 'text/html')])
139        return ("test_localAttributeAuthorityGetTrustedHostInfo succeeded. "
140                "Response = %s" % hostInfo)
141
142    def test_localAttributeAuthorityGetAllHostsInfo(self, 
143                                                    environ, 
144                                                    start_response):
145        client = WSGIAttributeAuthorityClient(environ=environ,
146                                    environKey=self.attributeAuthorityFilterID)
147        hostInfo = client.getAllHostsInfo()
148        start_response('200 OK', [('Content-type', 'text/html')])
149        return ("test_localAttributeAuthorityGetAllHostsInfo succeeded. "
150                "Response = %s" % hostInfo)
151
152    @authorize(httpBasicAuthentication._userIn)
153    def test_localAttributeAuthorityGetAttCert(self, environ, start_response):
154       
155        client = WSGIAttributeAuthorityClient(environ=environ,
156                                    environKey=self.attributeAuthorityFilterID)
157        username=CombinedServicesWSGI.httpBasicAuthentication._userIn.users[-1]
158       
159        attCert = client.getAttCert(userId=username)
160        start_response('200 OK', [('Content-type', 'text/xml')])
161        return str(attCert)
162
163def app_factory(globalConfig, **localConfig):
164    return CombinedServicesWSGI(None, globalConfig, **localConfig)
165
166def filter_app_factory(app, globalConfig, **localConfig):
167    return CombinedServicesWSGI(app, globalConfig, **localConfig)
168
169   
170from ndg.security.test import BaseTestCase
171
172# Initialize environment for unit tests
173if BaseTestCase.configDirEnvVarName not in os.environ:
174    os.environ[BaseTestCase.configDirEnvVarName] = \
175                            join(dirname(abspath(dirname(__file__))), 'config')
176
177# Initialize environment for unit tests
178if 'NDGSEC_COMBINED_SRVS_UNITTEST_DIR' not in os.environ:
179    os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR']=abspath(dirname(__file__))
180   
181# To start the Site A Attribute Authority run
182# $ paster serve site-a.ini or run this file as a script
183# $ ./siteAServerApp.py [port #]
184if __name__ == '__main__':
185    import sys
186    import logging
187    logging.basicConfig(level=logging.DEBUG)
188
189    if len(sys.argv) > 1:
190        port = int(sys.argv[1])
191    else:
192        port = 8000
193       
194    cfgFilePath = os.path.join(dirname(abspath(__file__)), 'services.ini')
195       
196    from paste.httpserver import serve
197    from paste.deploy import loadapp
198   
199    from paste.urlparser import StaticURLParser
200    from paste.cascade import Cascade
201   
202    app = loadapp('config:%s' % cfgFilePath)
203    rootPath = os.path.join(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'], 
204                            'openidprovider')
205   
206    # Include to enable stylesheet and graphics
207    staticURLParser = StaticURLParser(rootPath)
208    app2 = Cascade([staticURLParser, app])
209    serve(app2, host='0.0.0.0', port=port)
Note: See TracBrowser for help on using the repository browser.