source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py @ 3203

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py@4081
Revision 3203, 11.1 KB checked in by pjkersha, 12 years ago (diff)

security/python/ndg.security.test/ndg/security/test/sessionMgr/test.py,
security/python/ndg.security.test/ndg/security/test/sessionMgr/SessionMgrClientTest.py: fix settings of user.creds file for test1Connect

  • Property svn:executable set to *
Line 
#!/usr/bin/env python
"""Test harness for NDG Session Manager - makes requests for
authentication and authorisation.  Attribute Authority and Simple CA
services must be running for the reqAuthorisation and addUser tests.

NOTE(review): no tests named reqAuthorisation or addUser are defined in
this module.  The tests that pass an Attribute Authority URI to the
Session Manager are the test6* and test7 getAttCert tests - confirm which
external services they actually require.

NERC Data Grid Project
"""
8__author__ = "P J Kershaw"
9__date__ = "20/11/07"
10__copyright__ = "(C) 2007 STFC & NERC"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "P.J.Kershaw@rl.ac.uk"
15__revision__ = '$Id$'
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20import traceback
21
22from ndg.security.common.X509 import X509CertParse
23from ndg.security.server.SessionMgr import *
24from ndg.security.server.MyProxy import MyProxyClient
25
26from os.path import expandvars as xpdVars
27from os.path import join as jnPath
def mkPath(file):
    """Return file joined onto the unit test directory, which is taken
    from the NDGSEC_SM_UNITTEST_DIR environment variable.

    A KeyError is raised if the environment variable is not set."""
    unitTestDir = os.environ['NDGSEC_SM_UNITTEST_DIR']
    return jnPath(unitTestDir, file)
29
30
class SessionMgrTestCase(unittest.TestCase):
    """Unit test case for ndg.security.server.SessionMgr.SessionMgr class.

    This class manages server side sessions

    Test settings (user names, optional pass-phrases, Attribute Authority
    URIs and file paths) are read from sessionMgrTest.cfg, located in the
    directory named by the NDGSEC_SM_UNITTEST_DIR environment variable."""

    # Pass-phrases are cached on the class so that a user is prompted at most
    # once per run even though test1Connect is re-used by most other tests
    test1Passphrase = None
    test3Passphrase = None

    def setUp(self):
        """Read the test configuration file and create the SessionMgr
        instance (self.sm) that every test exercises."""

        # Optional hook: drop into the debugger before each test
        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        # Default the unit test directory to the one containing this module
        if 'NDGSEC_SM_UNITTEST_DIR' not in os.environ:
            os.environ['NDGSEC_SM_UNITTEST_DIR'] = \
                os.path.abspath(os.path.dirname(__file__))

        self.cfg = SafeConfigParser()
        configFilePath = jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'],
                                "sessionMgrTest.cfg")
        self.cfg.read(configFilePath)

        # Instantiate the Session Manager from its properties file; any
        # environment variables in the configured path are expanded first
        propFilePath = xpdVars(self.cfg.get('setUp', 'propFilePath'))
        self.sm = SessionMgr(propFilePath=propFilePath)

    def _getPassphrase(self, cacheAttr, section, username):
        """Return the pass-phrase to use for the given test.

        The value cached on the class under cacheAttr is used if set.
        Otherwise the 'passphrase' option of the config file section is
        tried and, failing that, the user is prompted.  The result is
        stored back on the class for subsequent calls.

        @param cacheAttr: name of the class attribute caching the value
        @param section: config file section, also used in the prompt text
        @param username: user name shown in the prompt
        @return: the pass-phrase string"""
        passphrase = getattr(SessionMgrTestCase, cacheAttr)

        if passphrase is None and self.cfg.has_option(section, 'passphrase'):
            passphrase = self.cfg.get(section, 'passphrase')

        # Covers both a missing option and an empty one
        if not passphrase:
            passphrase = getpass.getpass(
                prompt="\n%s pass-phrase for user %s: " % (section, username))

        setattr(SessionMgrTestCase, cacheAttr, passphrase)
        return passphrase

    def test1Connect(self):
        """test1Connect: make a new session"""

        print("\n\t" + self.test1Connect.__doc__)

        username = self.cfg.get('test1Connect', 'username')
        passphrase = self._getPassphrase('test1Passphrase',
                                         'test1Connect',
                                         username)

        userCert, self.userPriKey, self.issuingCert, self.sessID = \
            self.sm.connect(username=username, passphrase=passphrase)
        self.userCert = X509CertParse(userCert)

        print("User '%s' connected to Session Manager:\n%s" %
              (username, self.sessID))

        # str.join accepts only strings, so write the certificate text as
        # returned by connect() rather than the parsed self.userCert object
        creds = '\n'.join((self.issuingCert or '', userCert, self.userPriKey))

        # Close the file explicitly so the credentials are flushed to disk
        # and the handle is not leaked
        credsFile = open(mkPath("user.creds"), "w")
        try:
            credsFile.write(creds)
        finally:
            credsFile.close()

    def test2GetSessionStatus(self):
        """test2GetSessionStatus: check a session is alive"""
        print("\n\t" + self.test2GetSessionStatus.__doc__)

        self.test1Connect()

        # TestCase assertion methods are used rather than bare assert
        # statements so that the checks still run under python -O
        self.assertTrue(self.sm.getSessionStatus(sessID=self.sessID),
                        "Session is dead")
        print("User connected to Session Manager with sessID=%s" %
              self.sessID)

        self.assertFalse(self.sm.getSessionStatus(sessID='abc'),
                         "sessID=abc shouldn't exist!")

        print("CORRECT: sessID=abc doesn't exist")

    def test3ConnectNoCreateServerSess(self):
        """test3ConnectNoCreateServerSess: Connect as a non browser client -
        sessID should be None"""

        print("\n\t" + self.test3ConnectNoCreateServerSess.__doc__)

        username = self.cfg.get('test3ConnectNoCreateServerSess', 'username')
        passphrase = self._getPassphrase('test3Passphrase',
                                         'test3ConnectNoCreateServerSess',
                                         username)

        self.userCert, self.userPriKey, self.issuingCert, sessID = \
            self.sm.connect(username=username,
                            passphrase=passphrase,
                            createServerSess=False)

        # Expect null session ID
        self.assertFalse(sessID, "Expecting a null session ID!")

        print("User '%s' retrieved creds. from Session Manager:\n%s" %
              (username, self.userCert))

    def test4DisconnectWithSessID(self):
        """test4DisconnectWithSessID: disconnect as if acting as a browser client
        """

        print("\n\t" + self.test4DisconnectWithSessID.__doc__)
        self.test1Connect()
        self.sm.deleteUserSession(sessID=self.sessID)

        print("User disconnected from Session Manager:\n%s" % self.sessID)

    def test5DisconnectWithUserCert(self):
        """test5DisconnectWithUserCert: Disconnect as a command line client
        """

        print("\n\t" + self.test5DisconnectWithUserCert.__doc__)
        self.test1Connect()

        # The session to delete is identified by the user certificate
        self.sm.deleteUserSession(userCert=self.userCert)
        print("User disconnected from Session Manager:\n%s" % self.userCert)

    def test6GetAttCertWithSessID(self):
        """test6GetAttCertWithSessID: make an attribute request using
        a session ID as authentication credential"""

        print("\n\t" + self.test6GetAttCertWithSessID.__doc__)
        self.test1Connect()

        aaURI = self.cfg.get('test6GetAttCertWithSessID', 'aauri')
        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                                        sessID=self.sessID,
                                                        aaURI=aaURI)
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

        # Save the certificate to file; test6c reads an Attribute
        # Certificate back in from its own configured path
        attCert.filePath = xpdVars(self.cfg.get('test6GetAttCertWithSessID',
                                                'acoutfilepath'))
        attCert.write()

        # Returned for the benefit of test6bGetMappedAttCertWithSessID
        return self.sm

    def test6aGetAttCertRefusedWithSessID(self):
        """test6aGetAttCertRefusedWithSessID: make an attribute request using
        a sessID as authentication credential requesting an AC from an
        Attribute Authority where the user is NOT registered"""

        print("\n\t" + self.test6aGetAttCertRefusedWithSessID.__doc__)
        self.test1Connect()

        aaURI = self.cfg.get('test6aGetAttCertRefusedWithSessID', 'aauri')

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                                    sessID=self.sessID,
                                                    aaURI=aaURI,
                                                    mapFromTrustedHosts=False)

        # An error message is the expected outcome for this test
        if errMsg:
            print("SUCCESS - obtained expected result: %s" % errMsg)
            return

        self.fail("Request allowed from AA where user is NOT registered!")

    def test6bGetMappedAttCertWithSessID(self):
        """test6bGetMappedAttCertWithSessID: make an attribute request using
        a session ID as authentication credential"""

        print("\n\t" + self.test6bGetMappedAttCertWithSessID.__doc__)

        # Attribute Certificate cached in test 6 can be used to get a mapped
        # AC for this test ...
        # test 6 makes its own call to test1Connect, so no separate connect
        # is made here: an extra one would only create a session that is
        # immediately superseded
        self.sm = self.test6GetAttCertWithSessID()

        aaURI = self.cfg.get('test6bGetMappedAttCertWithSessID', 'aauri')

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                                    sessID=self.sessID,
                                                    aaURI=aaURI,
                                                    mapFromTrustedHosts=True)
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

    def test6cGetAttCertWithExtAttCertListWithSessID(self):
        """test6cGetAttCertWithExtAttCertListWithSessID: make an attribute
        request using a session ID as authentication credential and
        passing an external Attribute Certificate list"""

        print("\n\t" +
              self.test6cGetAttCertWithExtAttCertListWithSessID.__doc__)
        self.test1Connect()

        section = 'test6cGetAttCertWithExtAttCertListWithSessID'
        aaURI = self.cfg.get(section, 'aauri')

        # Use output from test6GetAttCertWithSessID!
        extACFilePath = xpdVars(self.cfg.get(section, 'extacfilepath'))

        # Close the file explicitly rather than leaking the handle
        extACFile = open(extACFilePath)
        try:
            extAttCert = extACFile.read()
        finally:
            extACFile.close()

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                                   sessID=self.sessID,
                                                   aaURI=aaURI,
                                                   extAttCertList=[extAttCert])
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

    def test7GetAttCertWithUserCert(self):
        """test7GetAttCertWithUserCert: make an attribute request using
        a user cert as authentication credential"""
        print("\n\t" + self.test7GetAttCertWithUserCert.__doc__)
        self.test1Connect()

        # Request an attribute certificate from an Attribute Authority
        # using the userCert returned from connect()
        aaURI = self.cfg.get('test7GetAttCertWithUserCert', 'aauri')
        attCert, errMsg, extAttCertList = self.sm.getAttCert(
                                                    userCert=self.userCert,
                                                    aaURI=aaURI)
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)
262
263#_____________________________________________________________________________       
class SessionMgrTestSuite(unittest.TestSuite):
    """Suite running a fixed, ordered selection of SessionMgrTestCase tests.
    """

    def __init__(self):
        """Build one SessionMgrTestCase per listed test method name and
        initialise the suite with them."""
        print("SessionMgrTestSuite ...")

        # NOTE(review): test6aGetAttCertRefusedWithSessID is not listed
        # here - confirm whether that omission is intentional
        testNames = (
            "test1Connect",
            "test2GetSessionStatus",
            "test3ConnectNoCreateServerSess",
            "test4DisconnectWithSessID",
            "test5DisconnectWithUserCert",
            "test6GetAttCertWithSessID",
            "test6bGetMappedAttCertWithSessID",
            "test6cGetAttCertWithExtAttCertListWithSessID",
            "test7GetAttCertWithUserCert",
        )
        testCases = [SessionMgrTestCase(testName) for testName in testNames]
        unittest.TestSuite.__init__(self, testCases)
281           
282                                                   
if __name__ == "__main__":
    # Run the tests in this module via the standard unittest entry point.
    # To run only the selection listed in SessionMgrTestSuite, use instead:
    #    suite = SessionMgrTestSuite()
    #    unittest.TextTestRunner(verbosity=2).run(suite)
    unittest.main()
Note: See TracBrowser for help on using the repository browser.