1 | #!/usr/bin/env python |
---|
2 | """Test harness for NDG Session Manager - makes requests for |
---|
3 | authentication and attribute retrieval. Attribute Authority services must be |
---|
4 | running for *AttCert* test methods. See README in this directory for details |
---|
5 | |
---|
6 | NERC Data Grid Project |
---|
7 | """ |
---|
8 | __author__ = "P J Kershaw" |
---|
9 | __date__ = "20/11/07" |
---|
10 | __copyright__ = "(C) 2007 STFC & NERC" |
---|
11 | __license__ = \ |
---|
12 | """This software may be distributed under the terms of the Q Public |
---|
13 | License, version 1.0 or later.""" |
---|
14 | __contact__ = "Philip.Kershaw@stfc.ac.uk" |
---|
15 | __revision__ = '$Id$' |
---|
16 | |
---|
17 | import unittest |
---|
18 | import os, sys, getpass, re |
---|
19 | from ConfigParser import SafeConfigParser |
---|
20 | import traceback |
---|
21 | |
---|
22 | from ndg.security.common.utils.ConfigFileParsers import \ |
---|
23 | CaseSensitiveConfigParser |
---|
24 | from ndg.security.common.X509 import X509CertParse |
---|
25 | from ndg.security.server.sessionmanager import * |
---|
26 | |
---|
27 | from os.path import expandvars as xpdVars |
---|
28 | from os.path import join as jnPath |
---|
def mkPath(file):
    """Return the path of `file` inside the unit test directory.

    The directory is taken from the NDGSEC_SM_UNITTEST_DIR environment
    variable, which is looked up on every call, so it only needs to be set
    by the time the function is first used.
    """
    unitTestDir = os.environ['NDGSEC_SM_UNITTEST_DIR']
    return jnPath(unitTestDir, file)
30 | |
---|
31 | import logging |
---|
# Configure the root logger at DEBUG level when this module is imported so
# that debug-level log records emitted while the tests run are not filtered
# out.
logging.basicConfig(level=logging.DEBUG)
33 | |
---|
34 | |
---|
35 | class SessionManagerTestCase(unittest.TestCase): |
---|
36 | """Unit test case for ndg.security.server.sessionmanager.SessionManager |
---|
37 | class. |
---|
38 | |
---|
39 | This class manages server side sessions""" |
---|
40 | |
---|
41 | passphrase = None |
---|
42 | test4Passphrase = None |
---|
43 | |
---|
44 | def setUp(self): |
---|
45 | |
---|
46 | if 'NDGSEC_INT_DEBUG' in os.environ: |
---|
47 | import pdb |
---|
48 | pdb.set_trace() |
---|
49 | |
---|
50 | if 'NDGSEC_SM_UNITTEST_DIR' not in os.environ: |
---|
51 | os.environ['NDGSEC_SM_UNITTEST_DIR'] = \ |
---|
52 | os.path.abspath(os.path.dirname(__file__)) |
---|
53 | |
---|
54 | self.cfg = CaseSensitiveConfigParser() |
---|
55 | configFilePath = jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'], |
---|
56 | "sessionMgrTest.cfg") |
---|
57 | self.cfg.read(configFilePath) |
---|
58 | |
---|
59 | # Initialise the Session Manager client connection |
---|
60 | # Omit traceFile keyword to leave out SOAP debug info |
---|
61 | self.propFilePath = xpdVars(self.cfg.get('setUp', 'propFilePath')) |
---|
62 | self.sm = SessionManager(propFilePath=self.propFilePath) |
---|
63 | |
---|
64 | def _connect(self): |
---|
65 | '''Helper method to set up connections''' |
---|
66 | print "Connecting to session manager..." |
---|
67 | section = 'DEFAULT' |
---|
68 | |
---|
69 | username = self.cfg.get(section, 'username') |
---|
70 | if SessionManagerTestCase.passphrase is None and \ |
---|
71 | self.cfg.has_option(section, 'passphrase'): |
---|
72 | SessionManagerTestCase.passphrase=self.cfg.get(section, |
---|
73 | 'passphrase') |
---|
74 | |
---|
75 | if not SessionManagerTestCase.passphrase: |
---|
76 | SessionManagerTestCase.passphrase = getpass.getpass( |
---|
77 | prompt="\nPass-phrase for user %s: " % username) |
---|
78 | |
---|
79 | print("Connecting to session manager as user: %s..." % username) |
---|
80 | userX509Cert, userPriKey, issuingCert, self.sessID = \ |
---|
81 | self.sm.connect(username=username, |
---|
82 | passphrase=SessionManagerTestCase.passphrase) |
---|
83 | |
---|
84 | print("User '%s' connected to Session Manager:\n%s" % (username, |
---|
85 | self.sessID)) |
---|
86 | print("Finished setting up connection") |
---|
87 | |
---|
88 | def _connect2UserCertAuthNService(self): |
---|
89 | '''Same as _connect but Session Manager is using an Authentication |
---|
90 | Service that returns PKI credentials i.e. like MyProxy''' |
---|
91 | |
---|
92 | section = 'DEFAULT' |
---|
93 | |
---|
94 | print("Connecting to session manager with AuthN service returning " |
---|
95 | "PKI creds...") |
---|
96 | |
---|
97 | # Change to alternative authentication service |
---|
98 | userX509CertFilePath = self.cfg.get(section, 'userX509CertFilePath') |
---|
99 | userPriKeyFilePath = self.cfg.get(section, 'userPriKeyFilePath') |
---|
100 | userPriKeyPwd = self.cfg.get(section, 'userPriKeyPwd') |
---|
101 | |
---|
102 | self.sm['authNService'] = { |
---|
103 | 'moduleFilePath': os.environ['NDGSEC_SM_UNITTEST_DIR'], |
---|
104 | 'moduleName': 'usercertauthn', |
---|
105 | 'className': 'UserCertAuthN', |
---|
106 | 'userX509CertFilePath': userX509CertFilePath, |
---|
107 | 'userPriKeyFilePath': userPriKeyFilePath |
---|
108 | } |
---|
109 | |
---|
110 | self.sm.initAuthNService() |
---|
111 | |
---|
112 | username = self.cfg.get(section, 'username') |
---|
113 | if SessionManagerTestCase.passphrase is None and \ |
---|
114 | self.cfg.has_option(section, 'passphrase'): |
---|
115 | SessionManagerTestCase.passphrase=self.cfg.get(section, |
---|
116 | 'passphrase') |
---|
117 | |
---|
118 | if not SessionManagerTestCase.passphrase: |
---|
119 | SessionManagerTestCase.passphrase = getpass.getpass(\ |
---|
120 | prompt="\nPass-phrase for user %s: " % username) |
---|
121 | |
---|
122 | print("Connecting to session manager as user: %s..." % username) |
---|
123 | userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \ |
---|
124 | self.sm.connect(username=username, |
---|
125 | passphrase=SessionManagerTestCase.passphrase) |
---|
126 | self.userX509Cert = X509CertParse(userX509Cert) |
---|
127 | |
---|
128 | print("User '%s' connected to Session Manager:\n%s" % (username, |
---|
129 | self.sessID)) |
---|
130 | print("Finished setting up connection") |
---|
131 | |
---|
132 | def test1Connect2AuthNServiceWithNoUserCertReturned(self): |
---|
133 | |
---|
134 | username = self.cfg.get('test1Connect', 'username') |
---|
135 | if SessionManagerTestCase.passphrase is None and \ |
---|
136 | self.cfg.has_option('test1Connect', 'passphrase'): |
---|
137 | SessionManagerTestCase.passphrase=self.cfg.get('test1Connect', |
---|
138 | 'passphrase') |
---|
139 | |
---|
140 | if not SessionManagerTestCase.passphrase: |
---|
141 | SessionManagerTestCase.passphrase = getpass.getpass( |
---|
142 | prompt="\ntest1Connect pass-phrase for user %s: " % username) |
---|
143 | |
---|
144 | print "Connecting to session manager as user: %s..." %username |
---|
145 | userX509Cert, userPriKey, issuingCert, sessID = self.sm.connect( |
---|
146 | username=username, |
---|
147 | passphrase=SessionManagerTestCase.passphrase) |
---|
148 | assert(userX509Cert is None) |
---|
149 | assert(userPriKey is None) |
---|
150 | assert(issuingCert is None) |
---|
151 | |
---|
152 | print "User '%s' connected to Session Manager:\n%s" % \ |
---|
153 | (username, sessID) |
---|
154 | |
---|
155 | def test2Connect2AuthNServiceReturningAUserCert(self): |
---|
156 | |
---|
157 | section = 'test2Connect2AuthNServiceReturningAUserCert' |
---|
158 | |
---|
159 | # Change to alternative authentication service |
---|
160 | userX509CertFilePath = self.cfg.get('DEFAULT', 'userX509CertFilePath') |
---|
161 | userPriKeyFilePath = self.cfg.get('DEFAULT', 'userPriKeyFilePath') |
---|
162 | userPriKeyPwd = self.cfg.get('DEFAULT', 'userPriKeyPwd') |
---|
163 | outputCredFilePath = self.cfg.get(section, 'outputCredsFilePath') |
---|
164 | |
---|
165 | self.sm['authNService'] = { |
---|
166 | 'moduleFilePath': os.environ['NDGSEC_SM_UNITTEST_DIR'], |
---|
167 | 'moduleName': 'usercertauthn', |
---|
168 | 'className': 'UserCertAuthN', |
---|
169 | 'userX509CertFilePath': userX509CertFilePath, |
---|
170 | 'userPriKeyFilePath': userPriKeyFilePath |
---|
171 | } |
---|
172 | |
---|
173 | self.sm.initAuthNService() |
---|
174 | |
---|
175 | print("Connecting to session manager...") |
---|
176 | userX509Cert, self.userPriKey, self.issuingCert, sessID = self.sm.connect( |
---|
177 | passphrase=userPriKeyPwd) |
---|
178 | self.userX509Cert = X509CertParse(userX509Cert) |
---|
179 | |
---|
180 | print("Connected to Session Manager:\n%s" % sessID) |
---|
181 | creds='\n'.join((self.issuingCert or '', |
---|
182 | self.userX509Cert.asPEM().strip(), |
---|
183 | self.userPriKey)) |
---|
184 | open(mkPath(outputCredFilePath), "w").write(creds) |
---|
185 | |
---|
186 | |
---|
187 | def test3GetSessionStatus(self): |
---|
188 | """test3GetSessionStatus: check a session is alive""" |
---|
189 | |
---|
190 | self._connect() |
---|
191 | assert self.sm.getSessionStatus(sessID=self.sessID), "Session is dead" |
---|
192 | print "User connected to Session Manager with sessID=%s" % self.sessID |
---|
193 | |
---|
194 | assert not self.sm.getSessionStatus(sessID='abc'), \ |
---|
195 | "sessID=abc shouldn't exist!" |
---|
196 | |
---|
197 | print "CORRECT: sessID=abc doesn't exist" |
---|
198 | |
---|
199 | def test4ConnectNoCreateServerSess(self): |
---|
200 | """test4ConnectNoCreateServerSess: Connect to retrieve credentials |
---|
201 | only - no session is created. This makes sense only for an AuthN |
---|
202 | Service that returns user credentials""" |
---|
203 | section = 'test4ConnectNoCreateServerSess' |
---|
204 | |
---|
205 | # Change to alternative authentication service |
---|
206 | userX509CertFilePath = self.cfg.get('DEFAULT', 'userX509CertFilePath') |
---|
207 | userPriKeyFilePath = self.cfg.get('DEFAULT', 'userPriKeyFilePath') |
---|
208 | userPriKeyPwd = self.cfg.get('DEFAULT', 'userPriKeyPwd') |
---|
209 | |
---|
210 | self.sm['authNService'] = { |
---|
211 | 'moduleFilePath': os.environ['NDGSEC_SM_UNITTEST_DIR'], |
---|
212 | 'moduleName': 'usercertauthn', |
---|
213 | 'className': 'UserCertAuthN', |
---|
214 | 'userX509CertFilePath': userX509CertFilePath, |
---|
215 | 'userPriKeyFilePath': userPriKeyFilePath |
---|
216 | } |
---|
217 | |
---|
218 | self.sm.initAuthNService() |
---|
219 | |
---|
220 | |
---|
221 | username = self.cfg.get(section, 'username') |
---|
222 | |
---|
223 | if SessionManagerTestCase.test4Passphrase is None and \ |
---|
224 | self.cfg.has_option(section, 'passphrase'): |
---|
225 | SessionManagerTestCase.test4Passphrase = self.cfg.get(section, |
---|
226 | 'passphrase') |
---|
227 | |
---|
228 | if not SessionManagerTestCase.test4Passphrase: |
---|
229 | SessionManagerTestCase.test4Passphrase = getpass.getpass(prompt=\ |
---|
230 | "\n%s pass-phrase for user %s: " % |
---|
231 | (section, username)) |
---|
232 | |
---|
233 | userX509Cert, userPriKey, issuingCert, sessID = \ |
---|
234 | self.sm.connect(username=username, |
---|
235 | passphrase=SessionManagerTestCase.test4Passphrase, |
---|
236 | createServerSess=False) |
---|
237 | |
---|
238 | # Expect null session ID |
---|
239 | assert not sessID, "Expecting a null session ID!" |
---|
240 | |
---|
241 | print("User '%s' retrieved creds. from Session Manager:\n%s" % |
---|
242 | (username, sessID)) |
---|
243 | |
---|
244 | |
---|
245 | def test5DisconnectWithSessID(self): |
---|
246 | """test5DisconnectWithSessID: disconnect as if acting as a browser |
---|
247 | client |
---|
248 | """ |
---|
249 | |
---|
250 | self._connect() |
---|
251 | self.sm.deleteUserSession(sessID=self.sessID) |
---|
252 | |
---|
253 | print "User disconnected from Session Manager:\n%s" % self.sessID |
---|
254 | |
---|
255 | |
---|
256 | def test6DisconnectWithUserCert(self): |
---|
257 | """test5DisconnectWithUserCert: Disconnect based on a user X.509 |
---|
258 | cert. credential from an earlier call to connect |
---|
259 | """ |
---|
260 | |
---|
261 | self._connect2UserCertAuthNService() |
---|
262 | |
---|
263 | # User cert DN determines ID of session to delete |
---|
264 | self.sm.deleteUserSession(userX509Cert=self.userX509Cert) |
---|
265 | print "User disconnected from Session Manager:\n%s" % self.userX509Cert |
---|
266 | |
---|
267 | |
---|
268 | def test7GetAttCertWithSessID(self): |
---|
269 | """test7GetAttCertWithSessID: make an attribute request using |
---|
270 | a session ID as authentication credential""" |
---|
271 | |
---|
272 | self._connect() |
---|
273 | |
---|
274 | section = 'test7GetAttCertWithSessID' |
---|
275 | aaURI = self.cfg.get(section, 'aaURI') |
---|
276 | attCert, errMsg, extAttCertList=self.sm.getAttCert(sessID=self.sessID, |
---|
277 | attributeAuthorityURI=aaURI) |
---|
278 | if errMsg: |
---|
279 | self.fail(errMsg) |
---|
280 | |
---|
281 | print("Attribute Certificate:\n%s" % attCert) |
---|
282 | attCert.filePath = xpdVars(self.cfg.get(section, 'acOutputFilePath')) |
---|
283 | attCert.write() |
---|
284 | |
---|
285 | return self.sm |
---|
286 | |
---|
287 | |
---|
288 | def test8GetAttCertRefusedWithSessID(self): |
---|
289 | """test8GetAttCertRefusedWithSessID: make an attribute request using |
---|
290 | a sessID as authentication credential requesting an AC from an |
---|
291 | Attribute Authority where the user is NOT registered""" |
---|
292 | |
---|
293 | self._connect() |
---|
294 | |
---|
295 | aaURI = self.cfg.get('test8GetAttCertRefusedWithSessID', 'aaURI') |
---|
296 | |
---|
297 | attCert, errMsg, extAttCertList=self.sm.getAttCert(sessID=self.sessID, |
---|
298 | attributeAuthorityURI=aaURI, |
---|
299 | mapFromTrustedHosts=False) |
---|
300 | if errMsg: |
---|
301 | print("SUCCESS - obtained expected result: %s" % errMsg) |
---|
302 | return |
---|
303 | |
---|
304 | self.fail("Request allowed from AA where user is NOT registered!") |
---|
305 | |
---|
306 | |
---|
307 | def test9GetMappedAttCertWithSessID(self): |
---|
308 | """test9GetMappedAttCertWithSessID: make an attribute request using |
---|
309 | a session ID as authentication credential""" |
---|
310 | |
---|
311 | self._connect() |
---|
312 | |
---|
313 | # Attribute Certificate cached in test 6 can be used to get a mapped |
---|
314 | # AC for this test ... |
---|
315 | self.sm = self.test7GetAttCertWithSessID() |
---|
316 | |
---|
317 | aaURI = self.cfg.get('test9GetMappedAttCertWithSessID', 'aaURI') |
---|
318 | |
---|
319 | attCert, errMsg, extAttCertList=self.sm.getAttCert(sessID=self.sessID, |
---|
320 | attributeAuthorityURI=aaURI, |
---|
321 | mapFromTrustedHosts=True) |
---|
322 | if errMsg: |
---|
323 | self.fail(errMsg) |
---|
324 | |
---|
325 | print("Attribute Certificate:\n%s" % attCert) |
---|
326 | |
---|
327 | |
---|
328 | def test10GetAttCertWithExtAttCertListWithSessID(self): |
---|
329 | """test10GetAttCertWithExtAttCertListWithSessID: make an attribute |
---|
330 | request using a session ID as authentication credential""" |
---|
331 | |
---|
332 | self._connect() |
---|
333 | section = 'test10GetAttCertWithExtAttCertListWithSessID' |
---|
334 | aaURI = self.cfg.get(section, 'aaURI') |
---|
335 | |
---|
336 | # Use output from test6GetAttCertWithSessID! |
---|
337 | extACFilePath = xpdVars(self.cfg.get(section, 'extACFilePath')) |
---|
338 | extAttCert = open(extACFilePath).read() |
---|
339 | |
---|
340 | attCert, errMsg, extAttCertList = self.sm.getAttCert( |
---|
341 | sessID=self.sessID, |
---|
342 | attributeAuthorityURI=aaURI, |
---|
343 | extAttCertList=[extAttCert]) |
---|
344 | if errMsg: |
---|
345 | self.fail(errMsg) |
---|
346 | |
---|
347 | print("Attribute Certificate:\n%s" % attCert) |
---|
348 | |
---|
349 | |
---|
350 | def test11GetAttCertWithUserCert(self): |
---|
351 | """test11GetAttCertWithUserCert: make an attribute request using |
---|
352 | a user cert as authentication credential""" |
---|
353 | self._connect2UserCertAuthNService() |
---|
354 | |
---|
355 | # Request an attribute certificate from an Attribute Authority |
---|
356 | # using the userX509Cert returned from connect() |
---|
357 | |
---|
358 | aaURI = self.cfg.get('test11GetAttCertWithUserCert', 'aaURI') |
---|
359 | attCert, errMsg, extAttCertList = self.sm.getAttCert( |
---|
360 | userX509Cert=self.userX509Cert, |
---|
361 | attributeAuthorityURI=aaURI) |
---|
362 | if errMsg: |
---|
363 | self.fail(errMsg) |
---|
364 | |
---|
365 | print("Attribute Certificate:\n%s" % attCert) |
---|
366 | |
---|
367 | |
---|
class SessionManagerTestSuite(unittest.TestSuite):
    """Suite containing the SessionManagerTestCase tests in numbered order,
    test1 through test11"""

    def __init__(self):
        """Create one SessionManagerTestCase per test method name and add
        them to the suite in the order listed"""
        print("SessionManagerTestSuite ...")
        testNames = (
            "test1Connect2AuthNServiceWithNoUserCertReturned",
            "test2Connect2AuthNServiceReturningAUserCert",
            "test3GetSessionStatus",
            "test4ConnectNoCreateServerSess",
            "test5DisconnectWithSessID",
            "test6DisconnectWithUserCert",
            "test7GetAttCertWithSessID",
            "test8GetAttCertRefusedWithSessID",
            "test9GetMappedAttCertWithSessID",
            "test10GetAttCertWithExtAttCertListWithSessID",
            "test11GetAttCertWithUserCert",
        )
        smTestCases = [SessionManagerTestCase(testName)
                       for testName in testNames]
        unittest.TestSuite.__init__(self, smTestCases)
387 | |
---|
388 | |
---|
if __name__ == "__main__":
    # Run the tests in this module with the default unittest runner.
    #
    # Alternative, currently disabled: run SessionManagerTestSuite with a
    # verbose text runner instead, which uses the order listed in the suite:
    #     suite = SessionManagerTestSuite()
    #     unittest.TextTestRunner(verbosity=2).run(suite)
    unittest.main()