source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/integration/combinedservices/serverapp.py @ 5181

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/integration/combinedservices/serverapp.py@5181
Revision 5181, 8.7 KB checked in by pjkersha, 11 years ago (diff)

Added a Policy Information Point to encapsulate subject attribute retrieval.

Line 
1#!/usr/bin/env python
2"""NDG Security test harness for combined Session Manager and Attribute
3Authority services running under a single Paste instance.
4
5NERC Data Grid Project
6
7"""
8__author__ = "P J Kershaw"
9__date__ = "20/11/08"
10__copyright__ = "(C) 2009 Science and Technology Facilities Council"
11__contact__ = "Philip.Kershaw@stfc.ac.uk"
12__revision__ = "$Id$"
13import os
14from os.path import dirname, abspath, join
15from authkit.permissions import UserIn
16from authkit.authorize import authorize
17
18from ndg.security.server.wsgi.utils.sessionmanagerclient import \
19    WSGISessionManagerClient
20from ndg.security.server.wsgi.utils.attributeauthorityclient import \
21    WSGIAttributeAuthorityClient
22
23
24class HTTPBasicAuthentication(object):
25    '''Enable Authkit based HTTP Basic Authentication for test methods'''
26    def __init__(self):
27        self._userIn = UserIn([])
28       
29    def __call__(self, environ, username, password):
30        """validation function"""
31        try:
32            client = WSGISessionManagerClient(environ=environ,
33                                    environKeyName=self.sessionManagerFilterID)
34            res = client.connect(username, passphrase=password)
35
36            if username not in self._userIn.users:
37                self._userIn.users += [username]
38           
39            # Keep a reference to the session ID for test purposes
40            environ[client.environKeyName+'.user'] = res[-1]
41               
42        except Exception, e:
43            return False
44        else:
45            return True
46
47class CombinedServicesWSGI(object):
48    method = {
49"/": 'default',
50"/test_localSessionManagerConnect": "test_localSessionManagerConnect",
51"/test_localSessionManagerGetSessionStatus": "test_localSessionManagerGetSessionStatus",
52"/test_localSessionManagerDisconnect": "test_localSessionManagerDisconnect",
53"/test_localSessionManagerGetAttCert": "test_localSessionManagerGetAttCert",
54"/test_localAttributeAuthorityGetHostInfo": "test_localAttributeAuthorityGetHostInfo",
55"/test_localAttributeAuthorityGetTrustedHostInfo": "test_localAttributeAuthorityGetTrustedHostInfo",
56"/test_localAttributeAuthorityGetAllHostsInfo": "test_localAttributeAuthorityGetAllHostsInfo",
57"/test_localAttributeAuthorityGetAttCert": "test_localAttributeAuthorityGetAttCert"
58    }
59    httpBasicAuthentication = HTTPBasicAuthentication()
60
61    def __init__(self, app, globalConfig, **localConfig):
62        self.app = app
63        self.sessionManagerFilterID = localConfig.get('sessionManagerFilterID')
64        self.attributeAuthorityFilterID = \
65                                localConfig.get('attributeAuthorityFilterID')
66                               
67        CombinedServicesWSGI.httpBasicAuthentication.sessionManagerFilterID = \
68            self.sessionManagerFilterID
69           
70    def __call__(self, environ, start_response):
71       
72        methodName = self.method.get(environ['PATH_INFO'], '').rstrip()
73        if methodName:
74            action = getattr(self, methodName)
75            return action(environ, start_response)
76        elif self.app is not None:
77            return self.app(environ, start_response)
78        else:
79            start_response('404 Not Found', [('Content-type', 'text/plain')])
80            return "NDG Security Combined Services Unit tests: invalid URI"
81           
82    def default(self, environ, start_response):
83        start_response('200 OK', [('Content-type', 'text/plain')])
84        return "NDG Security Combined Services Unit Tests"
85
86    @authorize(httpBasicAuthentication._userIn)
87    def test_localSessionManagerConnect(self, environ, start_response):
88        start_response('200 OK', [('Content-type', 'text/plain')])
89        return "test_localSessionManagerConnect succeeded"
90       
91    @authorize(httpBasicAuthentication._userIn)
92    def test_localSessionManagerGetSessionStatus(self, environ,start_response):
93        client = WSGISessionManagerClient(environ=environ,
94                                    environKeyName=self.sessionManagerFilterID)
95        stat=client.getSessionStatus(
96                                sessID=environ[client.environKeyName+'.user'])
97        start_response('200 OK', [('Content-type', 'text/xml')])
98        return ("test_localSessionManagerGetSessionStatus succeeded. Response "
99                "= %s" % stat)
100
101    @authorize(httpBasicAuthentication._userIn)
102    def test_localSessionManagerDisconnect(self, environ, start_response):
103        client = WSGISessionManagerClient(environ=environ,
104                                    environKeyName=self.sessionManagerFilterID)
105        client.disconnect(sessID=environ[client.environKeyName+'.user'])
106       
107        # Re-initialise user authentication
108        CombinedServicesWSGI.httpBasicAuthentication._userIn.users = []
109        start_response('200 OK', [('Content-type', 'text/plain')])
110        return "test_localSessionManagerDisconnect succeeded."
111
112    @authorize(httpBasicAuthentication._userIn)
113    def test_localSessionManagerGetAttCert(self, environ, start_response):
114        client = WSGISessionManagerClient(environ=environ,
115            environKeyName=self.sessionManagerFilterID,
116            attributeAuthorityEnvironKeyName=self.attributeAuthorityFilterID)
117
118        attCert = client.getAttCert(
119                                sessID=environ[client.environKeyName+'.user'])
120        start_response('200 OK', [('Content-type', 'text/xml')])
121        return str(attCert)
122
123    def test_localAttributeAuthorityGetHostInfo(self, environ, start_response):
124        client = WSGIAttributeAuthorityClient(environ=environ,
125                                environKeyName=self.attributeAuthorityFilterID)
126        hostInfo = client.getHostInfo()
127        start_response('200 OK', [('Content-type', 'text/html')])
128        return ("test_localAttributeAuthorityGetHostInfo succeeded. Response "
129                "= %s" % hostInfo)
130
131    def test_localAttributeAuthorityGetTrustedHostInfo(self, 
132                                                       environ, 
133                                                       start_response):
134        client = WSGIAttributeAuthorityClient(environ=environ,
135                                environKeyName=self.attributeAuthorityFilterID)
136        role = environ.get('QUERY_STRING', '').split('=')[-1] or None
137        hostInfo = client.getTrustedHostInfo(role=role)
138        start_response('200 OK', [('Content-type', 'text/html')])
139        return ("test_localAttributeAuthorityGetTrustedHostInfo succeeded. "
140                "Response = %s" % hostInfo)
141
142    def test_localAttributeAuthorityGetAllHostsInfo(self, 
143                                                    environ, 
144                                                    start_response):
145        client = WSGIAttributeAuthorityClient(environ=environ,
146                                environKeyName=self.attributeAuthorityFilterID)
147        hostInfo = client.getAllHostsInfo()
148        start_response('200 OK', [('Content-type', 'text/html')])
149        return ("test_localAttributeAuthorityGetAllHostsInfo succeeded. "
150                "Response = %s" % hostInfo)
151
152    @authorize(httpBasicAuthentication._userIn)
153    def test_localAttributeAuthorityGetAttCert(self, environ, start_response):
154       
155        client = WSGIAttributeAuthorityClient(environ=environ,
156                                environKeyName=self.attributeAuthorityFilterID)
157        username=CombinedServicesWSGI.httpBasicAuthentication._userIn.users[-1]
158       
159        attCert = client.getAttCert(userId=username)
160        start_response('200 OK', [('Content-type', 'text/xml')])
161        return str(attCert)
162
163def app_factory(globalConfig, **localConfig):
164    return CombinedServicesWSGI(None, globalConfig, **localConfig)
165
166def filter_app_factory(app, globalConfig, **localConfig):
167    return CombinedServicesWSGI(app, globalConfig, **localConfig)
168
169   
170from ndg.security.test import BaseTestCase
171
172# Initialize environment for unit tests
173if BaseTestCase.configDirEnvVarName not in os.environ:
174    os.environ[BaseTestCase.configDirEnvVarName] = \
175                            join(dirname(abspath(dirname(__file__))), 'config')
176
177# Initialize environment for unit tests
178if 'NDGSEC_COMBINED_SRVS_UNITTEST_DIR' not in os.environ:
179    os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR']=abspath(dirname(__file__))
180   
181# To start run
182# $ paster serve services.ini or run this file as a script
183# $ ./serverapp.py [port #]
184if __name__ == '__main__':
185    import sys
186    import logging
187    logging.basicConfig(level=logging.DEBUG)
188
189    if len(sys.argv) > 1:
190        port = int(sys.argv[1])
191    else:
192        port = 8000
193       
194    cfgFilePath = os.path.join(dirname(abspath(__file__)), 'services.ini')
195       
196    from paste.httpserver import serve
197    from paste.deploy import loadapp
198   
199    app = loadapp('config:%s' % cfgFilePath)
200    serve(app, host='0.0.0.0', port=port)
Note: See TracBrowser for help on using the repository browser.