source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py @ 4158

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py@4158
Revision 4158, 11.9 KB checked in by cbyrom, 13 years ago (diff)

Create new utility module, ClassFactory? - to allow generic instantiation
of classes dynamically.

Implement use of this in the AttAuth? and SessionMgr? services + adjust
the config files for these accordingly + abstract use of MyProxy? in
SessionMgr? to generic authNService - and create packages with real
and test authN services. Adjust the SessionMgr? tests to use the
test authN service.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager - makes requests for
3authentication and authorisation.  An Attribute Authority and Simple CA
4services must be running for the reqAuthorisation and addUser tests
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "20/11/07"
10__copyright__ = "(C) 2007 STFC & NERC"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "P.J.Kershaw@rl.ac.uk"
15__revision__ = '$Id$'
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20import traceback
21
22from ndg.security.common.X509 import X509CertParse
23from ndg.security.server.SessionMgr import *
24from ndg.security.server.MyProxy import MyProxyClient
25
26from os.path import expandvars as xpdVars
27from os.path import join as jnPath
28mkPath = lambda file: jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'], file)
29
30import logging
31logging.basicConfig(level=logging.DEBUG)
32
33
34class SessionMgrTestCase(unittest.TestCase):
35    """Unit test case for ndg.security.server.SessionMgr.SessionMgr class.
36   
37    This class manages server side sessions"""
38   
39    test1Passphrase = None
40    test3Passphrase = None
41   
42    def setUp(self):
43       
44        if 'NDGSEC_INT_DEBUG' in os.environ:
45            import pdb
46            pdb.set_trace()
47       
48        if 'NDGSEC_SM_UNITTEST_DIR' not in os.environ:
49            os.environ['NDGSEC_SM_UNITTEST_DIR'] = \
50                os.path.abspath(os.path.dirname(__file__))
51       
52        self.cfg = SafeConfigParser()
53        configFilePath = jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'],
54                                "sessionMgrTest.cfg")
55        self.cfg.read(configFilePath)
56                   
57        # Initialise the Session Manager client connection
58        # Omit traceFile keyword to leave out SOAP debug info
59        propFilePath = xpdVars(self.cfg.get('setUp', 'propFilePath'))
60        self.sm = SessionMgr(propFilePath=propFilePath)
61
62    def sessionMgrConnect(self):
63        print "Connecting to session manager..."
64        username = self.cfg.get('test1Connect', 'username')
65        if SessionMgrTestCase.test1Passphrase is None and \
66           self.cfg.has_option('test1Connect', 'passphrase'):
67            SessionMgrTestCase.test1Passphrase = \
68                                    self.cfg.get('test1Connect', 'passphrase')
69       
70        if not SessionMgrTestCase.test1Passphrase:
71            SessionMgrTestCase.test1Passphrase = getpass.getpass(\
72                prompt="\ntest1Connect pass-phrase for user %s: " % username)
73
74        print "Connecting to session manager as user: %s..." %username
75        userCert, self.userPriKey, self.issuingCert, self.sessID = \
76            self.sm.connect(username=username, 
77                            passphrase=SessionMgrTestCase.test1Passphrase)
78        self.userCert = X509CertParse(userCert)
79       
80        print "User '%s' connected to Session Manager:\n%s" % \
81                                                        (username, self.sessID)
82        creds='\n'.join((self.issuingCert or '',
83                         self.userCert.asPEM().strip(),
84                         self.userPriKey))
85        open(mkPath("user.creds"), "w").write(creds)
86        print "Finished setting up connection"
87       
88                                 
89    def test1Connect(self):
90        """test1Connect: make a new session"""
91       
92        username = self.cfg.get('test1Connect', 'username')
93        if SessionMgrTestCase.test1Passphrase is None and \
94           self.cfg.has_option('test1Connect', 'passphrase'):
95            SessionMgrTestCase.test1Passphrase = \
96                                    self.cfg.get('test1Connect', 'passphrase')
97       
98        if not SessionMgrTestCase.test1Passphrase:
99            SessionMgrTestCase.test1Passphrase = getpass.getpass(\
100                prompt="\ntest1Connect pass-phrase for user %s: " % username)
101
102        print "Connecting to session manager as user: %s..." %username
103        userCert, self.userPriKey, self.issuingCert, self.sessID = \
104            self.sm.connect(username=username, 
105                            passphrase=SessionMgrTestCase.test1Passphrase)
106        self.userCert = X509CertParse(userCert)
107       
108        print "User '%s' connected to Session Manager:\n%s" % \
109                                                        (username, self.sessID)
110        creds='\n'.join((self.issuingCert or '',
111                         self.userCert.asPEM().strip(),
112                         self.userPriKey))
113        open(mkPath("user.creds"), "w").write(creds)
114   
115           
116    def test2GetSessionStatus(self):
117        """test2GetSessionStatus: check a session is alive"""
118       
119        self.sessionMgrConnect()
120        assert self.sm.getSessionStatus(sessID=self.sessID), "Session is dead"
121        print "User connected to Session Manager with sessID=%s" % self.sessID
122
123        assert not self.sm.getSessionStatus(sessID='abc'), \
124            "sessID=abc shouldn't exist!"
125           
126        print "CORRECT: sessID=abc doesn't exist"
127       
128    def test3ConnectNoCreateServerSess(self):
129        """test3ConnectNoCreateServerSess: Connect as a non browser client -
130        sessID should be None"""
131
132        username = self.cfg.get('test3ConnectNoCreateServerSess', 'username')
133
134        if SessionMgrTestCase.test3Passphrase is None and \
135           self.cfg.has_option('test3ConnectNoCreateServerSess', 
136                               'passphrase'):
137            SessionMgrTestCase.test3Passphrase = \
138                self.cfg.get('test3ConnectNoCreateServerSess', 'passphrase')
139       
140        if not SessionMgrTestCase.test3Passphrase:
141            SessionMgrTestCase.test3Passphrase = getpass.getpass(\
142        prompt="\ntest3ConnectNoCreateServerSess pass-phrase for user %s: " % \
143            username)
144
145        self.userCert, self.userPriKey, self.issuingCert, sessID = \
146            self.sm.connect(username=username, 
147                            passphrase=SessionMgrTestCase.test3Passphrase,
148                            createServerSess=False)
149       
150        # Expect null session ID
151        assert not sessID, "Expecting a null session ID!"
152         
153        print "User '%s' retrieved creds. from Session Manager:\n%s" % \
154                                                    (username, self.userCert)
155           
156
157    def test4DisconnectWithSessID(self):
158        """test4DisconnectWithSessID: disconnect as if acting as a browser client
159        """
160       
161        self.sessionMgrConnect()       
162        self.sm.deleteUserSession(sessID=self.sessID)
163       
164        print "User disconnected from Session Manager:\n%s" % self.sessID
165           
166
167    def test5DisconnectWithUserCert(self):
168        """test5DisconnectWithUserCert: Disconnect as a command line client
169        """
170       
171        self.sessionMgrConnect()
172       
173        # Proxy cert in signature determines ID of session to
174        # delete
175        self.sm.deleteUserSession(userCert=self.userCert)
176        print "User disconnected from Session Manager:\n%s" % self.userCert
177
178
179    def test6GetAttCertWithSessID(self):
180        """test6GetAttCertWithSessID: make an attribute request using
181        a session ID as authentication credential"""
182
183        self.sessionMgrConnect()
184       
185        attCert, errMsg, extAttCertList = self.sm.getAttCert(\
186            sessID=self.sessID, 
187            aaURI=self.cfg.get('test6GetAttCertWithSessID', 'aauri'))
188        if errMsg:
189            self.fail(errMsg)
190           
191        print "Attribute Certificate:\n%s" % attCert
192        attCert.filePath = \
193            xpdVars(self.cfg.get('test6GetAttCertWithSessID', 'acoutfilepath')) 
194        attCert.write()
195       
196        return self.sm
197
198
199    def test6aGetAttCertRefusedWithSessID(self):
200        """test6aGetAttCertRefusedWithSessID: make an attribute request using
201        a sessID as authentication credential requesting an AC from an
202        Attribute Authority where the user is NOT registered"""
203
204        self.sessionMgrConnect()
205       
206        aaURI = self.cfg.get('test6aGetAttCertRefusedWithSessID', 'aauri')
207       
208        attCert, errMsg, extAttCertList = self.sm.getAttCert(sessID=self.sessID, 
209                                         aaURI=aaURI,
210                                         mapFromTrustedHosts=False)
211        if errMsg:
212            print "SUCCESS - obtained expected result: %s" % errMsg
213            return
214       
215        self.fail("Request allowed from AA where user is NOT registered!")
216
217
218    def test6bGetMappedAttCertWithSessID(self):
219        """test6bGetMappedAttCertWithSessID: make an attribute request using
220        a session ID as authentication credential"""
221
222        self.sessionMgrConnect()
223       
224        # Attribute Certificate cached in test 6 can be used to get a mapped
225        # AC for this test ...
226        self.sm = self.test6GetAttCertWithSessID()
227
228        aaURI = self.cfg.get('test6bGetMappedAttCertWithSessID', 'aauri')
229       
230        attCert, errMsg, extAttCertList=self.sm.getAttCert(sessID=self.sessID,
231                                                   aaURI=aaURI,
232                                                   mapFromTrustedHosts=True)
233        if errMsg:
234            self.fail(errMsg)
235           
236        print "Attribute Certificate:\n%s" % attCert 
237
238
239    def test6cGetAttCertWithExtAttCertListWithSessID(self):
240        """test6cGetAttCertWithSessID: make an attribute request using
241        a session ID as authentication credential"""
242       
243        self.sessionMgrConnect()
244       
245        aaURI = \
246            self.cfg.get('test6cGetAttCertWithExtAttCertListWithSessID', 'aauri')
247       
248        # Use output from test6GetAttCertWithSessID!
249        extACFilePath = \
250        xpdVars(self.cfg.get('test6cGetAttCertWithExtAttCertListWithSessID', 
251                             'extacfilepath'))   
252        extAttCert = open(extACFilePath).read()
253       
254        attCert, errMsg, extAttCertList = self.sm.getAttCert(
255                                                   sessID=self.sessID, 
256                                                   aaURI=aaURI,
257                                                   extAttCertList=[extAttCert])
258        if errMsg:
259            self.fail(errMsg)
260         
261        print "Attribute Certificate:\n%s" % attCert 
262
263
264    def test7GetAttCertWithUserCert(self):
265        """test7GetAttCertWithUserCert: make an attribute request using
266        a user cert as authentication credential"""
267        self.sessionMgrConnect()
268
269        # Request an attribute certificate from an Attribute Authority
270        # using the userCert returned from connect()
271       
272        aaURI = self.cfg.get('test7GetAttCertWithUserCert', 'aauri')
273        attCert, errMsg, extAttCertList = self.sm.getAttCert(\
274                                     userCert=self.userCert, aaURI=aaURI)
275        if errMsg:
276            self.fail(errMsg)
277         
278        print "Attribute Certificate:\n%s" % attCert 
279
280
281#_____________________________________________________________________________       
282class SessionMgrTestSuite(unittest.TestSuite):
283   
284    def __init__(self):
285        print "SessionMgrTestSuite ..."
286        smTestCaseMap = map(SessionMgrTestCase,
287                          (
288                            "test1Connect",
289                            "test2GetSessionStatus",
290                            "test3ConnectNoCreateServerSess",
291                            "test4DisconnectWithSessID",
292                            "test5DisconnectWithUserCert",
293                            "test6GetAttCertWithSessID",
294                            "test6bGetMappedAttCertWithSessID",
295                            "test6cGetAttCertWithExtAttCertListWithSessID",
296                            "test7GetAttCertWithUserCert",
297                          ))
298        unittest.TestSuite.__init__(self, smTestCaseMap)
299           
300                                                   
301if __name__ == "__main__":
302#    suite = SessionMgrTestSuite()
303#    unittest.TextTestRunner(verbosity=2).run(suite)
304    unittest.main()       
Note: See TracBrowser for help on using the repository browser.