source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionmanager/test_sessionmanager.py @ 4447

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionmanager/test_sessionmanager.py@4447
Revision 4447, 17.1 KB checked in by pjkersha, 11 years ago (diff)
  • Updated Session Manager unit tests to include a call to a locally instantiated Attribute Authority
  • Fixed bug in CredentialWallet.getAttCert - ensure attributeAuthority keyword input is correctly picked up.
  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager - makes requests for
3authentication and attribute retrieval.  Attribute Authority services must be
4running for *AttCert* test methods.  See README in this directory for details
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "20/11/07"
10__copyright__ = "(C) 2007 STFC & NERC"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "Philip.Kershaw@stfc.ac.uk"
15__revision__ = '$Id$'
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20import traceback
21
22from ndg.security.common.utils.ConfigFileParsers import \
23                                                    CaseSensitiveConfigParser
24from ndg.security.common.X509 import X509CertParse
25from ndg.security.server.sessionmanager import SessionManager
26from ndg.security.server.attributeauthority import AttributeAuthority
27
28from os.path import expandvars as xpdVars
29from os.path import join as jnPath
def mkPath(file):
    """Return the path to `file` inside the unit test directory.

    The directory is taken from the NDGSEC_SM_UNITTEST_DIR environment
    variable, which is looked up on every call so that a value set after
    import (e.g. by SessionManagerTestCase.setUp) is honoured.  A KeyError is
    raised if the variable is not set.
    """
    return jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'], file)


# Emit debug level log output for the whole test run
import logging
logging.basicConfig(level=logging.DEBUG)
34
35
class SessionManagerTestCase(unittest.TestCase):
    """Unit test case for ndg.security.server.sessionmanager.SessionManager
    class.

    This class manages server side sessions.

    Test settings are read from sessionMgrTest.cfg in the directory named by
    the NDGSEC_SM_UNITTEST_DIR environment variable.  If the variable is not
    set, setUp points it at the directory containing this module."""

    # Pass-phrases are cached at class level so that, when they are not set
    # in the config file, the user is only prompted once per test run rather
    # than once per test method
    passphrase = None
    test4Passphrase = None

    def setUp(self):
        """Read the test config file and create the Session Manager under
        test.  Set NDGSEC_INT_DEBUG in the environment to drop into pdb at
        the start of every test"""

        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        if 'NDGSEC_SM_UNITTEST_DIR' not in os.environ:
            os.environ['NDGSEC_SM_UNITTEST_DIR'] = \
                os.path.abspath(os.path.dirname(__file__))

        self.cfg = CaseSensitiveConfigParser()
        configFilePath = jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'],
                                "sessionMgrTest.cfg")
        self.cfg.read(configFilePath)

        # Instantiate the Session Manager from the properties file named in
        # the [setUp] section; environment variables in the path are expanded
        self.propFilePath = xpdVars(self.cfg.get('setUp', 'propFilePath'))
        self.sm = SessionManager(propFilePath=self.propFilePath)

    def _getPassphrase(self, section, prompt, cacheAttr='passphrase'):
        '''Helper method to get a pass-phrase, caching it on the class.

        The value already cached in the SessionManagerTestCase attribute
        named by cacheAttr is used if set.  If the cache is None, the
        'passphrase' option of the given config file section is tried.  If
        the result is still empty the user is prompted using getpass.

        @param section: config file section to check for 'passphrase'
        @param prompt: prompt text to display if the user must be asked
        @param cacheAttr: name of the class attribute holding the cached
        value
        @return: the pass-phrase'''
        passphrase = getattr(SessionManagerTestCase, cacheAttr)
        if passphrase is None and self.cfg.has_option(section, 'passphrase'):
            passphrase = self.cfg.get(section, 'passphrase')

        if not passphrase:
            passphrase = getpass.getpass(prompt=prompt)

        setattr(SessionManagerTestCase, cacheAttr, passphrase)
        return passphrase

    def _initUserCertAuthNService(self):
        '''Helper method to switch the Session Manager to the alternative
        UserCertAuthN authentication service.  The module is loaded from the
        unit test directory and the user cert and private key file paths are
        taken from the DEFAULT section of the config file'''
        userX509CertFilePath = self.cfg.get('DEFAULT', 'userX509CertFilePath')
        userPriKeyFilePath = self.cfg.get('DEFAULT', 'userPriKeyFilePath')

        self.sm['authNService'] = {
            'moduleFilePath': os.environ['NDGSEC_SM_UNITTEST_DIR'],
            'moduleName': 'usercertauthn',
            'className': 'UserCertAuthN',
            'userX509CertFilePath': userX509CertFilePath,
            'userPriKeyFilePath': userPriKeyFilePath
        }

        self.sm.initAuthNService()

    def _connect(self):
        '''Helper method to set up connections.  Connects as the user named
        in the DEFAULT config section and stores the session ID returned in
        self.sessID'''
        print("Connecting to session manager...")
        section = 'DEFAULT'

        username = self.cfg.get(section, 'username')
        passphrase = self._getPassphrase(section,
                                "\nPass-phrase for user %s: " % username)

        print("Connecting to session manager as user: %s..." % username)
        # Only the session ID is needed by callers; the other return values
        # are unpacked so that the shape of the result is still checked
        userX509Cert, userPriKey, issuingCert, self.sessID = \
            self.sm.connect(username=username, passphrase=passphrase)

        print("User '%s' connected to Session Manager:\n%s" % (username,
                                                               self.sessID))
        print("Finished setting up connection")

    def _connect2UserCertAuthNService(self):
        '''Same as _connect but Session Manager is using an Authentication
        Service that returns PKI credentials i.e. like MyProxy.  Sets
        self.userX509Cert (parsed), self.userPriKey, self.issuingCert and
        self.sessID from the values returned'''

        section = 'DEFAULT'

        print("Connecting to session manager with AuthN service returning "
              "PKI creds...")

        # Change to alternative authentication service
        self._initUserCertAuthNService()

        username = self.cfg.get(section, 'username')
        passphrase = self._getPassphrase(section,
                                "\nPass-phrase for user %s: " % username)

        print("Connecting to session manager as user: %s..." % username)
        userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \
            self.sm.connect(username=username, passphrase=passphrase)
        self.userX509Cert = X509CertParse(userX509Cert)

        print("User '%s' connected to Session Manager:\n%s" % (username,
                                                               self.sessID))
        print("Finished setting up connection")

    def test01Connect2AuthNServiceWithNoUserCertReturned(self):
        """test01Connect2AuthNServiceWithNoUserCertReturned: connect using
        the AuthN service set in the Session Manager properties file and
        check that no user cert, private key or issuing cert is returned"""

        thisSection = 'test01Connect2AuthNServiceWithNoUserCertReturned'
        username = self.cfg.get(thisSection, 'username')
        passphrase = self._getPassphrase(thisSection,
                    "\ntest1Connect pass-phrase for user %s: " % username)

        print("Connecting to session manager as user: %s..." % username)
        userX509Cert, userPriKey, issuingCert, sessID = self.sm.connect(
                                                    username=username,
                                                    passphrase=passphrase)

        # unittest assertion methods are used rather than the assert
        # statement so that the checks still run under python -O
        self.assertTrue(userX509Cert is None, "Expecting no user cert")
        self.assertTrue(userPriKey is None, "Expecting no user private key")
        self.assertTrue(issuingCert is None, "Expecting no issuing cert")

        print("User '%s' connected to Session Manager:\n%s"%(username, sessID))

    def test02Connect2AuthNServiceReturningAUserCert(self):
        """test02Connect2AuthNServiceReturningAUserCert: connect using the
        UserCertAuthN service and write the credentials returned to the
        output file set in the config"""

        section = 'test02Connect2AuthNServiceReturningAUserCert'

        userPriKeyPwd = self.cfg.get('DEFAULT', 'userPriKeyPwd')
        outputCredFilePath = self.cfg.get(section, 'outputCredsFilePath')

        # Change to alternative authentication service
        self._initUserCertAuthNService()

        print("Connecting to session manager...")
        userX509Cert, self.userPriKey, self.issuingCert, sessID = \
            self.sm.connect(passphrase=userPriKeyPwd)
        self.userX509Cert = X509CertParse(userX509Cert)

        print("Connected to Session Manager:\n%s" % sessID)
        creds = '\n'.join((self.issuingCert or '',
                           self.userX509Cert.asPEM().strip(),
                           self.userPriKey))

        # Close the file explicitly so that the credentials are flushed to
        # disk and the handle is not left for the garbage collector
        credsFile = open(mkPath(outputCredFilePath), "w")
        try:
            credsFile.write(creds)
        finally:
            credsFile.close()

    def test03GetSessionStatus(self):
        """test03GetSessionStatus: check a session is alive"""

        self._connect()
        self.assertTrue(self.sm.getSessionStatus(sessID=self.sessID),
                        "Session is dead")
        print("User connected to Session Manager with sessID=%s" % self.sessID)

        self.assertFalse(self.sm.getSessionStatus(sessID='abc'),
                         "sessID=abc shouldn't exist!")

        print("CORRECT: sessID=abc doesn't exist")

    def test04ConnectNoCreateServerSess(self):
        """test04ConnectNoCreateServerSess: Connect to retrieve credentials
        only - no session is created.  This makes sense only for an AuthN
        Service that returns user credentials"""
        section = 'test04ConnectNoCreateServerSess'

        # Change to alternative authentication service
        self._initUserCertAuthNService()

        username = self.cfg.get(section, 'username')

        # This test keeps its own cached pass-phrase, separate from the one
        # shared by the other tests
        passphrase = self._getPassphrase(section,
                                         "\n%s pass-phrase for user %s: " %
                                         (section, username),
                                         cacheAttr='test4Passphrase')

        userX509Cert, userPriKey, issuingCert, sessID = \
            self.sm.connect(username=username,
                            passphrase=passphrase,
                            createServerSess=False)

        # Expect null session ID
        self.assertFalse(sessID, "Expecting a null session ID!")

        print("User '%s' retrieved creds. from Session Manager:\n%s" %
                                                    (username, sessID))

    def test05DisconnectWithSessID(self):
        """test05DisconnectWithSessID: disconnect as if acting as a browser
        client
        """

        self._connect()
        self.sm.deleteUserSession(sessID=self.sessID)

        print("User disconnected from Session Manager:\n%s" % self.sessID)

    def test06DisconnectWithUserCert(self):
        """test06DisconnectWithUserCert: Disconnect based on a user X.509
        cert. credential from an earlier call to connect
        """

        self._connect2UserCertAuthNService()

        # User cert DN determines ID of session to delete
        self.sm.deleteUserSession(userX509Cert=self.userX509Cert)
        print("User disconnected from Session Manager:\n%s" %
              self.userX509Cert)

    def test07GetAttCertWithSessID(self):
        """test07GetAttCertWithSessID: make an attribute request using
        a session ID as authentication credential"""

        self._connect()

        section = 'test07GetAttCertWithSessID'
        aaURI = self.cfg.get(section, 'aaURI')
        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                                sessID=self.sessID,
                                                attributeAuthorityURI=aaURI)
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

        # Save the certificate to the output file set in the config
        attCert.filePath = xpdVars(self.cfg.get(section, 'acOutputFilePath'))
        attCert.write()

    def test08GetAttCertRefusedWithSessID(self):
        """test08GetAttCertRefusedWithSessID: make an attribute request using
        a sessID as authentication credential requesting an AC from an
        Attribute Authority where the user is NOT registered"""

        self._connect()

        aaURI = self.cfg.get('test08GetAttCertRefusedWithSessID', 'aaURI')

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                                 sessID=self.sessID,
                                                 attributeAuthorityURI=aaURI,
                                                 mapFromTrustedHosts=False)

        # An error message is the expected outcome for this test
        if not errMsg:
            self.fail("Request allowed from AA where user is NOT registered!")

        print("SUCCESS - obtained expected result: %s" % errMsg)

    def test09GetMappedAttCertWithSessID(self):
        """test09GetMappedAttCertWithSessID: make an attribute request using
        a session ID as authentication credential"""

        # Attribute Certificate cached in test 7 can be used to get a mapped
        # AC for this test ...  test07 makes its own call to _connect() and
        # sets self.sessID so no separate connect call is made here
        self.test07GetAttCertWithSessID()

        aaURI = self.cfg.get('test09GetMappedAttCertWithSessID', 'aaURI')

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                                   sessID=self.sessID,
                                                   attributeAuthorityURI=aaURI,
                                                   mapFromTrustedHosts=True)
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

    def test10GetAttCertWithExtAttCertListWithSessID(self):
        """test10GetAttCertWithExtAttCertListWithSessID: make an attribute
        request using a session ID as authentication credential, passing an
        Attribute Certificate read from file in the extAttCertList keyword"""

        self._connect()
        section = 'test10GetAttCertWithExtAttCertListWithSessID'
        aaURI = self.cfg.get(section, 'aaURI')

        # Presumably the file written by test07GetAttCertWithSessID - this
        # depends on the paths set in the config file
        extACFilePath = xpdVars(self.cfg.get(section, 'extACFilePath'))
        extACFile = open(extACFilePath)
        try:
            extAttCert = extACFile.read()
        finally:
            extACFile.close()

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                                   sessID=self.sessID,
                                                   attributeAuthorityURI=aaURI,
                                                   extAttCertList=[extAttCert])
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

    def test11GetAttCertWithUserCert(self):
        """test11GetAttCertWithUserCert: make an attribute request using
        a user cert as authentication credential"""
        self._connect2UserCertAuthNService()

        # Request an attribute certificate from an Attribute Authority
        # using the userX509Cert returned from connect()

        aaURI = self.cfg.get('test11GetAttCertWithUserCert', 'aaURI')
        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                     userX509Cert=self.userX509Cert,
                                     attributeAuthorityURI=aaURI)
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

    def test12GetAttCertFromLocalAAInstance(self):
        """test12GetAttCertFromLocalAAInstance: make an attribute request to a
        locally instantiated Attribute Authority"""

        self._connect()

        section = 'test12GetAttCertFromLocalAAInstance'
        aaPropFilePath = self.cfg.get(section, 'aaPropFilePath')
        attributeAuthority = AttributeAuthority(propFilePath=aaPropFilePath,
                                            propPrefix='attributeAuthority')

        # Pass the Attribute Authority object itself instead of a URI
        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                        sessID=self.sessID,
                                        attributeAuthority=attributeAuthority)
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)
        attCert.filePath = xpdVars(self.cfg.get(section, 'acOutputFilePath'))
        attCert.write()
386
387
class SessionManagerTestSuite(unittest.TestSuite):
    """Test suite made up of the SessionManagerTestCase tests, added in the
    order listed"""

    def __init__(self):
        print("SessionManagerTestSuite ...")

        testNames = (
            "test01Connect2AuthNServiceWithNoUserCertReturned",
            "test02Connect2AuthNServiceReturningAUserCert",
            "test03GetSessionStatus",
            "test04ConnectNoCreateServerSess",
            "test05DisconnectWithSessID",
            "test06DisconnectWithUserCert",
            "test07GetAttCertWithSessID",
            "test08GetAttCertRefusedWithSessID",
            "test09GetMappedAttCertWithSessID",
            "test10GetAttCertWithExtAttCertListWithSessID",
            "test11GetAttCertWithUserCert",
            "test12GetAttCertFromLocalAAInstance",
        )

        # One test case instance is created per test method name
        testCases = [SessionManagerTestCase(testName)
                     for testName in testNames]
        unittest.TestSuite.__init__(self, testCases)
408           
409                                                   
# When run as a script, unittest.main() loads and runs the test* methods of
# the test cases in this module.  The commented out lines are the
# alternative: run the explicit SessionManagerTestSuite with a verbose text
# runner.
if __name__ == "__main__":
#    suite = SessionManagerTestSuite()
#    unittest.TextTestRunner(verbosity=2).run(suite)
    unittest.main()
Note: See TracBrowser for help on using the repository browser.