source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionmanager/test_sessionmanager.py @ 4680

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionmanager/test_sessionmanager.py@4680
Revision 4680, 16.7 KB checked in by pjkersha, 11 years ago (diff)

Global replace to fix copyright from STFC & NERC to STFC alone because it's not possible to have copyright held by two orgs.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager - makes requests for
3authentication and attribute retrieval.  Attribute Authority services must be
4running for *AttCert* test methods.  See README in this directory for details
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "20/11/07"
10__copyright__ = "(C) 2007 STFC"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "Philip.Kershaw@stfc.ac.uk"
15__revision__ = '$Id$'
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20import traceback
21
22from ndg.security.common.utils.configfileparsers import \
23    CaseSensitiveConfigParser
24from ndg.security.common.X509 import X509CertParse
25from ndg.security.server.sessionmanager import SessionManager, \
26    CredentialWalletAttributeRequestDenied
27from ndg.security.server.attributeauthority import AttributeAuthority
28
29from os.path import expandvars as xpdVars
30from os.path import join as jnPath
def mkPath(file):
    """Return the path of file inside the unit test directory.

    The unit test directory is taken from the NDGSEC_SM_UNITTEST_DIR
    environment variable, which must be set before this is called (a
    KeyError is raised otherwise).  The parameter keeps its original name,
    'file', so that existing keyword callers continue to work.
    """
    return jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'], file)
32
33import logging
# Configure the root logger at import time so that DEBUG level (and above)
# log messages are emitted while the tests run
logging.basicConfig(level=logging.DEBUG)
35
36
37class SessionManagerTestCase(unittest.TestCase):
38    """Unit test case for ndg.security.server.sessionmanager.SessionManager
39    class.
40   
41    This class manages server side sessions"""
42   
43    passphrase = None
44    test4Passphrase = None
45   
46    def setUp(self):
47       
48        if 'NDGSEC_INT_DEBUG' in os.environ:
49            import pdb
50            pdb.set_trace()
51       
52        if 'NDGSEC_SM_UNITTEST_DIR' not in os.environ:
53            os.environ['NDGSEC_SM_UNITTEST_DIR'] = \
54                os.path.abspath(os.path.dirname(__file__))
55       
56        self.cfg = CaseSensitiveConfigParser()
57        configFilePath = jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'],
58                                "sessionMgrTest.cfg")
59        self.cfg.read(configFilePath)
60                   
61        # Initialise the Session Manager client connection
62        # Omit traceFile keyword to leave out SOAP debug info
63        self.propFilePath = xpdVars(self.cfg.get('setUp', 'propFilePath'))
64        self.sm = SessionManager(propFilePath=self.propFilePath)
65
66    def _connect(self):
67        '''Helper method to set up connections'''
68        print "Connecting to session manager..."
69        section = 'DEFAULT'
70       
71        username = self.cfg.get(section, 'username')
72        if SessionManagerTestCase.passphrase is None and \
73           self.cfg.has_option(section, 'passphrase'):
74            SessionManagerTestCase.passphrase=self.cfg.get(section,
75                                                           'passphrase')
76       
77        if not SessionManagerTestCase.passphrase:
78            SessionManagerTestCase.passphrase = getpass.getpass(
79                            prompt="\nPass-phrase for user %s: " % username)
80
81        print("Connecting to session manager as user: %s..." % username)
82        userX509Cert, userPriKey, issuingCert, self.sessID = \
83            self.sm.connect(username=username, 
84                            passphrase=SessionManagerTestCase.passphrase)
85
86        print("User '%s' connected to Session Manager:\n%s" % (username, 
87                                                               self.sessID))
88        print("Finished setting up connection")
89
90    def _connect2UserX509CertAuthNService(self):
91        '''Same as _connect but Session Manager is using an Authentication
92        Service that returns PKI credentials i.e. like MyProxy'''
93       
94        section = 'DEFAULT'
95
96        print("Connecting to session manager with AuthN service returning "
97              "PKI creds...")
98               
99        # Change to alternative authentication service
100        userX509CertFilePath = self.cfg.get(section, 'userX509CertFilePath')
101        userPriKeyFilePath = self.cfg.get(section, 'userPriKeyFilePath')
102        userPriKeyPwd = self.cfg.get(section, 'userPriKeyPwd')
103                                         
104        self.sm['authNService'] = {
105            'moduleFilePath': os.environ['NDGSEC_SM_UNITTEST_DIR'],
106            'moduleName': 'userx509certauthn',
107            'className': 'UserX509CertAuthN',
108            'userX509CertFilePath': userX509CertFilePath,
109            'userPriKeyFilePath': userPriKeyFilePath
110        }
111
112        self.sm.initAuthNService()
113       
114        username = self.cfg.get(section, 'username')
115        if SessionManagerTestCase.passphrase is None and \
116           self.cfg.has_option(section, 'passphrase'):
117            SessionManagerTestCase.passphrase=self.cfg.get(section, 
118                                                           'passphrase')
119       
120        if not SessionManagerTestCase.passphrase:
121            SessionManagerTestCase.passphrase = getpass.getpass(\
122                prompt="\nPass-phrase for user %s: " % username)
123
124        print("Connecting to session manager as user: %s..." % username)
125        userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \
126            self.sm.connect(username=username, 
127                            passphrase=SessionManagerTestCase.passphrase)
128        self.userX509Cert = X509CertParse(userX509Cert)
129       
130        print("User '%s' connected to Session Manager:\n%s" % (username, 
131                                                               self.sessID))
132        print("Finished setting up connection")
133   
134    def test01Connect2AuthNServiceWithNoUserX509CertReturned(self):
135       
136        thisSection = 'test01Connect2AuthNServiceWithNoUserX509CertReturned'
137        username = self.cfg.get(thisSection, 'username')
138        if SessionManagerTestCase.passphrase is None and \
139           self.cfg.has_option(thisSection, 'passphrase'):
140            SessionManagerTestCase.passphrase=self.cfg.get(thisSection, 
141                                                           'passphrase')
142       
143        if not SessionManagerTestCase.passphrase:
144            SessionManagerTestCase.passphrase = getpass.getpass(
145                prompt="\ntest1Connect pass-phrase for user %s: " % username)
146
147        print "Connecting to session manager as user: %s..." %username
148        userX509Cert, userPriKey, issuingCert, sessID = self.sm.connect(
149                                username=username, 
150                                passphrase=SessionManagerTestCase.passphrase)
151        assert(userX509Cert is None)
152        assert(userPriKey is None)
153        assert(issuingCert is None)
154       
155        print("User '%s' connected to Session Manager:\n%s"%(username, sessID))     
156                                 
157    def test02Connect2AuthNServiceReturningAUserX509Cert(self):
158       
159        section = 'test02Connect2AuthNServiceReturningAUserX509Cert'
160       
161        # Change to alternative authentication service
162        userX509CertFilePath = self.cfg.get('DEFAULT', 'userX509CertFilePath')
163        userPriKeyFilePath = self.cfg.get('DEFAULT', 'userPriKeyFilePath')
164        userPriKeyPwd = self.cfg.get('DEFAULT', 'userPriKeyPwd')
165        outputCredFilePath = self.cfg.get(section, 'outputCredsFilePath')
166                                         
167        self.sm['authNService'] = {
168            'moduleFilePath': os.environ['NDGSEC_SM_UNITTEST_DIR'],
169            'moduleName': 'userx509certauthn',
170            'className': 'UserX509CertAuthN',
171            'userX509CertFilePath': userX509CertFilePath,
172            'userPriKeyFilePath': userPriKeyFilePath
173        }
174
175        self.sm.initAuthNService()
176       
177        print("Connecting to session manager...")
178        userX509Cert, self.userPriKey, self.issuingCert, sessID = self.sm.connect(
179                                                    passphrase=userPriKeyPwd)
180        self.userX509Cert = X509CertParse(userX509Cert)
181       
182        print("Connected to Session Manager:\n%s" % sessID)
183        creds='\n'.join((self.issuingCert or '',
184                         self.userX509Cert.asPEM().strip(),
185                         self.userPriKey))
186        open(mkPath(outputCredFilePath), "w").write(creds)
187   
188           
189    def test03GetSessionStatus(self):
190        """test03GetSessionStatus: check a session is alive"""
191       
192        self._connect()
193        assert self.sm.getSessionStatus(sessID=self.sessID), "Session is dead"
194        print "User connected to Session Manager with sessID=%s" % self.sessID
195
196        assert not self.sm.getSessionStatus(sessID='abc'), \
197            "sessID=abc shouldn't exist!"
198           
199        print "CORRECT: sessID=abc doesn't exist"
200       
201    def test04ConnectNoCreateServerSess(self):
202        """test04ConnectNoCreateServerSess: Connect to retrieve credentials
203        only - no session is created.  This makes sense only for an AuthN
204        Service that returns user credentials"""
205        section = 'test04ConnectNoCreateServerSess'
206       
207        # Change to alternative authentication service
208        userX509CertFilePath = self.cfg.get('DEFAULT', 'userX509CertFilePath')
209        userPriKeyFilePath = self.cfg.get('DEFAULT', 'userPriKeyFilePath')
210        userPriKeyPwd = self.cfg.get('DEFAULT', 'userPriKeyPwd')
211                                         
212        self.sm['authNService'] = {
213            'moduleFilePath': os.environ['NDGSEC_SM_UNITTEST_DIR'],
214            'moduleName': 'userx509certauthn',
215            'className': 'UserX509CertAuthN',
216            'userX509CertFilePath': userX509CertFilePath,
217            'userPriKeyFilePath': userPriKeyFilePath
218        }
219
220        self.sm.initAuthNService()
221       
222       
223        username = self.cfg.get(section, 'username')
224
225        if SessionManagerTestCase.test4Passphrase is None and \
226           self.cfg.has_option(section, 'passphrase'):
227            SessionManagerTestCase.test4Passphrase = self.cfg.get(section, 
228                                                                  'passphrase')
229       
230        if not SessionManagerTestCase.test4Passphrase:
231            SessionManagerTestCase.test4Passphrase = getpass.getpass(prompt=\
232                                            "\n%s pass-phrase for user %s: " % 
233                                            (section, username))
234
235        userX509Cert, userPriKey, issuingCert, sessID = \
236            self.sm.connect(username=username, 
237                            passphrase=SessionManagerTestCase.test4Passphrase,
238                            createServerSess=False)
239       
240        # Expect null session ID
241        assert not sessID, "Expecting a null session ID!"
242         
243        print("User '%s' retrieved creds. from Session Manager:\n%s" % 
244                                                    (username, sessID))
245           
246
247    def test05DisconnectWithSessID(self):
248        """test05DisconnectWithSessID: disconnect as if acting as a browser
249        client
250        """
251       
252        self._connect()       
253        self.sm.deleteUserSession(sessID=self.sessID)
254       
255        print "User disconnected from Session Manager:\n%s" % self.sessID
256           
257
258    def test06DisconnectWithUserX509Cert(self):
259        """test5DisconnectWithUserX509Cert: Disconnect based on a user X.509
260        cert. credential from an earlier call to connect
261        """
262       
263        self._connect2UserX509CertAuthNService()
264       
265        # User cert DN determines ID of session to delete
266        self.sm.deleteUserSession(userX509Cert=self.userX509Cert)
267        print "User disconnected from Session Manager:\n%s" % self.userX509Cert
268
269
270    def test07GetAttCertWithSessID(self):
271        """test07GetAttCertWithSessID: make an attribute request using
272        a session ID as authentication credential"""
273
274        self._connect()
275       
276        section = 'test07GetAttCertWithSessID'
277        aaURI = self.cfg.get(section, 'aaURI')
278        attCert = self.sm.getAttCert(sessID=self.sessID, 
279                                     attributeAuthorityURI=aaURI)
280        print("Attribute Certificate:\n%s" % attCert) 
281        attCert.filePath = xpdVars(self.cfg.get(section, 'acOutputFilePath')) 
282        attCert.write()
283
284
285    def test08GetAttCertRefusedWithSessID(self):
286        """test08GetAttCertRefusedWithSessID: make an attribute request using
287        a sessID as authentication credential requesting an AC from an
288        Attribute Authority where the user is NOT registered"""
289
290        self._connect()
291       
292        aaURI = self.cfg.get('test08GetAttCertRefusedWithSessID', 'aaURI')
293       
294        try:
295            attCert = self.sm.getAttCert(sessID=self.sessID, 
296                                         attributeAuthorityURI=aaURI,
297                                         mapFromTrustedHosts=False)
298        except CredentialWalletAttributeRequestDenied, e:
299            print("SUCCESS - obtained expected result: %s" % e)
300            return
301       
302        self.fail("Request allowed from AA where user is NOT registered!")
303
304
305    def test09GetMappedAttCertWithSessID(self):
306        """test09GetMappedAttCertWithSessID: make an attribute request using
307        a session ID as authentication credential"""
308
309        self._connect()
310       
311        # Attribute Certificate cached in test 6 can be used to get a mapped
312        # AC for this test ...
313        self.test07GetAttCertWithSessID()
314
315        aaURI = self.cfg.get('test09GetMappedAttCertWithSessID', 'aaURI')
316       
317        attCert = self.sm.getAttCert(sessID=self.sessID,
318                                     attributeAuthorityURI=aaURI,
319                                     mapFromTrustedHosts=True)
320           
321        print("Attribute Certificate:\n%s" % attCert) 
322
323
324    def test10GetAttCertWithExtAttCertListWithSessID(self):
325        """test10GetAttCertWithExtAttCertListWithSessID: make an attribute
326        request using a session ID as authentication credential"""
327       
328        self._connect()
329        section = 'test10GetAttCertWithExtAttCertListWithSessID'
330        aaURI = self.cfg.get(section, 'aaURI')
331       
332        # Use output from test6GetAttCertWithSessID!
333        extACFilePath = xpdVars(self.cfg.get(section, 'extACFilePath'))   
334        extAttCert = open(extACFilePath).read()
335       
336        attCert = self.sm.getAttCert(sessID=self.sessID, 
337                                     attributeAuthorityURI=aaURI,
338                                     extAttCertList=[extAttCert])
339        print("Attribute Certificate:\n%s" % attCert) 
340
341
342    def test11GetAttCertWithUserX509Cert(self):
343        """test11GetAttCertWithUserX509Cert: make an attribute request using
344        a user cert as authentication credential"""
345        self._connect2UserX509CertAuthNService()
346
347        # Request an attribute certificate from an Attribute Authority
348        # using the userX509Cert returned from connect()
349       
350        aaURI = self.cfg.get('test11GetAttCertWithUserX509Cert', 'aaURI')
351        attCert = self.sm.getAttCert(userX509Cert=self.userX509Cert, 
352                                     attributeAuthorityURI=aaURI)
353        print("Attribute Certificate:\n%s" % attCert) 
354
355
356    def test12GetAttCertFromLocalAAInstance(self):
357        """test12GetAttCertFromLocalAAInstance: make an attribute request to a
358        locally instantiated Attribute Authority"""
359
360        self._connect()
361       
362        section = 'test12GetAttCertFromLocalAAInstance'
363        aaPropFilePath = self.cfg.get(section, 'aaPropFilePath')
364        attributeAuthority=AttributeAuthority(propFilePath=aaPropFilePath,
365                                              propPrefix='attributeAuthority')
366       
367        attCert = self.sm.getAttCert(sessID=self.sessID, 
368                                     attributeAuthority=attributeAuthority)           
369        print("Attribute Certificate:\n%s" % attCert) 
370        attCert.filePath = xpdVars(self.cfg.get(section, 'acOutputFilePath')) 
371        attCert.write()
372
373
class SessionManagerTestSuite(unittest.TestSuite):
    """Test suite made up of every SessionManagerTestCase test method,
    listed explicitly in test number order"""

    def __init__(self):
        """Create one SessionManagerTestCase instance per test method name
        and initialise the suite with them"""
        print("SessionManagerTestSuite ...")
        testNames = (
            "test01Connect2AuthNServiceWithNoUserX509CertReturned",
            "test02Connect2AuthNServiceReturningAUserX509Cert",
            "test03GetSessionStatus",
            "test04ConnectNoCreateServerSess",
            "test05DisconnectWithSessID",
            "test06DisconnectWithUserX509Cert",
            "test07GetAttCertWithSessID",
            "test08GetAttCertRefusedWithSessID",
            "test09GetMappedAttCertWithSessID",
            "test10GetAttCertWithExtAttCertListWithSessID",
            "test11GetAttCertWithUserX509Cert",
            "test12GetAttCertFromLocalAAInstance",
        )
        smTestCaseMap = [SessionManagerTestCase(name) for name in testNames]
        unittest.TestSuite.__init__(self, smTestCaseMap)
394           
395                                                   
if __name__ == "__main__":
    # Run the tests in this module via unittest's command line entry point.
    # To run them through SessionManagerTestSuite instead, replace the call
    # below with:
    #    suite = SessionManagerTestSuite()
    #    unittest.TextTestRunner(verbosity=2).run(suite)
    unittest.main()
Note: See TracBrowser for help on using the repository browser.