source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionmanager/test_sessionmanager.py @ 4770

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionmanager/test_sessionmanager.py@4770
Revision 4770, 16.7 KB checked in by pjkersha, 11 years ago (diff)

Updated copyright

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager - makes requests for
3authentication and attribute retrieval.  Attribute Authority services must be
4running for *AttCert* test methods.  See README in this directory for details
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "20/11/07"
10__copyright__ = "(C) 2009 Science and Technology Facilities Council"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "Philip.Kershaw@stfc.ac.uk"
15__revision__ = '$Id$'
16import logging
17logging.basicConfig(level=logging.DEBUG)
18
19import unittest
20import os, sys, getpass, re
21from ConfigParser import SafeConfigParser
22
23from os.path import expandvars as xpdVars
24from os.path import join as jnPath
def mkPath(file):
    """Return *file* joined onto the session manager unit test directory.

    The directory is taken from the NDGSEC_SM_UNITTEST_DIR environment
    variable each time the function is called, so the variable must be set
    beforehand; a KeyError is raised if it is not.

    The parameter keeps its original name, ``file``, so that existing
    keyword-argument callers continue to work.
    """
    return jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'], file)
26
27from ndg.security.test import BaseTestCase
28
29from ndg.security.common.utils.configfileparsers import \
30    CaseSensitiveConfigParser
31from ndg.security.common.X509 import X509CertParse
32from ndg.security.server.sessionmanager import SessionManager, \
33    CredentialWalletAttributeRequestDenied
34from ndg.security.server.attributeauthority import AttributeAuthority
35
36
class SessionManagerTestCase(BaseTestCase):
    """Unit test case for ndg.security.server.sessionmanager.SessionManager
    class.

    This class manages server side sessions.

    Pass-phrases are cached in class attributes so that a value read from
    the configuration file, or typed in at a prompt, is reused by the other
    test methods run in the same process instead of prompting again.
    """

    # Pass-phrase shared by every test except test04
    passphrase = None

    # Pass-phrase used only by test04ConnectNoCreateServerSess
    test4Passphrase = None

    def setUp(self):
        """Read sessionMgrTest.cfg and create the Session Manager under test.

        Sets self.cfg, self.propFilePath and self.sm.
        """
        super(SessionManagerTestCase, self).setUp()

        # Optional hook: drop into the debugger before each test
        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        # Default the test directory to the one holding this module
        if 'NDGSEC_SM_UNITTEST_DIR' not in os.environ:
            os.environ['NDGSEC_SM_UNITTEST_DIR'] = \
                os.path.abspath(os.path.dirname(__file__))

        self.cfg = CaseSensitiveConfigParser()
        self.cfg.read(mkPath("sessionMgrTest.cfg"))

        # Instantiate the Session Manager from its properties file
        self.propFilePath = xpdVars(self.cfg.get('setUp', 'propFilePath'))
        self.sm = SessionManager(propFilePath=self.propFilePath)

    def _getPassphrase(self, section, prompt, cacheAttr='passphrase'):
        """Return the pass-phrase to use, caching it on the class.

        The value is looked up in this order: the class attribute named by
        cacheAttr, the 'passphrase' option of the given config section, and
        finally an interactive getpass prompt.

        @param section: config file section to look for 'passphrase' in
        @param prompt: text displayed if the user has to be prompted
        @param cacheAttr: name of the SessionManagerTestCase class attribute
        in which the pass-phrase is cached
        @return: the pass-phrase
        """
        cached = getattr(SessionManagerTestCase, cacheAttr)
        if cached is None and self.cfg.has_option(section, 'passphrase'):
            cached = self.cfg.get(section, 'passphrase')

        # An empty value from the config file also triggers the prompt
        if not cached:
            cached = getpass.getpass(prompt=prompt)

        setattr(SessionManagerTestCase, cacheAttr, cached)
        return cached

    def _initUserX509CertAuthNService(self):
        """Switch the Session Manager to the alternative authentication
        service implemented by the userx509certauthn module in the unit test
        directory, using the certificate and key file paths from the DEFAULT
        config section."""
        userX509CertFilePath = self.cfg.get('DEFAULT', 'userX509CertFilePath')
        userPriKeyFilePath = self.cfg.get('DEFAULT', 'userPriKeyFilePath')

        self.sm['authNService'] = {
            'moduleFilePath': os.environ['NDGSEC_SM_UNITTEST_DIR'],
            'moduleName': 'userx509certauthn',
            'className': 'UserX509CertAuthN',
            'userX509CertFilePath': userX509CertFilePath,
            'userPriKeyFilePath': userPriKeyFilePath
        }

        self.sm.initAuthNService()

    def _connect(self):
        """Helper method to set up connections.

        Connects as the user named in the DEFAULT config section and stores
        the returned session ID in self.sessID.
        """
        print("Connecting to session manager...")
        section = 'DEFAULT'

        username = self.cfg.get(section, 'username')
        passphrase = self._getPassphrase(
                            section,
                            "\nPass-phrase for user %s: " % username)

        print("Connecting to session manager as user: %s..." % username)

        # Only the session ID is kept; the other return values are unused
        _userX509Cert, _userPriKey, _issuingCert, self.sessID = \
            self.sm.connect(username=username, passphrase=passphrase)

        print("User '%s' connected to Session Manager:\n%s" % (username,
                                                               self.sessID))
        print("Finished setting up connection")

    def _connect2UserX509CertAuthNService(self):
        """Same as _connect but Session Manager is using an Authentication
        Service that returns PKI credentials i.e. like MyProxy.

        Sets self.userX509Cert (parsed with X509CertParse), self.userPriKey,
        self.issuingCert and self.sessID.
        """
        section = 'DEFAULT'

        print("Connecting to session manager with AuthN service returning "
              "PKI creds...")

        # Change to alternative authentication service
        self._initUserX509CertAuthNService()

        username = self.cfg.get(section, 'username')
        passphrase = self._getPassphrase(
                            section,
                            "\nPass-phrase for user %s: " % username)

        print("Connecting to session manager as user: %s..." % username)
        userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \
            self.sm.connect(username=username, passphrase=passphrase)
        self.userX509Cert = X509CertParse(userX509Cert)

        print("User '%s' connected to Session Manager:\n%s" % (username,
                                                               self.sessID))
        print("Finished setting up connection")

    def test01Connect2AuthNServiceWithNoUserX509CertReturned(self):
        """test01Connect2AuthNServiceWithNoUserX509CertReturned: connect
        using the configured authentication service and check that no PKI
        credentials are returned"""

        section = 'test01Connect2AuthNServiceWithNoUserX509CertReturned'
        username = self.cfg.get(section, 'username')
        passphrase = self._getPassphrase(
                    section,
                    "\ntest1Connect pass-phrase for user %s: " % username)

        print("Connecting to session manager as user: %s..." % username)
        userX509Cert, userPriKey, issuingCert, sessID = self.sm.connect(
                                                    username=username,
                                                    passphrase=passphrase)

        # TestCase assertions are used rather than bare asserts so that the
        # checks are still made when Python is run with -O
        self.assertTrue(userX509Cert is None,
                        "Expecting no user certificate")
        self.assertTrue(userPriKey is None,
                        "Expecting no user private key")
        self.assertTrue(issuingCert is None,
                        "Expecting no issuing certificate")

        print("User '%s' connected to Session Manager:\n%s" % (username,
                                                               sessID))

    def test02Connect2AuthNServiceReturningAUserX509Cert(self):
        """test02Connect2AuthNServiceReturningAUserX509Cert: connect using
        an authentication service that returns PKI credentials and write
        those credentials to the configured output file"""

        section = 'test02Connect2AuthNServiceReturningAUserX509Cert'

        userPriKeyPwd = self.cfg.get('DEFAULT', 'userPriKeyPwd')
        outputCredFilePath = self.cfg.get(section, 'outputCredsFilePath')

        # Change to alternative authentication service
        self._initUserX509CertAuthNService()

        print("Connecting to session manager...")
        userX509Cert, self.userPriKey, self.issuingCert, sessID = \
            self.sm.connect(passphrase=userPriKeyPwd)
        self.userX509Cert = X509CertParse(userX509Cert)

        print("Connected to Session Manager:\n%s" % sessID)
        creds = '\n'.join((self.issuingCert or '',
                           self.userX509Cert.asPEM().strip(),
                           self.userPriKey))

        # Close the file explicitly rather than relying on garbage
        # collection to flush and release the handle
        credsFile = open(mkPath(outputCredFilePath), "w")
        try:
            credsFile.write(creds)
        finally:
            credsFile.close()

    def test03GetSessionStatus(self):
        """test03GetSessionStatus: check a session is alive"""

        self._connect()
        self.assertTrue(self.sm.getSessionStatus(sessID=self.sessID),
                        "Session is dead")
        print("User connected to Session Manager with sessID=%s" %
              self.sessID)

        self.assertFalse(self.sm.getSessionStatus(sessID='abc'),
                         "sessID=abc shouldn't exist!")

        print("CORRECT: sessID=abc doesn't exist")

    def test04ConnectNoCreateServerSess(self):
        """test04ConnectNoCreateServerSess: Connect to retrieve credentials
        only - no session is created.  This makes sense only for an AuthN
        Service that returns user credentials"""
        section = 'test04ConnectNoCreateServerSess'

        # Change to alternative authentication service
        self._initUserX509CertAuthNService()

        username = self.cfg.get(section, 'username')

        # This test keeps its pass-phrase separate from the other tests
        passphrase = self._getPassphrase(
                        section,
                        "\n%s pass-phrase for user %s: " % (section, username),
                        cacheAttr='test4Passphrase')

        _userX509Cert, _userPriKey, _issuingCert, sessID = \
            self.sm.connect(username=username,
                            passphrase=passphrase,
                            createServerSess=False)

        # Expect null session ID
        self.assertFalse(sessID, "Expecting a null session ID!")

        print("User '%s' retrieved creds. from Session Manager:\n%s" %
                                                    (username, sessID))

    def test05DisconnectWithSessID(self):
        """test05DisconnectWithSessID: disconnect as if acting as a browser
        client
        """

        self._connect()
        self.sm.deleteUserSession(sessID=self.sessID)

        print("User disconnected from Session Manager:\n%s" % self.sessID)

    def test06DisconnectWithUserX509Cert(self):
        """test06DisconnectWithUserX509Cert: Disconnect based on a user X.509
        cert. credential from an earlier call to connect
        """

        self._connect2UserX509CertAuthNService()

        # User cert DN determines ID of session to delete
        self.sm.deleteUserSession(userX509Cert=self.userX509Cert)
        print("User disconnected from Session Manager:\n%s" %
              self.userX509Cert)

    def test07GetAttCertWithSessID(self):
        """test07GetAttCertWithSessID: make an attribute request using
        a session ID as authentication credential"""

        self._connect()

        section = 'test07GetAttCertWithSessID'
        aaURI = self.cfg.get(section, 'aaURI')
        attCert = self.sm.getAttCert(sessID=self.sessID,
                                     attributeAuthorityURI=aaURI)
        print("Attribute Certificate:\n%s" % attCert)

        # Save the certificate to the configured output file
        attCert.filePath = xpdVars(self.cfg.get(section, 'acOutputFilePath'))
        attCert.write()

    def test08GetAttCertRefusedWithSessID(self):
        """test08GetAttCertRefusedWithSessID: make an attribute request using
        a sessID as authentication credential requesting an AC from an
        Attribute Authority where the user is NOT registered"""

        self._connect()

        aaURI = self.cfg.get('test08GetAttCertRefusedWithSessID', 'aaURI')

        try:
            self.sm.getAttCert(sessID=self.sessID,
                               attributeAuthorityURI=aaURI,
                               mapFromTrustedHosts=False)
        except CredentialWalletAttributeRequestDenied:
            # sys.exc_info() is used instead of "except X, e" so that this
            # module parses whichever Python version is in use
            print("SUCCESS - obtained expected result: %s" %
                  sys.exc_info()[1])
            return

        self.fail("Request allowed from AA where user is NOT registered!")

    def test09GetMappedAttCertWithSessID(self):
        """test09GetMappedAttCertWithSessID: make an attribute request using
        a session ID as authentication credential"""

        # NOTE(review): test07GetAttCertWithSessID below calls _connect()
        # again and overwrites self.sessID, so this first call looks
        # redundant - confirm before removing it
        self._connect()

        # Attribute Certificate cached in test 07 can be used to get a mapped
        # AC for this test ...
        self.test07GetAttCertWithSessID()

        aaURI = self.cfg.get('test09GetMappedAttCertWithSessID', 'aaURI')

        attCert = self.sm.getAttCert(sessID=self.sessID,
                                     attributeAuthorityURI=aaURI,
                                     mapFromTrustedHosts=True)

        print("Attribute Certificate:\n%s" % attCert)

    def test10GetAttCertWithExtAttCertListWithSessID(self):
        """test10GetAttCertWithExtAttCertListWithSessID: make an attribute
        request using a session ID as authentication credential"""

        self._connect()
        section = 'test10GetAttCertWithExtAttCertListWithSessID'
        aaURI = self.cfg.get(section, 'aaURI')

        # Use output from test07GetAttCertWithSessID!
        extACFilePath = xpdVars(self.cfg.get(section, 'extACFilePath'))

        # Close the file explicitly rather than leaking the handle
        extACFile = open(extACFilePath)
        try:
            extAttCert = extACFile.read()
        finally:
            extACFile.close()

        attCert = self.sm.getAttCert(sessID=self.sessID,
                                     attributeAuthorityURI=aaURI,
                                     extAttCertList=[extAttCert])
        print("Attribute Certificate:\n%s" % attCert)

    def test11GetAttCertWithUserX509Cert(self):
        """test11GetAttCertWithUserX509Cert: make an attribute request using
        a user cert as authentication credential"""
        self._connect2UserX509CertAuthNService()

        # Request an attribute certificate from an Attribute Authority
        # using the userX509Cert returned from connect()
        aaURI = self.cfg.get('test11GetAttCertWithUserX509Cert', 'aaURI')
        attCert = self.sm.getAttCert(userX509Cert=self.userX509Cert,
                                     attributeAuthorityURI=aaURI)
        print("Attribute Certificate:\n%s" % attCert)

    def test12GetAttCertFromLocalAAInstance(self):
        """test12GetAttCertFromLocalAAInstance: make an attribute request to a
        locally instantiated Attribute Authority"""

        self._connect()

        section = 'test12GetAttCertFromLocalAAInstance'
        aaPropFilePath = self.cfg.get(section, 'aaPropFilePath')
        attributeAuthority = AttributeAuthority(propFilePath=aaPropFilePath)

        attCert = self.sm.getAttCert(sessID=self.sessID,
                                     attributeAuthority=attributeAuthority)
        print("Attribute Certificate:\n%s" % attCert)

        # Save the certificate to the configured output file
        attCert.filePath = xpdVars(self.cfg.get(section, 'acOutputFilePath'))
        attCert.write()
372
373
class SessionManagerTestSuite(unittest.TestSuite):
    """Test suite holding one SessionManagerTestCase instance per test
    method, added in the order the method names are listed below."""

    def __init__(self):
        """Build the suite from the fixed list of test method names."""
        print("SessionManagerTestSuite ...")

        testMethodNames = (
            "test01Connect2AuthNServiceWithNoUserX509CertReturned",
            "test02Connect2AuthNServiceReturningAUserX509Cert",
            "test03GetSessionStatus",
            "test04ConnectNoCreateServerSess",
            "test05DisconnectWithSessID",
            "test06DisconnectWithUserX509Cert",
            "test07GetAttCertWithSessID",
            "test08GetAttCertRefusedWithSessID",
            "test09GetMappedAttCertWithSessID",
            "test10GetAttCertWithExtAttCertListWithSessID",
            "test11GetAttCertWithUserX509Cert",
            "test12GetAttCertFromLocalAAInstance",
        )

        # One test case instance is created for each named method
        smTestCases = [SessionManagerTestCase(methodName)
                       for methodName in testMethodNames]
        unittest.TestSuite.__init__(self, smTestCases)
394           
395                                                   
if __name__ == "__main__":
    # Run the tests in this module via unittest's own command line entry
    # point.  The disabled alternative below runs the explicit
    # SessionManagerTestSuite with a verbose text runner instead:
#    suite = SessionManagerTestSuite()
#    unittest.TextTestRunner(verbosity=2).run(suite)
    unittest.main()
Note: See TracBrowser for help on using the repository browser.