source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/serverapp.py @ 4521

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/serverapp.py@4522
Revision 4521, 7.1 KB checked in by pjkersha, 11 years ago (diff)

Completed tests running Attribute Authority and Session Manager in the same WSGI stack:

  • ndg.security.server.wsgi.utils.attributeauthorityclient.WSGIAttributeAuthorityClient: completed this class and tested in combinedservices unit tests. This class enables WSGI apps to access an AttributeAuthority? WSGI app running in the same stack or else make a callout to a remote SOAP service.
  • ndg.security.server.wsgi.wssecurity: improved config set-up
Line 
1#!/usr/bin/env python
2"""NDG Security test harness for Ccombined Session Manager and Attribute
3Authority services running under a single Paste instance.
4
5NERC Data Grid Project
6
7This software may be distributed under the terms of the Q Public License,
8version 1.0 or later.
9"""
10__author__ = "P J Kershaw"
11__date__ = "20/11/08"
12__copyright__ = "(C) 2008 STFC & NERC"
13__contact__ = "Philip.Kershaw@stfc.ac.uk"
14__revision__ = "$Id$"
15import os
16from authkit.permissions import UserIn
17from authkit.authorize import authorize
18
19from ndg.security.server.wsgi.utils.sessionmanagerclient import \
20    WSGISessionManagerClient
21from ndg.security.server.wsgi.utils.attributeauthorityclient import \
22    WSGIAttributeAuthorityClient
23
24
25class HTTPBasicAuthentication(object):
26    '''Enable Authkit based HTTP Basic Authentication for test methods'''
27    def __init__(self):
28        self._userIn = UserIn([])
29       
30    def __call__(self, environ, username, password):
31        """validation function"""
32        try:
33            client = WSGISessionManagerClient(environ=environ)
34            res = client.connect(username, passphrase=password)
35
36            if username not in self._userIn.users:
37                self._userIn.users += [username]
38           
39            # Keep a reference to the session ID for test purposes
40            environ[client.environKey+'.user'] = res[-1]
41               
42        except Exception, e:
43            return False
44        else:
45            return True
46
47class CombinedServicesWSGI(object):
48    method = {
49"/": 'default',
50"/test_localSessionManagerConnect": "test_localSessionManagerConnect",
51"/test_localSessionManagerGetSessionStatus": "test_localSessionManagerGetSessionStatus",
52"/test_localSessionManagerDisconnect": "test_localSessionManagerDisconnect",
53"/test_localSessionManagerGetAttCert": "test_localSessionManagerGetAttCert",
54"/test_localAttributeAuthorityGetHostInfo": "test_localAttributeAuthorityGetHostInfo",
55"/test_localAttributeAuthorityGetTrustedHostInfo": "test_localAttributeAuthorityGetTrustedHostInfo",
56"/test_localAttributeAuthorityGetAllHostsInfo": "test_localAttributeAuthorityGetAllHostsInfo",
57"/test_localAttributeAuthorityGetAttCert": "test_localAttributeAuthorityGetAttCert"
58    }
59    httpBasicAuthentication = HTTPBasicAuthentication()
60   
61    def __call__(self, environ, start_response):
62       
63        methodName = self.method.get(environ['PATH_INFO'], '').rstrip()
64        if methodName:
65            action = getattr(self, methodName)
66            return action(environ, start_response)
67        else:
68            start_response('404 Not Found', [('Content-type', 'text/plain')])
69            return "NDG Security Combined Services Unit tests: invalid URI"
70           
71    def default(self, environ, start_response):
72        start_response('200 OK', [('Content-type', 'text/plain')])
73        return "NDG Security Combined Services Unit Tests"
74
75    @authorize(httpBasicAuthentication._userIn)
76    def test_localSessionManagerConnect(self, environ, start_response):
77        start_response('200 OK', [('Content-type', 'text/plain')])
78        return "test_localSessionManagerConnect succeeded"
79       
80    @authorize(httpBasicAuthentication._userIn)
81    def test_localSessionManagerGetSessionStatus(self, environ,start_response):
82        client = WSGISessionManagerClient(environ=environ)
83        stat=client.getSessionStatus(sessID=environ[client.environKey+'.user'])
84        start_response('200 OK', [('Content-type', 'text/xml')])
85        return ("test_localSessionManagerGetSessionStatus succeeded. Response "
86                "= %s" % stat)
87
88    @authorize(httpBasicAuthentication._userIn)
89    def test_localSessionManagerDisconnect(self, environ, start_response):
90        client = WSGISessionManagerClient(environ=environ)
91        client.disconnect(sessID=environ[client.environKey+'.user'])
92       
93        # Re-initialise user authentication
94        CombinedServicesWSGI.httpBasicAuthentication._userIn.users = []
95        start_response('200 OK', [('Content-type', 'text/plain')])
96        return "test_localSessionManagerDisconnect succeeded."
97
98    @authorize(httpBasicAuthentication._userIn)
99    def test_localSessionManagerGetAttCert(self, environ, start_response):
100        client = WSGISessionManagerClient(environ=environ)
101        attCert = client.getAttCert(sessID=environ[client.environKey+'.user'])
102        start_response('200 OK', [('Content-type', 'text/xml')])
103        return str(attCert)
104
105    def test_localAttributeAuthorityGetHostInfo(self, environ, start_response):
106        client = WSGIAttributeAuthorityClient(environ=environ)
107        hostInfo = client.getHostInfo()
108        start_response('200 OK', [('Content-type', 'text/html')])
109        return ("test_localAttributeAuthorityGetHostInfo succeeded. Response "
110                "= %s" % hostInfo)
111
112    def test_localAttributeAuthorityGetTrustedHostInfo(self, 
113                                                       environ, 
114                                                       start_response):
115        client = WSGIAttributeAuthorityClient(environ=environ)
116        role = environ.get('QUERY_STRING', '').split('=')[-1] or None
117        hostInfo = client.getTrustedHostInfo(role=role)
118        start_response('200 OK', [('Content-type', 'text/html')])
119        return ("test_localAttributeAuthorityGetTrustedHostInfo succeeded. "
120                "Response = %s" % hostInfo)
121
122    def test_localAttributeAuthorityGetAllHostsInfo(self, 
123                                                    environ, 
124                                                    start_response):
125        client = WSGIAttributeAuthorityClient(environ=environ)
126        hostInfo = client.getAllHostsInfo()
127        start_response('200 OK', [('Content-type', 'text/html')])
128        return ("test_localAttributeAuthorityGetAllHostsInfo succeeded. "
129                "Response = %s" % hostInfo)
130
131    @authorize(httpBasicAuthentication._userIn)
132    def test_localAttributeAuthorityGetAttCert(self, environ, start_response):
133       
134        client = WSGIAttributeAuthorityClient(environ=environ)
135        username=CombinedServicesWSGI.httpBasicAuthentication._userIn.users[-1]
136       
137        attCert = client.getAttCert(userId=username)
138        start_response('200 OK', [('Content-type', 'text/xml')])
139        return str(attCert)
140       
141       
142def app_factory(global_config, **local_conf):
143    return CombinedServicesWSGI()
144
145
146# Initialize environment for unit tests
147if 'NDGSEC_COMBINED_SRVS_UNITTEST_DIR' not in os.environ:
148    os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'] = \
149                                    os.path.abspath(os.path.dirname(__file__))
150   
151# To start the Site A Attribute Authority run
152# $ paster serve site-a.ini or run this file as a script
153# $ ./siteAServerApp.py [port #]
154if __name__ == '__main__':
155    import sys
156    import logging
157    logging.basicConfig(level=logging.DEBUG)
158
159    if len(sys.argv) > 1:
160        port = int(sys.argv[1])
161    else:
162        port = 8000
163       
164    cfgFilePath = os.path.join(os.path.dirname(os.path.abspath(__file__)),
165                               'services.ini')
166       
167    from paste.httpserver import serve
168    from paste.deploy import loadapp
169
170    app = loadapp('config:%s' % cfgFilePath)
171    serve(app, host='0.0.0.0', port=port)
Note: See TracBrowser for help on using the repository browser.