source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py @ 3044

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py@3044
Revision 3044, 10.0 KB checked in by pjkersha, 12 years ago (diff)

SessionMgr? SessionMgrClient? unit tests complete with tests for getSessionStatus included

ndg.security.server/ndg/security/server/conf/sessionMgr.tac:

  • code for getSessionStatus incorporated

ndg.security.server/ndg/security/server/SessionMgr/SessionMgr_services_server.py,
ndg.security.common/ndg/security/common/SessionMgr/SessionMgr_services.py,
ndg.security.common/ndg/security/common/SessionMgr/SessionMgr_services_types.py,
www/html/sessionMgr.wsdl: fixed getSessionStatusResponse - isAlive element needs to be nested within a sequence elem.

ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py,

ndg.security.test/ndg/security/test/sessionMgrClient/sessionMgrProperties.xml: default to https for tests

ndg.security.test/ndg/security/test/sessionMgrClient/sessionMgrClientTest.cfg: get rid of test1AddUser and added test2GetSessionStatus

ndg.security.test/ndg/security/test/sessionMgr/test.py: SessionMgr? unit tests all working

ndg.security.common/ndg/security/common/SessionMgr/init.py: added getSessionStatus method

ndg.security.common/ndg/security/common/AttAuthority/init.py: fix to getHostInfo - return dict indexed by hostname

ndg.security.common/ndg/security/common/AttAuthority/AttAuthority_services.py: re-ran code generation from WSDL

Makefile: added targets for building ZSI code stubs from AA and SM WSDLs.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager - makes requests for
3authentication and authorisation.  An Attribute Authority and Simple CA
4services must be running for the reqAuthorisation and addUser tests
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "20/11/07"
10__copyright__ = "(C) 2007 STFC & NERC"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "P.J.Kershaw@rl.ac.uk"
15__revision__ = '$Id$'
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20
21from ndg.security.common.X509 import X509CertParse
22from ndg.security.server.SessionMgr import *
23
24
25class SessionMgrTestCase(unittest.TestCase):
26    """Unit test case for ndg.security.server.SessionMgr.SessionMgr class.
27   
28    This class manages server side sessions"""
29   
30    test2Passphrase = None
31    test3Passphrase = None
32
33    def setUp(self):
34       
35        self.cfg = SafeConfigParser()
36        self.cfg.read("./sessionMgrTest.cfg")
37       
38        os.environ['NDGSEC_SM_UNITTEST_DIR'] = \
39            os.path.expandvars(self.cfg.get('setUp', 
40                                            'NDGSEC_SM_UNITTEST_DIR'))
41           
42        # Initialise the Session Manager client connection
43        # Omit traceFile keyword to leave out SOAP debug info
44        self.sm = SessionMgr(propFilePath=self.cfg.get('setUp', 
45                                                       'propFilePath'))
46        if 'NDGSEC_INT_DEBUG' in os.environ:
47            import pdb
48            pdb.set_trace()
49                                 
50    def test1Connect(self):
51        """test1Connect: make a new session"""
52       
53        print "\n\t" + self.test1Connect.__doc__
54       
55        if SessionMgrTestCase.test2Passphrase is None:
56            SessionMgrTestCase.test2Passphrase = \
57                                    self.cfg.get('test1Connect', 'passphrase')
58       
59        if not SessionMgrTestCase.test2Passphrase:
60            SessionMgrTestCase.test2Passphrase = getpass.getpass(\
61                               prompt="\ntest1Connect pass-phrase for user: ")
62
63        proxyCert, self.proxyPriKey, self.userCert, self.sessID = \
64            self.sm.connect(username=self.cfg.get('test1Connect', 'username'), 
65                            passphrase=SessionMgrTestCase.test2Passphrase)
66        self.proxyCert = X509CertParse(proxyCert)
67       
68        print "User '%s' connected to Session Manager:\n%s" % \
69            (self.cfg.get('test1Connect', 'username'), self.sessID)
70           
71    def test2GetSessionStatus(self):
72        """test2GetSessionStatus: check a session is alive"""
73        print "\n\t" + self.test2GetSessionStatus.__doc__
74       
75        self.test1Connect()
76        assert self.sm.getSessionStatus(sessID=self.sessID), "Session is dead"
77        print "User connected to Session Manager with sessID=%s" % self.sessID
78
79        assert not self.sm.getSessionStatus(sessID='abc'), \
80            "sessID=abc shouldn't exist!"
81           
82        print "CORRECT: sessID=abc doesn't exist"
83       
84    def test3ConnectNoCreateServerSess(self):
85        """test3ConnectNoCreateServerSess: Connect as a non browser client -
86        sessID should be None"""
87
88        print "\n\t" + self.test3ConnectNoCreateServerSess.__doc__
89       
90        if SessionMgrTestCase.test3Passphrase is None:
91            SessionMgrTestCase.test3Passphrase = \
92                self.cfg.get('test3ConnectNoCreateServerSess', 'passphrase')
93               
94        if not SessionMgrTestCase.test3Passphrase:
95            SessionMgrTestCase.test3Passphrase = getpass.getpass(\
96            prompt="\ntest3ConnectNoCreateServerSess pass-phrase for user: ")
97
98        username = self.cfg.get('test3ConnectNoCreateServerSess', 'username')
99        self.proxyCert, self.proxyPriKey, self.userCert, sessID = \
100            self.sm.connect(username=username, 
101                            passphrase=SessionMgrTestCase.test3Passphrase,
102                            createServerSess=False)
103       
104        # Expect null session ID
105        assert not sessID, "Expecting a null session ID!"
106         
107        print "User '%s' connected to Session Manager:\n%s" % \
108                (self.cfg.get('test3ConnectNoCreateServerSess', 'username'), 
109                 self.proxyCert)
110           
111
112    def test4DisconnectUsingSessID(self):
113        """test4DisconnectUsingSessID: disconnect as if acting as a browser client
114        """
115       
116        print "\n\t" + self.test4DisconnectUsingSessID.__doc__
117        self.test1Connect()       
118        self.sm.deleteUserSession(sessID=self.sessID)
119       
120        print "User disconnected from Session Manager:\n%s" % self.sessID
121           
122
123    def test5DisconnectUsingProxyCert(self):
124        """test5DisconnectUsingProxyCert: Disconnect as a command line client
125        """
126       
127        print "\n\t" + self.test5DisconnectUsingProxyCert.__doc__
128        self.test1Connect()
129       
130        # Proxy cert in signature determines ID of session to
131        # delete
132        self.sm.deleteUserSession(proxyCert=self.proxyCert)
133        print "User disconnected from Session Manager:\n%s" % self.proxyCert
134
135
136    def test6GetAttCertUsingSessID(self):
137        """test6GetAttCertUsingSessID: make an attribute request using
138        a session ID as authentication credential"""
139
140        print "\n\t" + self.test6GetAttCertUsingSessID.__doc__       
141        self.test1Connect()
142       
143        attCert, errMsg, extAttCertList = self.sm.getAttCert(\
144            sessID=self.sessID, 
145            aaURI=self.cfg.get('test6GetAttCertUsingSessID', 'aauri'))
146        if errMsg:
147            self.fail(errMsg)
148           
149        print "Attribute Certificate:\n%s" % attCert
150        attCert.filePath = \
151            self.cfg.get('test6GetAttCertUsingSessID', 'acoutfilepath') 
152        attCert.write()
153       
154        return self.sm
155
156
157    def test6aGetAttCertRefusedUsingSessID(self):
158        """test6aGetAttCertRefusedUsingSessID: make an attribute request using
159        a sessID as authentication credential requesting an AC from an
160        Attribute Authority where the user is NOT registered"""
161
162        print "\n\t" + self.test6aGetAttCertRefusedUsingSessID.__doc__       
163        self.test1Connect()
164       
165        aaURI = self.cfg.get('test6aGetAttCertRefusedUsingSessID', 'aauri')
166       
167        attCert, errMsg, extAttCertList = self.sm.getAttCert(sessID=self.sessID, 
168                                         aaURI=aaURI,
169                                         mapFromTrustedHosts=False)
170        if errMsg:
171            print "SUCCESS - obtained expected result: %s" % errMsg
172            return
173       
174        self.fail("Request allowed from AA where user is NOT registered!")
175
176
177    def test6bGetMappedAttCertUsingSessID(self):
178        """test6bGetMappedAttCertUsingSessID: make an attribute request using
179        a session ID as authentication credential"""
180
181        print "\n\t" + self.test6bGetMappedAttCertUsingSessID.__doc__       
182        self.test1Connect()
183       
184        # Attribute Certificate cached in test 6 can be used to get a mapped
185        # AC for this test ...
186        self.sm = self.test6GetAttCertUsingSessID()
187
188        aaURI = self.cfg.get('test6bGetMappedAttCertUsingSessID', 'aauri')
189       
190        attCert, errMsg, extAttCertList=self.sm.getAttCert(sessID=self.sessID,
191                                                   aaURI=aaURI,
192                                                   mapFromTrustedHosts=True)
193        if errMsg:
194            self.fail(errMsg)
195           
196        print "Attribute Certificate:\n%s" % attCert 
197
198
199    def test6cGetAttCertWithExtAttCertListUsingSessID(self):
200        """test6cGetAttCertUsingSessID: make an attribute request using
201        a session ID as authentication credential"""
202       
203        print "\n\t" + \
204            self.test6cGetAttCertWithExtAttCertListUsingSessID.__doc__       
205        self.test1Connect()
206       
207        aaURI = \
208            self.cfg.get('test6cGetAttCertWithExtAttCertListUsingSessID', 'aauri')
209       
210        # Use output from test6GetAttCertUsingSessID!
211        extACFilePath = \
212    self.cfg.get('test6cGetAttCertWithExtAttCertListUsingSessID', 'extacfilepath')   
213        extAttCert = open(extACFilePath).read()
214       
215        attCert, errMsg, extAttCertList = self.sm.getAttCert(sessID=self.sessID, 
216                                       aaURI=aaURI,
217                                       extAttCertList=[extAttCert])
218        if errMsg:
219            self.fail(errMsg)
220         
221        print "Attribute Certificate:\n%s" % attCert 
222
223
224    def test7GetAttCertUsingProxyCert(self):
225        """test7GetAttCertUsingProxyCert: make an attribute request using
226        a proxy cert as authentication credential"""
227        print "\n\t" + self.test7GetAttCertUsingProxyCert.__doc__
228        self.test1Connect()
229
230        # Request an attribute certificate from an Attribute Authority
231        # using the proxyCert returned from connect()
232       
233        aaURI = self.cfg.get('test7GetAttCertUsingProxyCert', 'aauri')
234        attCert, errMsg, extAttCertList = self.sm.getAttCert(\
235                                     userCert=self.proxyCert, aaURI=aaURI)
236        if errMsg:
237            self.fail(errMsg)
238         
239        print "Attribute Certificate:\n%s" % attCert 
240
241
242#_____________________________________________________________________________       
243class SessionMgrTestSuite(unittest.TestSuite):
244   
245    def __init__(self):
246        map = map(SessionMgrTestCase,
247                  (
248                    "test1Connect",
249                    "test2GetSessionStatus",
250                    "test3ConnectNoCreateServerSess",
251                    "test4DisconnectUsingSessID",
252                    "test5DisconnectUsingProxyCert",
253                    "test6GetAttCertUsingSessID",
254                    "test6bGetMappedAttCertUsingSessID",
255                    "test6cGetAttCertWithExtAttCertListUsingSessID",
256                    "test7GetAttCertUsingProxyCert",
257                  ))
258        unittest.TestSuite.__init__(self, map)
259           
260                                                   
261if __name__ == "__main__":
262    unittest.main()       
Note: See TracBrowser for help on using the repository browser.