source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test_sessionMgr.py @ 4320

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test_sessionMgr.py@4320
Revision 4320, 13.5 KB checked in by pjkersha, 11 years ago (diff)
  • Fixed ConfigFileParsers? and re-tested CredWallet? unit tests.
  • Added standard exception types for Session Manager AbstractAuthNService interface.
  • Integrated AbstractAuthNService and refactored CredWallet? back into SessionMgr?
  • SessionMgr? unit tests part re-factored. Attribute Cert retrieval tests still to update.
  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager - makes requests for
3authentication and authorisation.  Attribute Authority services must be running
4for *AttCert* test methods
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "20/11/07"
10__copyright__ = "(C) 2007 STFC & NERC"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "P.J.Kershaw@rl.ac.uk"
15__revision__ = '$Id$'
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20import traceback
21
22from ndg.security.common.utils.ConfigFileParsers import \
23                                                    CaseSensitiveConfigParser
24from ndg.security.common.X509 import X509CertParse
25from ndg.security.server.SessionMgr import *
26
27from os.path import expandvars as xpdVars
28from os.path import join as jnPath
29mkPath = lambda file: jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'], file)
30
31import logging
32logging.basicConfig(level=logging.DEBUG)
33
34
35class SessionMgrTestCase(unittest.TestCase):
36    """Unit test case for ndg.security.server.SessionMgr.SessionMgr class.
37   
38    This class manages server side sessions"""
39   
40    test1Passphrase = None
41    test3Passphrase = None
42   
43    def setUp(self):
44       
45        if 'NDGSEC_INT_DEBUG' in os.environ:
46            import pdb
47            pdb.set_trace()
48       
49        if 'NDGSEC_SM_UNITTEST_DIR' not in os.environ:
50            os.environ['NDGSEC_SM_UNITTEST_DIR'] = \
51                os.path.abspath(os.path.dirname(__file__))
52       
53        self.cfg = CaseSensitiveConfigParser()
54        configFilePath = jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'],
55                                "sessionMgrTest.cfg")
56        self.cfg.read(configFilePath)
57                   
58        # Initialise the Session Manager client connection
59        # Omit traceFile keyword to leave out SOAP debug info
60        self.propFilePath = xpdVars(self.cfg.get('setUp', 'propFilePath'))
61        self.sm = SessionMgr(propFilePath=self.propFilePath)
62
63    def test0Init(self):
64        pass
65       
66    def _sessionMgrConnect(self):
67        '''Helper method to set up connections'''
68        print "Connecting to session manager..."
69        username = self.cfg.get('test1Connect', 'username')
70        if SessionMgrTestCase.test1Passphrase is None and \
71           self.cfg.has_option('test1Connect', 'passphrase'):
72            SessionMgrTestCase.test1Passphrase = self.cfg.get('test1Connect', 
73                                                              'passphrase')
74       
75        if not SessionMgrTestCase.test1Passphrase:
76            SessionMgrTestCase.test1Passphrase = getpass.getpass(\
77                prompt="\ntest1Connect pass-phrase for user %s: " % username)
78
79        print "Connecting to session manager as user: %s..." % username
80        userCert, self.userPriKey, self.issuingCert, self.sessID = \
81            self.sm.connect(username=username, 
82                            passphrase=SessionMgrTestCase.test1Passphrase)
83        self.userCert = X509CertParse(userCert)
84       
85        print "User '%s' connected to Session Manager:\n%s" % (username, 
86                                                               self.sessID)
87        creds='\n'.join((self.issuingCert or '',
88                         self.userCert.asPEM().strip(),
89                         self.userPriKey))
90        open(mkPath("user.creds"), "w").write(creds)
91        print "Finished setting up connection"
92       
93                                 
94    def test1Connect2AuthNServiceWithNoUserCertReturned(self):
95        """test1Connect: make a new session connecting to AuthN service where
96        no user cert. is returned"""
97       
98        username = self.cfg.get('test1Connect', 'username')
99        if SessionMgrTestCase.test1Passphrase is None and \
100           self.cfg.has_option('test1Connect', 'passphrase'):
101            SessionMgrTestCase.test1Passphrase = self.cfg.get('test1Connect', 
102                                                              'passphrase')
103       
104        if not SessionMgrTestCase.test1Passphrase:
105            SessionMgrTestCase.test1Passphrase = getpass.getpass(
106                prompt="\ntest1Connect pass-phrase for user %s: " % username)
107
108        print "Connecting to session manager as user: %s..." %username
109        userCert, userPriKey, issuingCert, sessID = self.sm.connect(
110                            username=username, 
111                            passphrase=SessionMgrTestCase.test1Passphrase)
112        assert(userCert is None)
113        assert(userPriKey is None)
114        assert(issuingCert is None)
115       
116        print "User '%s' connected to Session Manager:\n%s" % \
117                                                        (username, self.sessID)       
118                                 
119    def test2Connect2AuthNServiceReturningAUserCert(self):
120        """test1Connect: make a new session"""
121       
122        username = self.cfg.get('test1Connect', 'username')
123        if SessionMgrTestCase.test1Passphrase is None and \
124           self.cfg.has_option('test1Connect', 'passphrase'):
125            SessionMgrTestCase.test1Passphrase = self.cfg.get('test1Connect', 
126                                                              'passphrase')
127       
128        if not SessionMgrTestCase.test1Passphrase:
129            SessionMgrTestCase.test1Passphrase = getpass.getpass(
130                prompt="\ntest1Connect pass-phrase for user %s: " % username)
131
132        print "Connecting to session manager as user: %s..." %username
133        userCert, self.userPriKey, self.issuingCert, self.sessID = \
134            self.sm.connect(username=username, 
135                            passphrase=SessionMgrTestCase.test1Passphrase)
136        self.userCert = X509CertParse(userCert)
137       
138        print "User '%s' connected to Session Manager:\n%s" % \
139                                                        (username, self.sessID)
140        creds='\n'.join((self.issuingCert or '',
141                         self.userCert.asPEM().strip(),
142                         self.userPriKey))
143        open(mkPath("user.creds"), "w").write(creds)
144   
145           
146    def test2GetSessionStatus(self):
147        """test2GetSessionStatus: check a session is alive"""
148       
149        self._sessionMgrConnect()
150        assert self.sm.getSessionStatus(sessID=self.sessID), "Session is dead"
151        print "User connected to Session Manager with sessID=%s" % self.sessID
152
153        assert not self.sm.getSessionStatus(sessID='abc'), \
154            "sessID=abc shouldn't exist!"
155           
156        print "CORRECT: sessID=abc doesn't exist"
157       
158    def test3ConnectNoCreateServerSess(self):
159        """test3ConnectNoCreateServerSess: Connect as a non browser client -
160        sessID should be None"""
161        section = 'test3ConnectNoCreateServerSess'
162        username = self.cfg.get(section, 'username')
163
164        if SessionMgrTestCase.test3Passphrase is None and \
165           self.cfg.has_option(section, passphrase):
166            SessionMgrTestCase.test3Passphrase = self.cfg.get(section, 
167                                                              'passphrase')
168       
169        if not SessionMgrTestCase.test3Passphrase:
170            SessionMgrTestCase.test3Passphrase = getpass.getpass(prompt=\
171                                            "\ntest3ConnectNoCreateServerSess "
172                                            "pass-phrase for user %s: " % 
173                                            username)
174
175        self.userCert, self.userPriKey, self.issuingCert, sessID = \
176            self.sm.connect(username=username, 
177                            passphrase=SessionMgrTestCase.test3Passphrase,
178                            createServerSess=False)
179       
180        # Expect null session ID
181        assert not sessID, "Expecting a null session ID!"
182         
183        print("User '%s' retrieved creds. from Session Manager:\n%s" % 
184                                                    (username, self.userCert))
185           
186
187    def test4DisconnectWithSessID(self):
188        """test4DisconnectWithSessID: disconnect as if acting as a browser
189        client
190        """
191       
192        self._sessionMgrConnect()       
193        self.sm.deleteUserSession(sessID=self.sessID)
194       
195        print "User disconnected from Session Manager:\n%s" % self.sessID
196           
197
198    def test5DisconnectWithUserCert(self):
199        """test5DisconnectWithUserCert: Disconnect as a command line client
200        """
201       
202        self._sessionMgrConnect()
203       
204        # Proxy cert in signature determines ID of session to
205        # delete
206        self.sm.deleteUserSession(userCert=self.userCert)
207        print "User disconnected from Session Manager:\n%s" % self.userCert
208
209
210    def test6GetAttCertWithSessID(self):
211        """test6GetAttCertWithSessID: make an attribute request using
212        a session ID as authentication credential"""
213
214        self._sessionMgrConnect()
215       
216        section = 'test6GetAttCertWithSessID'
217        attCert, errMsg, extAttCertList = self.sm.getAttCert(\
218                                        sessID=self.sessID, 
219                                        aaURI=self.cfg.get(section, 'aaURI'))
220        if errMsg:
221            self.fail(errMsg)
222           
223        print "Attribute Certificate:\n%s" % attCert
224        attCert.filePath = xpdVars(self.cfg.get(section, 'acoutfilepath')) 
225        attCert.write()
226       
227        return self.sm
228
229
230    def test6aGetAttCertRefusedWithSessID(self):
231        """test6aGetAttCertRefusedWithSessID: make an attribute request using
232        a sessID as authentication credential requesting an AC from an
233        Attribute Authority where the user is NOT registered"""
234
235        self._sessionMgrConnect()
236       
237        aaURI = self.cfg.get('test6aGetAttCertRefusedWithSessID', 'aaURI')
238       
239        attCert, errMsg, extAttCertList=self.sm.getAttCert(sessID=self.sessID, 
240                                                     aaURI=aaURI,
241                                                     mapFromTrustedHosts=False)
242        if errMsg:
243            print "SUCCESS - obtained expected result: %s" % errMsg
244            return
245       
246        self.fail("Request allowed from AA where user is NOT registered!")
247
248
249    def test6bGetMappedAttCertWithSessID(self):
250        """test6bGetMappedAttCertWithSessID: make an attribute request using
251        a session ID as authentication credential"""
252
253        self._sessionMgrConnect()
254       
255        # Attribute Certificate cached in test 6 can be used to get a mapped
256        # AC for this test ...
257        self.sm = self.test6GetAttCertWithSessID()
258
259        aaURI = self.cfg.get('test6bGetMappedAttCertWithSessID', 'aaURI')
260       
261        attCert, errMsg, extAttCertList=self.sm.getAttCert(sessID=self.sessID,
262                                                   aaURI=aaURI,
263                                                   mapFromTrustedHosts=True)
264        if errMsg:
265            self.fail(errMsg)
266           
267        print "Attribute Certificate:\n%s" % attCert 
268
269
270    def test6cGetAttCertWithExtAttCertListWithSessID(self):
271        """test6cGetAttCertWithSessID: make an attribute request using
272        a session ID as authentication credential"""
273       
274        self._sessionMgrConnect()
275        section = 'test6cGetAttCertWithExtAttCertListWithSessID'
276        aaURI = self.cfg.get(section, 'aaURI')
277       
278        # Use output from test6GetAttCertWithSessID!
279        extACFilePath = xpdVars(self.cfg.get(section, 'extACFilePath'))   
280        extAttCert = open(extACFilePath).read()
281       
282        attCert, errMsg, extAttCertList = self.sm.getAttCert(
283                                                   sessID=self.sessID, 
284                                                   aaURI=aaURI,
285                                                   extAttCertList=[extAttCert])
286        if errMsg:
287            self.fail(errMsg)
288         
289        print "Attribute Certificate:\n%s" % attCert 
290
291
292    def test7GetAttCertWithUserCert(self):
293        """test7GetAttCertWithUserCert: make an attribute request using
294        a user cert as authentication credential"""
295        self._sessionMgrConnect()
296
297        # Request an attribute certificate from an Attribute Authority
298        # using the userCert returned from connect()
299       
300        aaURI = self.cfg.get('test7GetAttCertWithUserCert', 'aaURI')
301        attCert, errMsg, extAttCertList = self.sm.getAttCert(
302                                     userCert=self.userCert, aaURI=aaURI)
303        if errMsg:
304            self.fail(errMsg)
305         
306        print "Attribute Certificate:\n%s" % attCert 
307
308
309#_____________________________________________________________________________       
310class SessionMgrTestSuite(unittest.TestSuite):
311   
312    def __init__(self):
313        print "SessionMgrTestSuite ..."
314        smTestCaseMap = map(SessionMgrTestCase,
315                          (
316                            "test1Connect",
317                            "test2GetSessionStatus",
318                            "test3ConnectNoCreateServerSess",
319                            "test4DisconnectWithSessID",
320                            "test5DisconnectWithUserCert",
321                            "test6GetAttCertWithSessID",
322                            "test6bGetMappedAttCertWithSessID",
323                            "test6cGetAttCertWithExtAttCertListWithSessID",
324                            "test7GetAttCertWithUserCert",
325                          ))
326        unittest.TestSuite.__init__(self, smTestCaseMap)
327           
328                                                   
329if __name__ == "__main__":
330#    suite = SessionMgrTestSuite()
331#    unittest.TextTestRunner(verbosity=2).run(suite)
332    unittest.main()       
Note: See TracBrowser for help on using the repository browser.