1 | #!/usr/bin/env python |
---|
"""Test harness for NDG Session Manager - makes requests for
authentication and attribute retrieval.  Attribute Authority services must be
running for *AttCert* test methods.  See README in this directory for details.

NERC Data Grid Project
"""
8 | __author__ = "P J Kershaw" |
---|
9 | __date__ = "20/11/07" |
---|
10 | __copyright__ = "(C) 2007 STFC & NERC" |
---|
11 | __license__ = \ |
---|
12 | """This software may be distributed under the terms of the Q Public |
---|
13 | License, version 1.0 or later.""" |
---|
14 | __contact__ = "Philip.Kershaw@stfc.ac.uk" |
---|
15 | __revision__ = '$Id$' |
---|
16 | |
---|
17 | import unittest |
---|
18 | import os, sys, getpass, re |
---|
19 | from ConfigParser import SafeConfigParser |
---|
20 | import traceback |
---|
21 | |
---|
22 | from ndg.security.common.utils.ConfigFileParsers import \ |
---|
23 | CaseSensitiveConfigParser |
---|
24 | from ndg.security.common.X509 import X509CertParse |
---|
25 | from ndg.security.server.sessionmanager import SessionManager |
---|
26 | from ndg.security.server.attributeauthority import AttributeAuthority |
---|
27 | |
---|
28 | from os.path import expandvars as xpdVars |
---|
29 | from os.path import join as jnPath |
---|
def mkPath(file):
    """Return the path of 'file' inside the unit test directory.

    The directory is looked up in the NDGSEC_SM_UNITTEST_DIR environment
    variable each time the function is called, so the variable only needs to
    be set before the first call.  A KeyError is raised if it is not set.

    The parameter keeps its original name 'file' (shadowing the Python 2
    builtin inside this function only) so that keyword calls still work.
    """
    return jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'], file)


import logging
# Log everything: the tests are run interactively and the debug output is
# the main diagnostic when a Session Manager call fails
logging.basicConfig(level=logging.DEBUG)
34 | |
---|
35 | |
---|
class SessionManagerTestCase(unittest.TestCase):
    """Unit test case for ndg.security.server.sessionmanager.SessionManager
    class.

    This class manages server side sessions

    Settings are read from sessionMgrTest.cfg in the directory named by the
    NDGSEC_SM_UNITTEST_DIR environment variable; setUp defaults the variable
    to the directory containing this file.  Pass-phrases are cached in class
    attributes so that they are looked up or prompted for only once per run.
    """

    # Pass-phrase shared by the connect helpers and test01; taken from the
    # config file if present, otherwise prompted for on first use
    passphrase = None

    # Separate pass-phrase cache used only by test04ConnectNoCreateServerSess
    test4Passphrase = None

    def setUp(self):
        """Read the test configuration and instantiate the Session Manager
        under test"""

        # Setting NDGSEC_INT_DEBUG drops into the debugger before every test
        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        if 'NDGSEC_SM_UNITTEST_DIR' not in os.environ:
            os.environ['NDGSEC_SM_UNITTEST_DIR'] = \
                os.path.abspath(os.path.dirname(__file__))

        # NOTE(review): if CaseSensitiveConfigParser.read behaves like
        # ConfigParser.read, a missing file is silently ignored and the
        # first cfg.get() below fails instead - confirm
        self.cfg = CaseSensitiveConfigParser()
        configFilePath = jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'],
                                "sessionMgrTest.cfg")
        self.cfg.read(configFilePath)

        # Environment variables in the configured properties file path are
        # expanded before it is handed to the Session Manager
        self.propFilePath = xpdVars(self.cfg.get('setUp', 'propFilePath'))
        self.sm = SessionManager(propFilePath=self.propFilePath)

    def _getPassphrase(self, section, prompt, cacheAttr='passphrase'):
        """Return the pass-phrase to connect with, caching it on the class.

        section:   config file section to look for a 'passphrase' option in
        prompt:    text shown by getpass if no pass-phrase is available
        cacheAttr: name of the SessionManagerTestCase class attribute used
                   as the cache

        The config file is consulted only while the cache is still None; the
        user is prompted only if the result is empty.
        """
        cls = SessionManagerTestCase
        if getattr(cls, cacheAttr) is None and \
           self.cfg.has_option(section, 'passphrase'):
            setattr(cls, cacheAttr, self.cfg.get(section, 'passphrase'))

        if not getattr(cls, cacheAttr):
            setattr(cls, cacheAttr, getpass.getpass(prompt=prompt))

        return getattr(cls, cacheAttr)

    def _initUserX509CertAuthNService(self, moduleName):
        """Switch the Session Manager to the UserX509CertAuthN
        authentication service found in the unit test directory.

        moduleName: name of the module containing the UserX509CertAuthN
                    class.
        NOTE(review): callers pass 'userx509certauthn' in one place and
        'userX509certauthn' in two others, exactly as the original code did.
        Only one spelling can match the module file on a case sensitive
        file system - confirm which is correct.
        """
        userX509CertFilePath = self.cfg.get('DEFAULT', 'userX509CertFilePath')
        userPriKeyFilePath = self.cfg.get('DEFAULT', 'userPriKeyFilePath')

        self.sm['authNService'] = {
            'moduleFilePath': os.environ['NDGSEC_SM_UNITTEST_DIR'],
            'moduleName': moduleName,
            'className': 'UserX509CertAuthN',
            'userX509CertFilePath': userX509CertFilePath,
            'userPriKeyFilePath': userPriKeyFilePath
        }

        self.sm.initAuthNService()

    def _connect(self):
        '''Helper method to set up connections

        Connects as the user named in the DEFAULT config section and stores
        the returned session ID in self.sessID'''
        print("Connecting to session manager...")
        section = 'DEFAULT'

        username = self.cfg.get(section, 'username')
        passphrase = self._getPassphrase(
                            section,
                            "\nPass-phrase for user %s: " % username)

        print("Connecting to session manager as user: %s..." % username)
        userX509Cert, userPriKey, issuingCert, self.sessID = \
            self.sm.connect(username=username, passphrase=passphrase)

        print("User '%s' connected to Session Manager:\n%s" % (username,
                                                               self.sessID))
        print("Finished setting up connection")

    def _connect2UserX509CertAuthNService(self):
        '''Same as _connect but Session Manager is using an Authentication
        Service that returns PKI credentials i.e. like MyProxy

        Stores the parsed user certificate, private key, issuing certificate
        and session ID in self.userX509Cert, self.userPriKey,
        self.issuingCert and self.sessID'''

        section = 'DEFAULT'

        print("Connecting to session manager with AuthN service returning "
              "PKI creds...")

        # Change to alternative authentication service
        self._initUserX509CertAuthNService('userx509certauthn')

        username = self.cfg.get(section, 'username')
        passphrase = self._getPassphrase(
                            section,
                            "\nPass-phrase for user %s: " % username)

        print("Connecting to session manager as user: %s..." % username)
        userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \
            self.sm.connect(username=username, passphrase=passphrase)
        self.userX509Cert = X509CertParse(userX509Cert)

        print("User '%s' connected to Session Manager:\n%s" % (username,
                                                               self.sessID))
        print("Finished setting up connection")

    def test01Connect2AuthNServiceWithNoUserX509CertReturned(self):
        """test01Connect2AuthNServiceWithNoUserX509CertReturned: connect
        without overriding the Session Manager's authentication service and
        check that no certificate, private key or issuing certificate is
        returned"""

        thisSection = 'test01Connect2AuthNServiceWithNoUserX509CertReturned'
        username = self.cfg.get(thisSection, 'username')
        passphrase = self._getPassphrase(
                    thisSection,
                    "\ntest1Connect pass-phrase for user %s: " % username)

        print("Connecting to session manager as user: %s..." % username)
        userX509Cert, userPriKey, issuingCert, sessID = self.sm.connect(
                                                    username=username,
                                                    passphrase=passphrase)

        # unittest assertions rather than bare asserts so that the checks
        # are still made when Python runs with -O
        self.assertTrue(userX509Cert is None)
        self.assertTrue(userPriKey is None)
        self.assertTrue(issuingCert is None)

        print("User '%s' connected to Session Manager:\n%s" % (username,
                                                               sessID))

    def test02Connect2AuthNServiceReturningAUserX509Cert(self):
        """test02Connect2AuthNServiceReturningAUserX509Cert: connect using
        the UserX509CertAuthN service and write the returned credentials to
        the configured output file"""

        section = 'test02Connect2AuthNServiceReturningAUserX509Cert'

        userPriKeyPwd = self.cfg.get('DEFAULT', 'userPriKeyPwd')
        outputCredFilePath = self.cfg.get(section, 'outputCredsFilePath')

        # Change to alternative authentication service
        self._initUserX509CertAuthNService('userX509certauthn')

        print("Connecting to session manager...")
        userX509Cert, self.userPriKey, self.issuingCert, sessID = \
            self.sm.connect(passphrase=userPriKeyPwd)
        self.userX509Cert = X509CertParse(userX509Cert)

        print("Connected to Session Manager:\n%s" % sessID)
        creds = '\n'.join((self.issuingCert or '',
                           self.userX509Cert.asPEM().strip(),
                           self.userPriKey))

        # Close the file explicitly rather than relying on garbage
        # collection to flush and release it
        credsFile = open(mkPath(outputCredFilePath), "w")
        try:
            credsFile.write(creds)
        finally:
            credsFile.close()

    def test03GetSessionStatus(self):
        """test03GetSessionStatus: check a session is alive"""

        self._connect()
        self.assertTrue(self.sm.getSessionStatus(sessID=self.sessID),
                        "Session is dead")
        print("User connected to Session Manager with sessID=%s" %
              self.sessID)

        self.assertFalse(self.sm.getSessionStatus(sessID='abc'),
                         "sessID=abc shouldn't exist!")

        print("CORRECT: sessID=abc doesn't exist")

    def test04ConnectNoCreateServerSess(self):
        """test04ConnectNoCreateServerSess: Connect to retrieve credentials
        only - no session is created.  This makes sense only for an AuthN
        Service that returns user credentials"""
        section = 'test04ConnectNoCreateServerSess'

        # Change to alternative authentication service
        self._initUserX509CertAuthNService('userX509certauthn')

        username = self.cfg.get(section, 'username')

        # This test keeps its own pass-phrase cache, separate from the one
        # shared by the other tests
        passphrase = self._getPassphrase(
                        section,
                        "\n%s pass-phrase for user %s: " % (section, username),
                        cacheAttr='test4Passphrase')

        userX509Cert, userPriKey, issuingCert, sessID = \
            self.sm.connect(username=username,
                            passphrase=passphrase,
                            createServerSess=False)

        # Expect null session ID
        self.assertFalse(sessID, "Expecting a null session ID!")

        print("User '%s' retrieved creds. from Session Manager:\n%s" %
              (username, sessID))

    def test05DisconnectWithSessID(self):
        """test05DisconnectWithSessID: disconnect as if acting as a browser
        client
        """

        self._connect()
        self.sm.deleteUserSession(sessID=self.sessID)

        print("User disconnected from Session Manager:\n%s" % self.sessID)

    def test06DisconnectWithUserX509Cert(self):
        """test06DisconnectWithUserX509Cert: Disconnect based on a user X.509
        cert. credential from an earlier call to connect
        """

        self._connect2UserX509CertAuthNService()

        # User cert DN determines ID of session to delete
        self.sm.deleteUserSession(userX509Cert=self.userX509Cert)
        print("User disconnected from Session Manager:\n%s" %
              self.userX509Cert)

    def test07GetAttCertWithSessID(self):
        """test07GetAttCertWithSessID: make an attribute request using
        a session ID as authentication credential"""

        self._connect()

        section = 'test07GetAttCertWithSessID'
        aaURI = self.cfg.get(section, 'aaURI')
        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                                sessID=self.sessID,
                                                attributeAuthorityURI=aaURI)
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

        # Save the certificate to the configured output path
        attCert.filePath = xpdVars(self.cfg.get(section, 'acOutputFilePath'))
        attCert.write()

    def test08GetAttCertRefusedWithSessID(self):
        """test08GetAttCertRefusedWithSessID: make an attribute request using
        a sessID as authentication credential requesting an AC from an
        Attribute Authority where the user is NOT registered"""

        self._connect()

        aaURI = self.cfg.get('test08GetAttCertRefusedWithSessID', 'aaURI')

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                                sessID=self.sessID,
                                                attributeAuthorityURI=aaURI,
                                                mapFromTrustedHosts=False)
        # An error message is the expected outcome of this test
        if errMsg:
            print("SUCCESS - obtained expected result: %s" % errMsg)
            return

        self.fail("Request allowed from AA where user is NOT registered!")

    def test09GetMappedAttCertWithSessID(self):
        """test09GetMappedAttCertWithSessID: make an attribute request with
        mapFromTrustedHosts set, using a session ID as authentication
        credential"""

        # NOTE(review): test07GetAttCertWithSessID below connects again and
        # overwrites self.sessID, so this first connection looks redundant.
        # It is kept because the original test made it - confirm
        self._connect()

        # Attribute Certificate cached in test 7 can be used to get a mapped
        # AC for this test ...
        self.test07GetAttCertWithSessID()

        aaURI = self.cfg.get('test09GetMappedAttCertWithSessID', 'aaURI')

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                                sessID=self.sessID,
                                                attributeAuthorityURI=aaURI,
                                                mapFromTrustedHosts=True)
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

    def test10GetAttCertWithExtAttCertListWithSessID(self):
        """test10GetAttCertWithExtAttCertListWithSessID: make an attribute
        request using a session ID as authentication credential, passing
        an Attribute Certificate read from file in extAttCertList"""

        self._connect()
        section = 'test10GetAttCertWithExtAttCertListWithSessID'
        aaURI = self.cfg.get(section, 'aaURI')

        # Use output from test07GetAttCertWithSessID!
        extACFilePath = xpdVars(self.cfg.get(section, 'extACFilePath'))
        extACFile = open(extACFilePath)
        try:
            extAttCert = extACFile.read()
        finally:
            extACFile.close()

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                                sessID=self.sessID,
                                                attributeAuthorityURI=aaURI,
                                                extAttCertList=[extAttCert])
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

    def test11GetAttCertWithUserX509Cert(self):
        """test11GetAttCertWithUserX509Cert: make an attribute request using
        a user cert as authentication credential"""
        self._connect2UserX509CertAuthNService()

        # Request an attribute certificate from an Attribute Authority
        # using the userX509Cert returned from connect()

        aaURI = self.cfg.get('test11GetAttCertWithUserX509Cert', 'aaURI')
        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                            userX509Cert=self.userX509Cert,
                                            attributeAuthorityURI=aaURI)
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

    def test12GetAttCertFromLocalAAInstance(self):
        """test12GetAttCertFromLocalAAInstance: make an attribute request to a
        locally instantiated Attribute Authority"""

        self._connect()

        section = 'test12GetAttCertFromLocalAAInstance'
        aaPropFilePath = self.cfg.get(section, 'aaPropFilePath')
        attributeAuthority = AttributeAuthority(
                                        propFilePath=aaPropFilePath,
                                        propPrefix='attributeAuthority')

        # The Attribute Authority object itself is passed in place of a URI
        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                        sessID=self.sessID,
                                        attributeAuthority=attributeAuthority)
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)
        attCert.filePath = xpdVars(self.cfg.get(section, 'acOutputFilePath'))
        attCert.write()
386 | |
---|
387 | |
---|
class SessionManagerTestSuite(unittest.TestSuite):
    """Suite containing the SessionManagerTestCase tests, listed explicitly
    in their numbered order"""

    def __init__(self):
        """Create one SessionManagerTestCase per test method name and add
        them all to the suite"""
        print("SessionManagerTestSuite ...")
        testNames = (
            "test01Connect2AuthNServiceWithNoUserX509CertReturned",
            "test02Connect2AuthNServiceReturningAUserX509Cert",
            "test03GetSessionStatus",
            "test04ConnectNoCreateServerSess",
            "test05DisconnectWithSessID",
            "test06DisconnectWithUserX509Cert",
            "test07GetAttCertWithSessID",
            "test08GetAttCertRefusedWithSessID",
            "test09GetMappedAttCertWithSessID",
            "test10GetAttCertWithExtAttCertListWithSessID",
            "test11GetAttCertWithUserX509Cert",
            "test12GetAttCertFromLocalAAInstance",
        )
        # Build the list eagerly so that a misspelt method name is reported
        # here, while the suite is being constructed
        smTestCaseMap = [SessionManagerTestCase(name) for name in testNames]
        unittest.TestSuite.__init__(self, smTestCaseMap)
408 | |
---|
409 | |
---|
if __name__ == "__main__":
    # Run every TestCase found in this module.  To run the explicitly
    # listed suite with verbose output instead, use:
    #
    #     suite = SessionManagerTestSuite()
    #     unittest.TextTestRunner(verbosity=2).run(suite)
    unittest.main()