1 | #!/usr/bin/env python |
---|
2 | """NDG Security test harness for combined Session Manager and Attribute |
---|
3 | Authority services running under a single Paste instance. |
---|
4 | |
---|
5 | NERC Data Grid Project |
---|
6 | |
---|
7 | This software may be distributed under the terms of the Q Public License, |
---|
8 | version 1.0 or later. |
---|
9 | """ |
---|
10 | __author__ = "P J Kershaw" |
---|
11 | __date__ = "20/11/08" |
---|
12 | __copyright__ = "(C) 2008 STFC & NERC" |
---|
13 | __contact__ = "Philip.Kershaw@stfc.ac.uk" |
---|
14 | __revision__ = "$Id$" |
---|
15 | import os |
---|
16 | from authkit.permissions import UserIn |
---|
17 | from authkit.authorize import authorize |
---|
18 | |
---|
19 | from ndg.security.server.wsgi.utils.sessionmanagerclient import \ |
---|
20 | WSGISessionManagerClient |
---|
21 | from ndg.security.server.wsgi.utils.attributeauthorityclient import \ |
---|
22 | WSGIAttributeAuthorityClient |
---|
23 | |
---|
24 | |
---|
25 | class HTTPBasicAuthentication(object): |
---|
26 | '''Enable Authkit based HTTP Basic Authentication for test methods''' |
---|
27 | def __init__(self): |
---|
28 | self._userIn = UserIn([]) |
---|
29 | |
---|
30 | def __call__(self, environ, username, password): |
---|
31 | """validation function""" |
---|
32 | try: |
---|
33 | client = WSGISessionManagerClient(environ=environ) |
---|
34 | res = client.connect(username, passphrase=password) |
---|
35 | |
---|
36 | if username not in self._userIn.users: |
---|
37 | self._userIn.users += [username] |
---|
38 | |
---|
39 | # Keep a reference to the session ID for test purposes |
---|
40 | environ[client.environKey+'.user'] = res[-1] |
---|
41 | |
---|
42 | except Exception, e: |
---|
43 | return False |
---|
44 | else: |
---|
45 | return True |
---|
46 | |
---|
47 | class CombinedServicesWSGI(object): |
---|
48 | method = { |
---|
49 | "/": 'default', |
---|
50 | "/test_localSessionManagerConnect": "test_localSessionManagerConnect", |
---|
51 | "/test_localSessionManagerGetSessionStatus": "test_localSessionManagerGetSessionStatus", |
---|
52 | "/test_localSessionManagerDisconnect": "test_localSessionManagerDisconnect", |
---|
53 | "/test_localSessionManagerGetAttCert": "test_localSessionManagerGetAttCert", |
---|
54 | "/test_localAttributeAuthorityGetHostInfo": "test_localAttributeAuthorityGetHostInfo", |
---|
55 | "/test_localAttributeAuthorityGetTrustedHostInfo": "test_localAttributeAuthorityGetTrustedHostInfo", |
---|
56 | "/test_localAttributeAuthorityGetAllHostsInfo": "test_localAttributeAuthorityGetAllHostsInfo", |
---|
57 | "/test_localAttributeAuthorityGetAttCert": "test_localAttributeAuthorityGetAttCert" |
---|
58 | } |
---|
59 | httpBasicAuthentication = HTTPBasicAuthentication() |
---|
60 | |
---|
61 | def __init__(self, app, globalConfig, **localConfig): |
---|
62 | self.app = app |
---|
63 | |
---|
64 | def __call__(self, environ, start_response): |
---|
65 | |
---|
66 | methodName = self.method.get(environ['PATH_INFO'], '').rstrip() |
---|
67 | if methodName: |
---|
68 | action = getattr(self, methodName) |
---|
69 | return action(environ, start_response) |
---|
70 | elif self.app is not None: |
---|
71 | return self.app(environ, start_response) |
---|
72 | else: |
---|
73 | start_response('404 Not Found', [('Content-type', 'text/plain')]) |
---|
74 | return "NDG Security Combined Services Unit tests: invalid URI" |
---|
75 | |
---|
76 | def default(self, environ, start_response): |
---|
77 | start_response('200 OK', [('Content-type', 'text/plain')]) |
---|
78 | return "NDG Security Combined Services Unit Tests" |
---|
79 | |
---|
80 | @authorize(httpBasicAuthentication._userIn) |
---|
81 | def test_localSessionManagerConnect(self, environ, start_response): |
---|
82 | start_response('200 OK', [('Content-type', 'text/plain')]) |
---|
83 | return "test_localSessionManagerConnect succeeded" |
---|
84 | |
---|
85 | @authorize(httpBasicAuthentication._userIn) |
---|
86 | def test_localSessionManagerGetSessionStatus(self, environ,start_response): |
---|
87 | client = WSGISessionManagerClient(environ=environ) |
---|
88 | stat=client.getSessionStatus(sessID=environ[client.environKey+'.user']) |
---|
89 | start_response('200 OK', [('Content-type', 'text/xml')]) |
---|
90 | return ("test_localSessionManagerGetSessionStatus succeeded. Response " |
---|
91 | "= %s" % stat) |
---|
92 | |
---|
93 | @authorize(httpBasicAuthentication._userIn) |
---|
94 | def test_localSessionManagerDisconnect(self, environ, start_response): |
---|
95 | client = WSGISessionManagerClient(environ=environ) |
---|
96 | client.disconnect(sessID=environ[client.environKey+'.user']) |
---|
97 | |
---|
98 | # Re-initialise user authentication |
---|
99 | CombinedServicesWSGI.httpBasicAuthentication._userIn.users = [] |
---|
100 | start_response('200 OK', [('Content-type', 'text/plain')]) |
---|
101 | return "test_localSessionManagerDisconnect succeeded." |
---|
102 | |
---|
103 | @authorize(httpBasicAuthentication._userIn) |
---|
104 | def test_localSessionManagerGetAttCert(self, environ, start_response): |
---|
105 | client = WSGISessionManagerClient(environ=environ) |
---|
106 | attCert = client.getAttCert(sessID=environ[client.environKey+'.user']) |
---|
107 | start_response('200 OK', [('Content-type', 'text/xml')]) |
---|
108 | return str(attCert) |
---|
109 | |
---|
110 | def test_localAttributeAuthorityGetHostInfo(self, environ, start_response): |
---|
111 | client = WSGIAttributeAuthorityClient(environ=environ) |
---|
112 | hostInfo = client.getHostInfo() |
---|
113 | start_response('200 OK', [('Content-type', 'text/html')]) |
---|
114 | return ("test_localAttributeAuthorityGetHostInfo succeeded. Response " |
---|
115 | "= %s" % hostInfo) |
---|
116 | |
---|
117 | def test_localAttributeAuthorityGetTrustedHostInfo(self, |
---|
118 | environ, |
---|
119 | start_response): |
---|
120 | client = WSGIAttributeAuthorityClient(environ=environ) |
---|
121 | role = environ.get('QUERY_STRING', '').split('=')[-1] or None |
---|
122 | hostInfo = client.getTrustedHostInfo(role=role) |
---|
123 | start_response('200 OK', [('Content-type', 'text/html')]) |
---|
124 | return ("test_localAttributeAuthorityGetTrustedHostInfo succeeded. " |
---|
125 | "Response = %s" % hostInfo) |
---|
126 | |
---|
127 | def test_localAttributeAuthorityGetAllHostsInfo(self, |
---|
128 | environ, |
---|
129 | start_response): |
---|
130 | client = WSGIAttributeAuthorityClient(environ=environ) |
---|
131 | hostInfo = client.getAllHostsInfo() |
---|
132 | start_response('200 OK', [('Content-type', 'text/html')]) |
---|
133 | return ("test_localAttributeAuthorityGetAllHostsInfo succeeded. " |
---|
134 | "Response = %s" % hostInfo) |
---|
135 | |
---|
136 | @authorize(httpBasicAuthentication._userIn) |
---|
137 | def test_localAttributeAuthorityGetAttCert(self, environ, start_response): |
---|
138 | |
---|
139 | client = WSGIAttributeAuthorityClient(environ=environ) |
---|
140 | username=CombinedServicesWSGI.httpBasicAuthentication._userIn.users[-1] |
---|
141 | |
---|
142 | attCert = client.getAttCert(userId=username) |
---|
143 | start_response('200 OK', [('Content-type', 'text/xml')]) |
---|
144 | return str(attCert) |
---|
145 | |
---|
146 | def app_factory(globalConfig, **localConfig): |
---|
147 | return CombinedServicesWSGI(None, globalConfig, **localConfig) |
---|
148 | |
---|
149 | def filter_app_factory(app, globalConfig, **localConfig): |
---|
150 | return CombinedServicesWSGI(app, globalConfig, **localConfig) |
---|
151 | |
---|
152 | # Initialize environment for unit tests |
---|
153 | if 'NDGSEC_COMBINED_SRVS_UNITTEST_DIR' not in os.environ: |
---|
154 | os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'] = \ |
---|
155 | os.path.abspath(os.path.dirname(__file__)) |
---|
156 | |
---|
157 | # To start the Site A Attribute Authority run |
---|
158 | # $ paster serve site-a.ini or run this file as a script |
---|
159 | # $ ./siteAServerApp.py [port #] |
---|
160 | if __name__ == '__main__': |
---|
161 | import sys |
---|
162 | import logging |
---|
163 | logging.basicConfig(level=logging.DEBUG) |
---|
164 | |
---|
165 | if len(sys.argv) > 1: |
---|
166 | port = int(sys.argv[1]) |
---|
167 | else: |
---|
168 | port = 8000 |
---|
169 | |
---|
170 | cfgFilePath = os.path.join(os.path.dirname(os.path.abspath(__file__)), |
---|
171 | 'services.ini') |
---|
172 | |
---|
173 | from paste.httpserver import serve |
---|
174 | from paste.deploy import loadapp |
---|
175 | |
---|
176 | from paste.urlparser import StaticURLParser |
---|
177 | from paste.cascade import Cascade |
---|
178 | |
---|
179 | app = loadapp('config:%s' % cfgFilePath) |
---|
180 | rootPath = os.path.join(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'], |
---|
181 | 'openidprovider') |
---|
182 | |
---|
183 | # Include to enable stylesheet and graphics |
---|
184 | staticURLParser = StaticURLParser(rootPath) |
---|
185 | app2 = Cascade([staticURLParser, app]) |
---|
186 | serve(app2, host='0.0.0.0', port=port) |
---|