source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py @ 3133

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py@3133
Revision 3133, 10.1 KB checked in by pjkersha, 12 years ago (diff)

Major changes to enable trust based on multiple CAs and use of dynamically created user certs from MyProxy SimpleCA - affects ...
python/ndg.security.server/ndg/security/server/AttAuthority/init.py,
python/ndg.security.test/ndg/security/test/AttAuthority/siteAAttAuthorityProperties.xml,
python/ndg.security.test/ndg/security/test/AttAuthority/siteBAttAuthorityProperties.xml,
python/ndg.security.server/ndg/security/server/conf/attAuthorityProperties.xml,
python/ndg.security.server/ndg/security/server/SessionMgr/init.py,
python/ndg.security.test/ndg/security/test/sessionMgrClient/sessionMgrProperties.xml,
python/ndg.security.server/ndg/security/server/conf/sessionMgrProperties.xml,
python/ndg.security.server/ndg/security/server/MyProxy.py,
python/ndg.security.test/ndg/security/test/sessionMgr/test.py,
python/ndg.security.common/ndg/security/common/CredWallet.py

  • Property svn:executable set to *
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager - makes requests for
3authentication and authorisation.  An Attribute Authority and Simple CA
4services must be running for the reqAuthorisation and addUser tests
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "20/11/07"
10__copyright__ = "(C) 2007 STFC & NERC"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "P.J.Kershaw@rl.ac.uk"
15__revision__ = '$Id$'
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20
21from ndg.security.common.X509 import X509CertParse
22from ndg.security.server.SessionMgr import *
23
24
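class SessionMgrTestCase(unittest.TestCase):
    """Integration tests for ndg.security.server.SessionMgr.SessionMgr.

    Each test builds a SessionMgr from the properties file named in
    ./sessionMgrTest.cfg and drives connect / session status / disconnect /
    attribute certificate requests against it.

    Pass-phrases are cached on the class so that an interactive prompt is
    issued at most once per test run.  A pass-phrase kept in
    sessionMgrTest.cfg is stored in plaintext; leave the option out (or
    empty) to be prompted instead.

    Checks use assertTrue/assertFalse/fail rather than bare ``assert`` so
    they still run when the interpreter is started with -O.
    """

    # Cached pass-phrases, filled from the config file or from a prompt.
    test1Passphrase = None
    test3Passphrase = None

    def setUp(self):
        """Load the test configuration and create the SessionMgr under test."""
        # SafeConfigParser.read() silently skips a file it cannot open; a
        # missing config therefore surfaces at the first cfg.get() below.
        self.cfg = SafeConfigParser()
        self.cfg.read("./sessionMgrTest.cfg")

        os.environ['NDGSEC_SM_UNITTEST_DIR'] = os.path.expandvars(
            self.cfg.get('setUp', 'NDGSEC_SM_UNITTEST_DIR'))

        # Initialise the Session Manager client connection
        # Omit traceFile keyword to leave out SOAP debug info
        self.sm = SessionMgr(
            propFilePath=self.cfg.get('setUp', 'propFilePath'))

        # Opt-in interactive debugging, enabled from the environment.
        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

    def test1Connect(self):
        """test1Connect: make a new session"""

        print("\n\t" + self.test1Connect.__doc__)

        cls = SessionMgrTestCase
        if cls.test1Passphrase is None and \
           self.cfg.has_option('test1Connect', 'passphrase'):
            cls.test1Passphrase = self.cfg.get('test1Connect', 'passphrase')

        # Nothing (or an empty value) configured: ask the operator.
        if not cls.test1Passphrase:
            cls.test1Passphrase = getpass.getpass(
                prompt="\ntest1Connect pass-phrase for user: ")

        username = self.cfg.get('test1Connect', 'username')
        proxyCert, self.proxyPriKey, self.userCert, self.sessID = \
            self.sm.connect(username=username,
                            passphrase=cls.test1Passphrase)
        self.proxyCert = X509CertParse(proxyCert)

        # NOTE(review): sessID is later used as the authentication
        # credential for getAttCert(), so this output is sensitive if test
        # logs are shared.
        print("User '%s' connected to Session Manager:\n%s" %
              (username, self.sessID))

    def test2GetSessionStatus(self):
        """test2GetSessionStatus: check a session is alive"""
        print("\n\t" + self.test2GetSessionStatus.__doc__)

        self.test1Connect()
        self.assertTrue(self.sm.getSessionStatus(sessID=self.sessID),
                        "Session is dead")
        print("User connected to Session Manager with sessID=%s" %
              self.sessID)

        # Negative check: an ID that was never issued must not be reported
        # as a live session.
        self.assertFalse(self.sm.getSessionStatus(sessID='abc'),
                         "sessID=abc shouldn't exist!")

        print("CORRECT: sessID=abc doesn't exist")

    def test3ConnectNoCreateServerSess(self):
        """test3ConnectNoCreateServerSess: Connect as a non browser client -
        sessID should be None"""

        print("\n\t" + self.test3ConnectNoCreateServerSess.__doc__)

        cls = SessionMgrTestCase
        section = 'test3ConnectNoCreateServerSess'

        # Same lookup as test1Connect: the option is optional, so a config
        # file without a stored pass-phrase falls through to the prompt
        # instead of raising NoOptionError.
        if cls.test3Passphrase is None and \
           self.cfg.has_option(section, 'passphrase'):
            cls.test3Passphrase = self.cfg.get(section, 'passphrase')

        if not cls.test3Passphrase:
            cls.test3Passphrase = getpass.getpass(
                prompt="\ntest3ConnectNoCreateServerSess pass-phrase for user: ")

        username = self.cfg.get(section, 'username')
        self.proxyCert, self.proxyPriKey, self.userCert, sessID = \
            self.sm.connect(username=username,
                            passphrase=cls.test3Passphrase,
                            createServerSess=False)

        # Expect null session ID
        self.assertFalse(sessID, "Expecting a null session ID!")

        print("User '%s' connected to Session Manager:\n%s" %
              (username, self.proxyCert))

    def test4DisconnectUsingSessID(self):
        """test4DisconnectUsingSessID: disconnect as if acting as a browser client
        """

        print("\n\t" + self.test4DisconnectUsingSessID.__doc__)
        self.test1Connect()
        self.sm.deleteUserSession(sessID=self.sessID)

        print("User disconnected from Session Manager:\n%s" % self.sessID)

    def test5DisconnectUsingProxyCert(self):
        """test5DisconnectUsingProxyCert: Disconnect as a command line client
        """

        print("\n\t" + self.test5DisconnectUsingProxyCert.__doc__)
        self.test1Connect()

        # Proxy cert in signature determines ID of session to
        # delete
        self.sm.deleteUserSession(proxyCert=self.proxyCert)
        print("User disconnected from Session Manager:\n%s" % self.proxyCert)

    def test6GetAttCertUsingSessID(self):
        """test6GetAttCertUsingSessID: make an attribute request using
        a session ID as authentication credential"""

        print("\n\t" + self.test6GetAttCertUsingSessID.__doc__)
        self.test1Connect()

        section = 'test6GetAttCertUsingSessID'
        attCert, errMsg, extAttCertList = self.sm.getAttCert(
            sessID=self.sessID,
            aaURI=self.cfg.get(section, 'aauri'))
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

        # Written to disk so that later tests can read it back.
        attCert.filePath = self.cfg.get(section, 'acoutfilepath')
        attCert.write()

        # Returned so test6bGetMappedAttCertUsingSessID can reuse this
        # SessionMgr instance.
        return self.sm

    def test6aGetAttCertRefusedUsingSessID(self):
        """test6aGetAttCertRefusedUsingSessID: make an attribute request using
        a sessID as authentication credential requesting an AC from an
        Attribute Authority where the user is NOT registered"""

        print("\n\t" + self.test6aGetAttCertRefusedUsingSessID.__doc__)
        self.test1Connect()

        aaURI = self.cfg.get('test6aGetAttCertRefusedUsingSessID', 'aauri')

        # Mapping is switched off, so an error message is the expected
        # outcome here.
        attCert, errMsg, extAttCertList = self.sm.getAttCert(
            sessID=self.sessID,
            aaURI=aaURI,
            mapFromTrustedHosts=False)
        if not errMsg:
            self.fail("Request allowed from AA where user is NOT registered!")

        print("SUCCESS - obtained expected result: %s" % errMsg)

    def test6bGetMappedAttCertUsingSessID(self):
        """test6bGetMappedAttCertUsingSessID: make an attribute request using
        a session ID as authentication credential"""

        print("\n\t" + self.test6bGetMappedAttCertUsingSessID.__doc__)
        self.test1Connect()

        # Attribute Certificate cached in test 6 can be used to get a mapped
        # AC for this test ...
        # test6GetAttCertUsingSessID() connects again itself, so self.sessID
        # used below is the one from that second connect() call.
        self.sm = self.test6GetAttCertUsingSessID()

        aaURI = self.cfg.get('test6bGetMappedAttCertUsingSessID', 'aauri')

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
            sessID=self.sessID,
            aaURI=aaURI,
            mapFromTrustedHosts=True)
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

    def test6cGetAttCertWithExtAttCertListUsingSessID(self):
        """test6cGetAttCertWithExtAttCertListUsingSessID: make an attribute
        request using a session ID as authentication credential"""

        print("\n\t" +
              self.test6cGetAttCertWithExtAttCertListUsingSessID.__doc__)
        self.test1Connect()

        section = 'test6cGetAttCertWithExtAttCertListUsingSessID'
        aaURI = self.cfg.get(section, 'aauri')

        # Use output from test6GetAttCertUsingSessID!
        extACFilePath = self.cfg.get(section, 'extacfilepath')

        # try/finally so the handle is released even if the read fails.
        extACFile = open(extACFilePath)
        try:
            extAttCert = extACFile.read()
        finally:
            extACFile.close()

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
            sessID=self.sessID,
            aaURI=aaURI,
            extAttCertList=[extAttCert])
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

    def test7GetAttCertUsingProxyCert(self):
        """test7GetAttCertUsingProxyCert: make an attribute request using
        a proxy cert as authentication credential"""
        print("\n\t" + self.test7GetAttCertUsingProxyCert.__doc__)
        self.test1Connect()

        # Request an attribute certificate from an Attribute Authority
        # using the proxyCert returned from connect()

        aaURI = self.cfg.get('test7GetAttCertUsingProxyCert', 'aauri')
        attCert, errMsg, extAttCertList = self.sm.getAttCert(
            userCert=self.proxyCert, aaURI=aaURI)
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)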
241
242
243#_____________________________________________________________________________       
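class SessionMgrTestSuite(unittest.TestSuite):
    """Explicit, ordered suite built from SessionMgrTestCase test methods."""

    def __init__(self):
        """Instantiate one SessionMgrTestCase per listed test name."""
        # NOTE(review): test6aGetAttCertRefusedUsingSessID (the refusal
        # check) is not listed here - confirm whether that is deliberate.
        testNames = (
            "test1Connect",
            "test2GetSessionStatus",
            "test3ConnectNoCreateServerSess",
            "test4DisconnectUsingSessID",
            "test5DisconnectUsingProxyCert",
            "test6GetAttCertUsingSessID",
            "test6bGetMappedAttCertUsingSessID",
            "test6cGetAttCertWithExtAttCertListUsingSessID",
            "test7GetAttCertUsingProxyCert",
        )

        # The result used to be bound to a local called ``map``.  Assigning
        # to that name makes it local to the whole function, so the call to
        # the builtin raised UnboundLocalError before any test was created.
        tests = [SessionMgrTestCase(name) for name in testNames]
        unittest.TestSuite.__init__(self, tests)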
260           
261                                                   
# Script entry point: hand control to the unittest command line runner.
if __name__ == "__main__":
    unittest.main()