source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/serverapp.py @ 4682

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/serverapp.py@4682
Revision 4682, 8.8 KB checked in by pjkersha, 11 years ago (diff)
  • paster template - updated .ini_tmpl file adding $$ escapes for $ vars to be left in place
  • configfileparsers: added 'here' variable as default for INIPropertyFile class in the style of Paste Deploy ini file handling
  • WSGISessionManagerClient and WSGIAttributeAuthorityClient: more robust error handling and fixes for keying filter names from environ
  • Combined services tests: make use name substitution for section names
Line 
1#!/usr/bin/env python
2"""NDG Security test harness for combined Session Manager and Attribute
3Authority services running under a single Paste instance.
4
5NERC Data Grid Project
6
7This software may be distributed under the terms of the Q Public License,
8version 1.0 or later.
9"""
10__author__ = "P J Kershaw"
11__date__ = "20/11/08"
12__copyright__ = "(C) 2008 STFC"
13__contact__ = "Philip.Kershaw@stfc.ac.uk"
14__revision__ = "$Id$"
15import os
16from authkit.permissions import UserIn
17from authkit.authorize import authorize
18
19from ndg.security.server.wsgi.utils.sessionmanagerclient import \
20    WSGISessionManagerClient
21from ndg.security.server.wsgi.utils.attributeauthorityclient import \
22    WSGIAttributeAuthorityClient
23
24
25class HTTPBasicAuthentication(object):
26    '''Enable Authkit based HTTP Basic Authentication for test methods'''
27    def __init__(self):
28        self._userIn = UserIn([])
29       
30    def __call__(self, environ, username, password):
31        """validation function"""
32        try:
33            client = WSGISessionManagerClient(environ=environ,
34                                        environKey=self.sessionManagerFilterID)
35            res = client.connect(username, passphrase=password)
36
37            if username not in self._userIn.users:
38                self._userIn.users += [username]
39           
40            # Keep a reference to the session ID for test purposes
41            environ[client.environKey+'.user'] = res[-1]
42               
43        except Exception, e:
44            return False
45        else:
46            return True
47
48class CombinedServicesWSGI(object):
49    method = {
50"/": 'default',
51"/test_localSessionManagerConnect": "test_localSessionManagerConnect",
52"/test_localSessionManagerGetSessionStatus": "test_localSessionManagerGetSessionStatus",
53"/test_localSessionManagerDisconnect": "test_localSessionManagerDisconnect",
54"/test_localSessionManagerGetAttCert": "test_localSessionManagerGetAttCert",
55"/test_localAttributeAuthorityGetHostInfo": "test_localAttributeAuthorityGetHostInfo",
56"/test_localAttributeAuthorityGetTrustedHostInfo": "test_localAttributeAuthorityGetTrustedHostInfo",
57"/test_localAttributeAuthorityGetAllHostsInfo": "test_localAttributeAuthorityGetAllHostsInfo",
58"/test_localAttributeAuthorityGetAttCert": "test_localAttributeAuthorityGetAttCert"
59    }
60    httpBasicAuthentication = HTTPBasicAuthentication()
61
62    def __init__(self, app, globalConfig, **localConfig):
63        self.app = app
64        self.sessionManagerFilterID = localConfig.get('sessionManagerFilterID')
65        self.attributeAuthorityFilterID = \
66                                localConfig.get('attributeAuthorityFilterID')
67                               
68        CombinedServicesWSGI.httpBasicAuthentication.sessionManagerFilterID = \
69            self.sessionManagerFilterID
70           
71    def __call__(self, environ, start_response):
72       
73        methodName = self.method.get(environ['PATH_INFO'], '').rstrip()
74        if methodName:
75            action = getattr(self, methodName)
76            return action(environ, start_response)
77        elif self.app is not None:
78            return self.app(environ, start_response)
79        else:
80            start_response('404 Not Found', [('Content-type', 'text/plain')])
81            return "NDG Security Combined Services Unit tests: invalid URI"
82           
83    def default(self, environ, start_response):
84        start_response('200 OK', [('Content-type', 'text/plain')])
85        return "NDG Security Combined Services Unit Tests"
86
87    @authorize(httpBasicAuthentication._userIn)
88    def test_localSessionManagerConnect(self, environ, start_response):
89        start_response('200 OK', [('Content-type', 'text/plain')])
90        return "test_localSessionManagerConnect succeeded"
91       
92    @authorize(httpBasicAuthentication._userIn)
93    def test_localSessionManagerGetSessionStatus(self, environ,start_response):
94        client = WSGISessionManagerClient(environ=environ,
95                                        environKey=self.sessionManagerFilterID)
96        stat=client.getSessionStatus(sessID=environ[client.environKey+'.user'])
97        start_response('200 OK', [('Content-type', 'text/xml')])
98        return ("test_localSessionManagerGetSessionStatus succeeded. Response "
99                "= %s" % stat)
100
101    @authorize(httpBasicAuthentication._userIn)
102    def test_localSessionManagerDisconnect(self, environ, start_response):
103        client = WSGISessionManagerClient(environ=environ,
104                                        environKey=self.sessionManagerFilterID)
105        client.disconnect(sessID=environ[client.environKey+'.user'])
106       
107        # Re-initialise user authentication
108        CombinedServicesWSGI.httpBasicAuthentication._userIn.users = []
109        start_response('200 OK', [('Content-type', 'text/plain')])
110        return "test_localSessionManagerDisconnect succeeded."
111
112    @authorize(httpBasicAuthentication._userIn)
113    def test_localSessionManagerGetAttCert(self, environ, start_response):
114        client = WSGISessionManagerClient(environ=environ,
115                environKey=self.sessionManagerFilterID,
116                attributeAuthorityEnvironKey=self.attributeAuthorityFilterID)
117
118        attCert = client.getAttCert(sessID=environ[client.environKey+'.user'])
119        start_response('200 OK', [('Content-type', 'text/xml')])
120        return str(attCert)
121
122    def test_localAttributeAuthorityGetHostInfo(self, environ, start_response):
123        client = WSGIAttributeAuthorityClient(environ=environ,
124                                    environKey=self.attributeAuthorityFilterID)
125        hostInfo = client.getHostInfo()
126        start_response('200 OK', [('Content-type', 'text/html')])
127        return ("test_localAttributeAuthorityGetHostInfo succeeded. Response "
128                "= %s" % hostInfo)
129
130    def test_localAttributeAuthorityGetTrustedHostInfo(self, 
131                                                       environ, 
132                                                       start_response):
133        client = WSGIAttributeAuthorityClient(environ=environ,
134                                    environKey=self.attributeAuthorityFilterID)
135        role = environ.get('QUERY_STRING', '').split('=')[-1] or None
136        hostInfo = client.getTrustedHostInfo(role=role)
137        start_response('200 OK', [('Content-type', 'text/html')])
138        return ("test_localAttributeAuthorityGetTrustedHostInfo succeeded. "
139                "Response = %s" % hostInfo)
140
141    def test_localAttributeAuthorityGetAllHostsInfo(self, 
142                                                    environ, 
143                                                    start_response):
144        client = WSGIAttributeAuthorityClient(environ=environ,
145                                    environKey=self.attributeAuthorityFilterID)
146        hostInfo = client.getAllHostsInfo()
147        start_response('200 OK', [('Content-type', 'text/html')])
148        return ("test_localAttributeAuthorityGetAllHostsInfo succeeded. "
149                "Response = %s" % hostInfo)
150
151    @authorize(httpBasicAuthentication._userIn)
152    def test_localAttributeAuthorityGetAttCert(self, environ, start_response):
153       
154        client = WSGIAttributeAuthorityClient(environ=environ,
155                                    environKey=self.attributeAuthorityFilterID)
156        username=CombinedServicesWSGI.httpBasicAuthentication._userIn.users[-1]
157       
158        attCert = client.getAttCert(userId=username)
159        start_response('200 OK', [('Content-type', 'text/xml')])
160        return str(attCert)
161
162def app_factory(globalConfig, **localConfig):
163    return CombinedServicesWSGI(None, globalConfig, **localConfig)
164
165def filter_app_factory(app, globalConfig, **localConfig):
166    return CombinedServicesWSGI(app, globalConfig, **localConfig)
167
168# Initialize environment for unit tests
169if 'NDGSEC_COMBINED_SRVS_UNITTEST_DIR' not in os.environ:
170    os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'] = \
171                                    os.path.abspath(os.path.dirname(__file__))
172   
173# To start the Site A Attribute Authority run
174# $ paster serve site-a.ini or run this file as a script
175# $ ./siteAServerApp.py [port #]
176if __name__ == '__main__':
177    import sys
178    import logging
179    logging.basicConfig(level=logging.DEBUG)
180
181    if len(sys.argv) > 1:
182        port = int(sys.argv[1])
183    else:
184        port = 8000
185       
186    cfgFilePath = os.path.join(os.path.dirname(os.path.abspath(__file__)),
187                               'services.ini')
188       
189    from paste.httpserver import serve
190    from paste.deploy import loadapp
191   
192    from paste.urlparser import StaticURLParser
193    from paste.cascade import Cascade
194   
195    app = loadapp('config:%s' % cfgFilePath)
196    rootPath = os.path.join(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'], 
197                            'openidprovider')
198   
199    # Include to enable stylesheet and graphics
200    staticURLParser = StaticURLParser(rootPath)
201    app2 = Cascade([staticURLParser, app])
202    serve(app2, host='0.0.0.0', port=port)
Note: See TracBrowser for help on using the repository browser.