source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py @ 3041

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py@3041
Revision 3041, 9.6 KB checked in by pjkersha, 12 years ago (diff)

python/ndg.security.server/ndg/security/server/conf/sessionMgr.tac: removed soap_addUser - no longer needed

python/ndg.security.server/ndg/security/server/SessionMgr/init.py: fix to SessionMgr?.readProperties error msg

python/ndg.security.test/ndg/security/test/sessionMgr/test.py: ongoing implementation - getSessionStatus test now works.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager - makes requests for
3authentication and authorisation.  An Attribute Authority and Simple CA
4services must be running for the reqAuthorisation and addUser tests
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "23/02/06"
10__copyright__ = "(C) 2007 STFC & NERC"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "P.J.Kershaw@rl.ac.uk"
15__revision__ = '$Id: SessionMgrTest.py 2909 2007-09-28 14:22:21Z pjkersha $'
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20
21from ndg.security.common.X509 import X509CertParse
22from ndg.security.server.SessionMgr import *
23   
24
25
26class SessionMgrTestCase(unittest.TestCase):
27    """Unit test case for ndg.security.server.SessionMgr.SessionMgr class.
28   
29    This class manages server side sessions"""
30   
31    test2Passphrase = None
32    test3Passphrase = None
33
34    def setUp(self):
35       
36        self.cfg = SafeConfigParser()
37        self.cfg.read("./sessionMgrTest.cfg")
38       
39        os.environ['NDGSEC_SM_UNITTEST_DIR'] = \
40            os.path.expandvars(self.cfg.get('setUp', 
41                                            'NDGSEC_SM_UNITTEST_DIR'))
42           
43        # Initialise the Session Manager client connection
44        # Omit traceFile keyword to leave out SOAP debug info
45        self.sm = SessionMgr(propFilePath=self.cfg.get('setUp', 
46                                                       'propFilePath'))
47        if 'NDGSEC_INT_DEBUG' in os.environ:
48            import pdb
49            pdb.set_trace()
50                                 
51    def test1Connect(self):
52        """test1Connect: make a new session"""
53       
54        if SessionMgrTestCase.test2Passphrase is None:
55            SessionMgrTestCase.test2Passphrase = \
56                                    self.cfg.get('test1Connect', 'passphrase')
57       
58        if not SessionMgrTestCase.test2Passphrase:
59            SessionMgrTestCase.test2Passphrase = getpass.getpass(\
60                               prompt="\ntest1Connect pass-phrase for user: ")
61
62        proxyCert, self.proxyPriKey, self.userCert, self.sessID = \
63            self.sm.connect(username=self.cfg.get('test1Connect', 'username'), 
64                            passphrase=SessionMgrTestCase.test2Passphrase)
65        self.proxyCert = X509CertParse(proxyCert)
66       
67        print "User '%s' connected to Session Manager:\n%s" % \
68            (self.cfg.get('test1Connect', 'username'), self.sessID)
69           
70    def test2GetSessionStatus(self):
71        """test2GetSessionStatus: check a session is alive"""
72        self.test1Connect()
73        assert self.sm.getSessionStatus(sessID=self.sessID), "Session is dead"
74        print "User connected to Session Manager with sessID=%s" % self.sessID
75
76        assert not self.sm.getSessionStatus(sessID='abc'), \
77            "sessID=abc shouldn't exist!"
78           
79        print "CORRECT: sessID=abc doesn't exist"
80       
81    def test3ConnectNoCreateServerSess(self):
82        """test3ConnectNoCreateServerSess: Connect as a non browser client -
83        sessID should be None"""
84
85        if SessionMgrTestCase.test3Passphrase is None:
86            SessionMgrTestCase.test3Passphrase = \
87                self.cfg.get('test3ConnectNoCreateServerSess', 'passphrase')
88               
89        if not SessionMgrTestCase.test3Passphrase:
90            SessionMgrTestCase.test3Passphrase = getpass.getpass(\
91            prompt="\ntest3ConnectNoCreateServerSess pass-phrase for user: ")
92
93        username = self.cfg.get('test3ConnectNoCreateServerSess', 'username')
94        self.proxyCert, self.proxyPriKey, self.userCert, sessID = \
95            self.sm.connect(username=username, 
96                            passphrase=SessionMgrTestCase.test3Passphrase,
97                            createServerSess=False)
98       
99        # Expect null session ID
100        assert not sessID, "Expecting a null session ID!"
101         
102        print "User '%s' connected to Session Manager:\n%s" % \
103                (self.cfg.get('test3ConnectNoCreateServerSess', 'username'), 
104                 self.proxyCert)
105           
106
107    def test4DisconnectUsingSessID(self):
108        """test4DisconnectUsingSessID: disconnect as if acting as a browser client
109        """
110       
111        print "\n\t" + self.test4DisconnectUsingSessID.__doc__
112        self.test1Connect()       
113        self.sm.deleteUserSession(sessID=self.sessID)
114       
115        print "User disconnected from Session Manager:\n%s" % self.sessID
116           
117
118    def test5DisconnectUsingProxyCert(self):
119        """test5DisconnectUsingProxyCert: Disconnect as a command line client
120        """
121       
122        print "\n\t" + self.test5DisconnectUsingProxyCert.__doc__
123        self.test1Connect()
124       
125        # Proxy cert in signature determines ID of session to
126        # delete
127        self.sm.deleteUserSession(proxyCert=self.proxyCert)
128        print "User disconnected from Session Manager:\n%s" % self.proxyCert
129
130
131    def test6GetAttCertUsingSessID(self):
132        """test6GetAttCertUsingSessID: make an attribute request using
133        a session ID as authentication credential"""
134
135        print "\n\t" + self.test6GetAttCertUsingSessID.__doc__       
136        self.test1Connect()
137       
138        attCert = self.sm.getAttCert(\
139            sessID=self.sessID, 
140            aaURI=self.cfg.get('test6GetAttCertUsingSessID', 'aauri'))
141       
142        print "Attribute Certificate:\n%s" % attCert
143        attCert.filePath = \
144            self.cfg.get('test6GetAttCertUsingSessID', 'acoutfilepath') 
145        attCert.write()
146
147
148    def test6aGetAttCertRefusedUsingSessID(self):
149        """test6aGetAttCertRefusedUsingSessID: make an attribute request using
150        a sessID as authentication credential requesting an AC from an
151        Attribute Authority where the user is NOT registered"""
152
153        print "\n\t" + self.test6aGetAttCertRefusedUsingSessID.__doc__       
154        self.test1Connect()
155       
156        aaURI = self.cfg.get('test6aGetAttCertRefusedUsingSessID', 'aauri')
157       
158        attCert, errMsg, e.extAttCertList = self.sm.getAttCert(sessID=self.sessID, 
159                                         aaURI=aaURI,
160                                         mapFromTrustedHosts=False)
161        if errMsg:
162            print "SUCCESS - obtained expected result: %s" % errMsg
163            return
164       
165        self.fail("Request allowed from AA where user is NOT registered!")
166
167
168    def test6bGetMappedAttCertUsingSessID(self):
169        """test6bGetMappedAttCertUsingSessID: make an attribute request using
170        a session ID as authentication credential"""
171
172        print "\n\t" + self.test6bGetMappedAttCertUsingSessID.__doc__       
173        self.test1Connect()
174       
175        aaURI = self.cfg.get('test6bGetMappedAttCertUsingSessID', 'aauri')
176       
177        attCert=self.sm.getAttCert(sessID=self.sessID,attAuthorityURI=aaURI)
178       
179        print "Attribute Certificate:\n%s" % attCert 
180
181
182    def test6cGetAttCertWithExtAttCertListUsingSessID(self):
183        """test6cGetAttCertUsingSessID: make an attribute request using
184        a session ID as authentication credential"""
185       
186        print "\n\t" + \
187            self.test6cGetAttCertWithExtAttCertListUsingSessID.__doc__       
188        self.test1Connect()
189       
190        aaURI = \
191            self.cfg.get('test6cGetAttCertWithExtAttCertListUsingSessID', 'aauri')
192       
193        # Use output from test6GetAttCertUsingSessID!
194        extACFilePath = \
195    self.cfg.get('test6cGetAttCertWithExtAttCertListUsingSessID', 'extacfilepath')   
196        extAttCert = open(extACFilePath).read()
197       
198        attCert = self.sm.getAttCert(sessID=self.sessID, 
199                                       attAuthorityURI=aaURI,
200                                       extAttCertList=[extAttCert])
201         
202        print "Attribute Certificate:\n%s" % attCert 
203
204
205    def test7GetAttCertUsingProxyCert(self):
206        """test7GetAttCertUsingProxyCert: make an attribute request using
207        a proxy cert as authentication credential"""
208        print "\n\t" + self.test7GetAttCertUsingProxyCert.__doc__
209        self.test1Connect()
210
211        self.sm.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
212        self.sm.signatureHandler.signingPriKey = self.proxyPriKey       
213        self.sm.signatureHandler.signingCertChain = (self.userCert,
214                                                       self.proxyCert)
215       
216        # Request an attribute certificate from an Attribute Authority
217        # using the proxyCert returned from connect()
218       
219        aaURI = self.cfg.get('test7GetAttCertUsingProxyCert', 'aauri')
220        attCert = self.sm.getAttCert(attAuthorityURI=aaURI)
221         
222        print "Attribute Certificate:\n%s" % attCert 
223
224
225#_____________________________________________________________________________       
226class SessionMgrTestSuite(unittest.TestSuite):
227   
228    def __init__(self):
229        map = map(SessionMgrTestCase,
230                  (
231                    "test1Connect",
232                    "test2GetSessionStatus",
233                    "test3ConnectNoCreateServerSess",
234                    "test4DisconnectUsingSessID",
235                    "test5DisconnectUsingProxyCert",
236                    "test6GetAttCertUsingSessID",
237                    "test6bGetMappedAttCertUsingSessID",
238                    "test6cGetAttCertWithExtAttCertListUsingSessID",
239                    "test7GetAttCertUsingProxyCert",
240                  ))
241        unittest.TestSuite.__init__(self, map)
242           
243                                                   
244if __name__ == "__main__":
245    unittest.main()       
Note: See TracBrowser for help on using the repository browser.