source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionmanager/test_sessionmanager.py @ 4734

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionmanager/test_sessionmanager.py@4734
Revision 4734, 16.7 KB checked in by pjkersha, 11 years ago (diff)

Refactored Session Manager unit tests separating out test files into the config dir.

  • Property svn:executable set to *
Rev Line
[3024]1#!/usr/bin/env python
2"""Test harness for NDG Session Manager - makes requests for
[4406]3authentication and attribute retrieval.  Attribute Authority services must be
4running for *AttCert* test methods.  See README in this directory for details
[3024]5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
[3044]9__date__ = "20/11/07"
[4680]10__copyright__ = "(C) 2007 STFC"
[3024]11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
[4404]14__contact__ = "Philip.Kershaw@stfc.ac.uk"
[3044]15__revision__ = '$Id$'
[4734]16import logging
17logging.basicConfig(level=logging.DEBUG)
[3024]18
19import unittest
20import os, sys, getpass, re
21from ConfigParser import SafeConfigParser
22
[4734]23from os.path import expandvars as xpdVars
24from os.path import join as jnPath
def mkPath(file):
    """Return ``file`` joined onto the directory named by the
    NDGSEC_SM_UNITTEST_DIR environment variable.

    The environment variable is looked up on every call, so it only has to
    be set by the time a path is actually built.
    """
    unitTestDir = os.environ['NDGSEC_SM_UNITTEST_DIR']
    return jnPath(unitTestDir, file)
26
27from ndg.security.test import BaseTestCase
28
[4573]29from ndg.security.common.utils.configfileparsers import \
[4576]30    CaseSensitiveConfigParser
[3041]31from ndg.security.common.X509 import X509CertParse
[4576]32from ndg.security.server.sessionmanager import SessionManager, \
33    CredentialWalletAttributeRequestDenied
[4447]34from ndg.security.server.attributeauthority import AttributeAuthority
[3024]35
36
class SessionManagerTestCase(BaseTestCase):
    """Unit test case for ndg.security.server.sessionmanager.SessionManager
    class.

    This class manages server side sessions.

    Settings are read from sessionMgrTest.cfg in the directory named by the
    NDGSEC_SM_UNITTEST_DIR environment variable.  Most tests read from a
    section named after the test method; shared values come from DEFAULT.
    """

    # Pass-phrases are cached on the class so that a value read from the
    # config file or typed at the prompt is reused by later tests.
    passphrase = None
    test4Passphrase = None

    def setUp(self):
        """Load the test configuration and create the SessionManager under
        test."""
        super(SessionManagerTestCase, self).setUp()

        # Drop into the debugger when NDGSEC_INT_DEBUG is set
        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        # Default the unit test directory to the one containing this module
        os.environ.setdefault('NDGSEC_SM_UNITTEST_DIR',
                              os.path.abspath(os.path.dirname(__file__)))

        self.cfg = CaseSensitiveConfigParser()
        self.cfg.read(mkPath("sessionMgrTest.cfg"))

        # Initialise the Session Manager client connection
        # Omit traceFile keyword to leave out SOAP debug info
        self.propFilePath = xpdVars(self.cfg.get('setUp', 'propFilePath'))
        self.sm = SessionManager(propFilePath=self.propFilePath)

    def _resolvePassphrase(self, section, prompt, cacheAttr='passphrase'):
        """Return the pass-phrase to use, caching it on the class.

        The value already cached in the class attribute ``cacheAttr`` wins.
        If nothing is cached (None), the 'passphrase' option of the config
        ``section`` is used when present.  If the result is still empty the
        user is asked interactively using ``prompt``.
        """
        cached = getattr(SessionManagerTestCase, cacheAttr)
        if cached is None and self.cfg.has_option(section, 'passphrase'):
            cached = self.cfg.get(section, 'passphrase')

        if not cached:
            cached = getpass.getpass(prompt=prompt)

        setattr(SessionManagerTestCase, cacheAttr, cached)
        return cached

    def _initUserX509CertAuthNService(self):
        """Switch the Session Manager to the UserX509CertAuthN authentication
        service (module 'userx509certauthn' in the unit test directory) and
        initialise it.  Certificate and key file paths come from the DEFAULT
        config section."""
        self.sm['authNService'] = {
            'moduleFilePath': os.environ['NDGSEC_SM_UNITTEST_DIR'],
            'moduleName': 'userx509certauthn',
            'className': 'UserX509CertAuthN',
            'userX509CertFilePath': self.cfg.get('DEFAULT',
                                                 'userX509CertFilePath'),
            'userPriKeyFilePath': self.cfg.get('DEFAULT',
                                               'userPriKeyFilePath')
        }
        self.sm.initAuthNService()

    def _connect(self):
        '''Helper method to set up connections.  Connects with the DEFAULT
        section username and stores the returned session ID in self.sessID'''
        print("Connecting to session manager...")
        section = 'DEFAULT'

        username = self.cfg.get(section, 'username')
        passphrase = self._resolvePassphrase(
                            section,
                            "\nPass-phrase for user %s: " % username)

        print("Connecting to session manager as user: %s..." % username)
        # Only the session ID is needed by the callers of this helper
        userX509Cert, userPriKey, issuingCert, self.sessID = \
            self.sm.connect(username=username, passphrase=passphrase)

        print("User '%s' connected to Session Manager:\n%s" % (username,
                                                               self.sessID))
        print("Finished setting up connection")

    def _connect2UserX509CertAuthNService(self):
        '''Same as _connect but Session Manager is using an Authentication
        Service that returns PKI credentials i.e. like MyProxy.

        Sets self.userX509Cert (parsed with X509CertParse), self.userPriKey,
        self.issuingCert and self.sessID from the values connect() returns'''
        section = 'DEFAULT'

        print("Connecting to session manager with AuthN service returning "
              "PKI creds...")

        # Change to alternative authentication service
        self._initUserX509CertAuthNService()

        # NOTE(review): this connects with the user pass-phrase whereas
        # test02 passes the DEFAULT 'userPriKeyPwd' value - confirm which
        # one the UserX509CertAuthN service expects here.
        username = self.cfg.get(section, 'username')
        passphrase = self._resolvePassphrase(
                            section,
                            "\nPass-phrase for user %s: " % username)

        print("Connecting to session manager as user: %s..." % username)
        userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \
            self.sm.connect(username=username, passphrase=passphrase)
        self.userX509Cert = X509CertParse(userX509Cert)

        print("User '%s' connected to Session Manager:\n%s" % (username,
                                                               self.sessID))
        print("Finished setting up connection")

    def test01Connect2AuthNServiceWithNoUserX509CertReturned(self):
        """test01Connect2AuthNServiceWithNoUserX509CertReturned: connect and
        check that no user certificate, private key or issuing certificate
        is returned"""
        thisSection = 'test01Connect2AuthNServiceWithNoUserX509CertReturned'
        username = self.cfg.get(thisSection, 'username')
        passphrase = self._resolvePassphrase(
                    thisSection,
                    "\ntest1Connect pass-phrase for user %s: " % username)

        print("Connecting to session manager as user: %s..." % username)
        userX509Cert, userPriKey, issuingCert, sessID = self.sm.connect(
                                                    username=username,
                                                    passphrase=passphrase)

        # unittest assertions are used rather than bare assert statements so
        # that the checks still run when Python is started with -O
        self.assertTrue(userX509Cert is None,
                        "Expecting no user X.509 certificate")
        self.assertTrue(userPriKey is None,
                        "Expecting no user private key")
        self.assertTrue(issuingCert is None,
                        "Expecting no issuing certificate")

        print("User '%s' connected to Session Manager:\n%s"%(username, sessID))

    def test02Connect2AuthNServiceReturningAUserX509Cert(self):
        """test02Connect2AuthNServiceReturningAUserX509Cert: connect via the
        UserX509CertAuthN service and write the returned credentials to the
        file named by this section's 'outputCredsFilePath' option"""
        section = 'test02Connect2AuthNServiceReturningAUserX509Cert'

        userPriKeyPwd = self.cfg.get('DEFAULT', 'userPriKeyPwd')
        outputCredFilePath = self.cfg.get(section, 'outputCredsFilePath')

        # Change to alternative authentication service
        self._initUserX509CertAuthNService()

        print("Connecting to session manager...")
        userX509Cert, self.userPriKey, self.issuingCert, sessID = \
            self.sm.connect(passphrase=userPriKeyPwd)
        self.userX509Cert = X509CertParse(userX509Cert)

        print("Connected to Session Manager:\n%s" % sessID)
        creds = '\n'.join((self.issuingCert or '',
                           self.userX509Cert.asPEM().strip(),
                           self.userPriKey))

        # Close the file explicitly rather than relying on garbage collection
        credsFile = open(mkPath(outputCredFilePath), "w")
        try:
            credsFile.write(creds)
        finally:
            credsFile.close()

    def test03GetSessionStatus(self):
        """test03GetSessionStatus: check a session is alive"""

        self._connect()
        self.assertTrue(self.sm.getSessionStatus(sessID=self.sessID),
                        "Session is dead")
        print("User connected to Session Manager with sessID=%s" % self.sessID)

        self.assertFalse(self.sm.getSessionStatus(sessID='abc'),
                         "sessID=abc shouldn't exist!")

        print("CORRECT: sessID=abc doesn't exist")

    def test04ConnectNoCreateServerSess(self):
        """test04ConnectNoCreateServerSess: Connect to retrieve credentials
        only - no session is created.  This makes sense only for an AuthN
        Service that returns user credentials"""
        section = 'test04ConnectNoCreateServerSess'

        # Change to alternative authentication service
        self._initUserX509CertAuthNService()

        username = self.cfg.get(section, 'username')

        # This test keeps its own cached pass-phrase, separate from the one
        # shared by the other tests
        passphrase = self._resolvePassphrase(
                        section,
                        "\n%s pass-phrase for user %s: " % (section, username),
                        cacheAttr='test4Passphrase')

        userX509Cert, userPriKey, issuingCert, sessID = \
            self.sm.connect(username=username,
                            passphrase=passphrase,
                            createServerSess=False)

        # Expect null session ID
        self.assertFalse(sessID, "Expecting a null session ID!")

        print("User '%s' retrieved creds. from Session Manager:\n%s" %
                                                    (username, sessID))

    def test05DisconnectWithSessID(self):
        """test05DisconnectWithSessID: disconnect as if acting as a browser
        client
        """

        self._connect()
        self.sm.deleteUserSession(sessID=self.sessID)

        print("User disconnected from Session Manager:\n%s" % self.sessID)

    def test06DisconnectWithUserX509Cert(self):
        """test06DisconnectWithUserX509Cert: Disconnect based on a user X.509
        cert. credential from an earlier call to connect
        """

        self._connect2UserX509CertAuthNService()

        # User cert DN determines ID of session to delete
        self.sm.deleteUserSession(userX509Cert=self.userX509Cert)
        print("User disconnected from Session Manager:\n%s" %
              self.userX509Cert)

    def test07GetAttCertWithSessID(self):
        """test07GetAttCertWithSessID: make an attribute request using
        a session ID as authentication credential.  The Attribute Certificate
        is written to this section's 'acOutputFilePath'"""

        self._connect()

        section = 'test07GetAttCertWithSessID'
        aaURI = self.cfg.get(section, 'aaURI')
        attCert = self.sm.getAttCert(sessID=self.sessID,
                                     attributeAuthorityURI=aaURI)
        print("Attribute Certificate:\n%s" % attCert)
        attCert.filePath = xpdVars(self.cfg.get(section, 'acOutputFilePath'))
        attCert.write()

    def test08GetAttCertRefusedWithSessID(self):
        """test08GetAttCertRefusedWithSessID: make an attribute request using
        a sessID as authentication credential requesting an AC from an
        Attribute Authority where the user is NOT registered"""

        self._connect()

        aaURI = self.cfg.get('test08GetAttCertRefusedWithSessID', 'aaURI')

        try:
            self.sm.getAttCert(sessID=self.sessID,
                               attributeAuthorityURI=aaURI,
                               mapFromTrustedHosts=False)
        except CredentialWalletAttributeRequestDenied:
            # sys.exc_info() is used to fetch the exception so that the
            # handler does not depend on version specific except syntax
            print("SUCCESS - obtained expected result: %s" %
                  sys.exc_info()[1])
            return

        self.fail("Request allowed from AA where user is NOT registered!")

    def test09GetMappedAttCertWithSessID(self):
        """test09GetMappedAttCertWithSessID: make an attribute request using
        a session ID as authentication credential, with mapping from trusted
        hosts enabled"""

        # NOTE(review): test07 below connects again and overwrites
        # self.sessID, so this first connection looks redundant - confirm
        # before removing it.
        self._connect()

        # Attribute Certificate cached in test07GetAttCertWithSessID can be
        # used to get a mapped AC for this test ...
        self.test07GetAttCertWithSessID()

        aaURI = self.cfg.get('test09GetMappedAttCertWithSessID', 'aaURI')

        attCert = self.sm.getAttCert(sessID=self.sessID,
                                     attributeAuthorityURI=aaURI,
                                     mapFromTrustedHosts=True)

        print("Attribute Certificate:\n%s" % attCert)

    def test10GetAttCertWithExtAttCertListWithSessID(self):
        """test10GetAttCertWithExtAttCertListWithSessID: make an attribute
        request using a session ID as authentication credential, passing an
        Attribute Certificate read from file as extAttCertList"""

        self._connect()
        section = 'test10GetAttCertWithExtAttCertListWithSessID'
        aaURI = self.cfg.get(section, 'aaURI')

        # Presumably the output from test07GetAttCertWithSessID - this
        # depends on 'extACFilePath' matching that test's 'acOutputFilePath'
        extACFilePath = xpdVars(self.cfg.get(section, 'extACFilePath'))

        # Close the file explicitly rather than relying on garbage collection
        extACFile = open(extACFilePath)
        try:
            extAttCert = extACFile.read()
        finally:
            extACFile.close()

        attCert = self.sm.getAttCert(sessID=self.sessID,
                                     attributeAuthorityURI=aaURI,
                                     extAttCertList=[extAttCert])
        print("Attribute Certificate:\n%s" % attCert)

    def test11GetAttCertWithUserX509Cert(self):
        """test11GetAttCertWithUserX509Cert: make an attribute request using
        a user cert as authentication credential"""
        self._connect2UserX509CertAuthNService()

        # Request an attribute certificate from an Attribute Authority
        # using the userX509Cert returned from connect()

        aaURI = self.cfg.get('test11GetAttCertWithUserX509Cert', 'aaURI')
        attCert = self.sm.getAttCert(userX509Cert=self.userX509Cert,
                                     attributeAuthorityURI=aaURI)
        print("Attribute Certificate:\n%s" % attCert)

    def test12GetAttCertFromLocalAAInstance(self):
        """test12GetAttCertFromLocalAAInstance: make an attribute request to a
        locally instantiated Attribute Authority.  The Attribute Certificate
        is written to this section's 'acOutputFilePath'"""

        self._connect()

        section = 'test12GetAttCertFromLocalAAInstance'
        aaPropFilePath = self.cfg.get(section, 'aaPropFilePath')
        attributeAuthority = AttributeAuthority(propFilePath=aaPropFilePath)

        attCert = self.sm.getAttCert(sessID=self.sessID,
                                     attributeAuthority=attributeAuthority)
        print("Attribute Certificate:\n%s" % attCert)
        attCert.filePath = xpdVars(self.cfg.get(section, 'acOutputFilePath'))
        attCert.write()
372
373
class SessionManagerTestSuite(unittest.TestSuite):
    """Test suite holding the SessionManagerTestCase tests, listed
    explicitly by method name from test01 through to test12."""

    def __init__(self):
        print("SessionManagerTestSuite ...")
        testMethodNames = (
            "test01Connect2AuthNServiceWithNoUserX509CertReturned",
            "test02Connect2AuthNServiceReturningAUserX509Cert",
            "test03GetSessionStatus",
            "test04ConnectNoCreateServerSess",
            "test05DisconnectWithSessID",
            "test06DisconnectWithUserX509Cert",
            "test07GetAttCertWithSessID",
            "test08GetAttCertRefusedWithSessID",
            "test09GetMappedAttCertWithSessID",
            "test10GetAttCertWithExtAttCertListWithSessID",
            "test11GetAttCertWithUserX509Cert",
            "test12GetAttCertFromLocalAAInstance",
        )
        # One test case instance per listed method name
        testCases = [SessionManagerTestCase(methodName)
                     for methodName in testMethodNames]
        unittest.TestSuite.__init__(self, testCases)
[3024]394           
395                                                   
if __name__ == "__main__":
    # Run the tests in this module via unittest's default entry point.
    # To run only the explicitly listed tests instead, build a
    # SessionManagerTestSuite and pass it to
    # unittest.TextTestRunner(verbosity=2).run().
    unittest.main()
Note: See TracBrowser for help on using the repository browser.