source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py @ 3192

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py@3192
Revision 3192, 11.0 KB checked in by pjkersha, 12 years ago (diff)

Working version of Session Manager unit tests with certs included in SVN issued from test CA.

security/python/ndg.security.test/ndg/security/test/attAuthority/README: added note about test config file.

security/python/ndg.security.test/ndg/security/test/attAuthority/siteAAttAuthorityProperties.xml,
security/python/ndg.security.test/ndg/security/test/attAuthority/siteBAttAuthorityProperties.xml:
added note about MyProxy CA certificate inclusion for WSSE signature handler to trust requests authenticated by certs from MyProxy CA. This is needed for running these services with Session Manager and Session Manager Client unit tests

security/python/ndg.security.test/ndg/security/test/ca/README: added note that this unit test is currently defunct.

security/python/ndg.security.test/ndg/security/test/gatekeeper/README,
security/python/ndg.security.test/ndg/security/test/Log: added

security/python/ndg.security.test/ndg/security/test/myProxy/README: added note about ensuring test creds are removed from the repository.

security/python/ndg.security.test/ndg/security/test/sessionCookie/README: note that this test is defunct. Pylons code can perform the same function

security/python/ndg.security.test/ndg/security/test/sessionCookie/SessionCookieTest.py,
security/python/ndg.security.test/ndg/security/test/sessionCookie/sessionCookieTest.cfg:
altered so that files are ref'd by env vars so that tests can be run from any dir

security/python/ndg.security.test/ndg/security/test/README: added

security/python/ndg.security.test/ndg/security/test/sessionMgr/sessionMgrProperties.xml,
security/python/ndg.security.test/ndg/security/test/sessionMgr/sessionMgrTest.cfg,
security/python/ndg.security.test/ndg/security/test/sessionMgr/test.py:

  • altered so that files are ref'd by env vars so that tests can be run from any dir
  • test1Connect PKI output is dumped to user.creds file.
  • Property svn:executable set to *
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager - makes requests for
3authentication and authorisation.  An Attribute Authority and Simple CA
4services must be running for the reqAuthorisation and addUser tests
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "20/11/07"
10__copyright__ = "(C) 2007 STFC & NERC"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "P.J.Kershaw@rl.ac.uk"
15__revision__ = '$Id$'
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20import traceback
21
22from ndg.security.common.X509 import X509CertParse
23from ndg.security.server.SessionMgr import *
24from ndg.security.server.MyProxy import MyProxyClient
25
26from os.path import expandvars as xpdVars
27from os.path import join as jnPath
def mkPath(file):
    """Return *file* joined onto the unit test directory.

    The directory is read from the NDGSEC_SM_UNITTEST_DIR environment
    variable on every call; a KeyError is raised when it is not set.
    """
    testDir = os.environ['NDGSEC_SM_UNITTEST_DIR']
    return jnPath(testDir, file)
29
30
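class SessionMgrTestCase(unittest.TestCase):
    """Unit tests for ndg.security.server.SessionMgr.SessionMgr.

    The Session Manager holds server side sessions.  Every test builds a
    SessionMgr from the properties file named in sessionMgrTest.cfg.

    Each test method prints its own docstring when it runs, so those
    docstrings are program output as well as documentation.
    """

    # Pass-phrases are cached on the class so that a value typed at the
    # getpass prompt is reused by later tests in the same run.
    test1Passphrase = None
    test3Passphrase = None

    def setUp(self):
        # Debug hook: setting NDGSEC_INT_DEBUG drops into pdb before each test
        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        # Default the test directory to the one holding this file so that
        # the tests can be run from any working directory
        if 'NDGSEC_SM_UNITTEST_DIR' not in os.environ:
            os.environ['NDGSEC_SM_UNITTEST_DIR'] = \
                os.path.abspath(os.path.dirname(__file__))

        self.cfg = SafeConfigParser()
        configFilePath = jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'],
                                "sessionMgrTest.cfg")
        self.cfg.read(configFilePath)

        # Initialise the Session Manager client connection
        # Omit traceFile keyword to leave out SOAP debug info
        propFilePath = xpdVars(self.cfg.get('setUp', 'propFilePath'))
        self.sm = SessionMgr(propFilePath=propFilePath)

    def test1Connect(self):
        """test1Connect: make a new session"""

        print("\n\t" + self.test1Connect.__doc__)

        username = self.cfg.get('test1Connect', 'username')

        # Pass-phrase comes from the config file when it has one, otherwise
        # from an interactive prompt
        if SessionMgrTestCase.test1Passphrase is None and \
           self.cfg.has_option('test1Connect', 'passphrase'):
            SessionMgrTestCase.test1Passphrase = \
                self.cfg.get('test1Connect', 'passphrase')

        if not SessionMgrTestCase.test1Passphrase:
            SessionMgrTestCase.test1Passphrase = getpass.getpass(
                prompt="\ntest1Connect pass-phrase for user %s: " % username)

        userCert, self.userPriKey, self.issuingCert, self.sessID = \
            self.sm.connect(username=username,
                            passphrase=SessionMgrTestCase.test1Passphrase)
        self.userCert = X509CertParse(userCert)

        print("User '%s' connected to Session Manager:\n%s" %
              (username, self.sessID))

        # Parenthesised: '+' binds tighter than 'or', so without the brackets
        # a non-empty issuing cert would be written alone and the user cert
        # and private key would be left out of the file
        creds = (self.issuingCert or '') + userCert + self.userPriKey

        # user.creds holds the private key, so create it readable and
        # writable by the owner only instead of with the umask default
        import stat
        ownerOnly = stat.S_IRUSR | stat.S_IWUSR
        credsPath = mkPath("user.creds")
        credsFd = os.open(credsPath,
                          os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                          ownerOnly)
        credsFile = os.fdopen(credsFd, "w")
        try:
            # The mode given to os.open only applies when the file is
            # created; tighten a file left behind by an earlier run too
            os.chmod(credsPath, ownerOnly)
            credsFile.write(creds)
        finally:
            credsFile.close()

    def test2GetSessionStatus(self):
        """test2GetSessionStatus: check a session is alive"""
        print("\n\t" + self.test2GetSessionStatus.__doc__)

        self.test1Connect()

        # TestCase assertions rather than 'assert' statements, which are
        # stripped when Python runs with -O and would let the test pass
        self.assertTrue(self.sm.getSessionStatus(sessID=self.sessID),
                        "Session is dead")
        print("User connected to Session Manager with sessID=%s" %
              self.sessID)

        # An ID that was never issued must not be reported as a live session
        self.assertFalse(self.sm.getSessionStatus(sessID='abc'),
                         "sessID=abc shouldn't exist!")

        print("CORRECT: sessID=abc doesn't exist")

    def test3ConnectNoCreateServerSess(self):
        """test3ConnectNoCreateServerSess: Connect as a non browser client -
        sessID should be None"""

        print("\n\t" + self.test3ConnectNoCreateServerSess.__doc__)

        username = self.cfg.get('test3ConnectNoCreateServerSess', 'username')

        if SessionMgrTestCase.test3Passphrase is None and \
           self.cfg.has_option('test3ConnectNoCreateServerSess',
                               'passphrase'):
            SessionMgrTestCase.test3Passphrase = \
                self.cfg.get('test3ConnectNoCreateServerSess', 'passphrase')

        if not SessionMgrTestCase.test3Passphrase:
            SessionMgrTestCase.test3Passphrase = getpass.getpass(
                prompt="\ntest3ConnectNoCreateServerSess pass-phrase for "
                       "user %s: " % username)

        self.userCert, self.userPriKey, self.issuingCert, sessID = \
            self.sm.connect(username=username,
                            passphrase=SessionMgrTestCase.test3Passphrase,
                            createServerSess=False)

        # Expect null session ID
        self.assertFalse(sessID, "Expecting a null session ID!")

        print("User '%s' retrieved creds. from Session Manager:\n%s" %
              (username, self.userCert))

    def test4DisconnectWithSessID(self):
        """test4DisconnectWithSessID: disconnect as if acting as a browser client
        """

        print("\n\t" + self.test4DisconnectWithSessID.__doc__)
        self.test1Connect()
        self.sm.deleteUserSession(sessID=self.sessID)

        print("User disconnected from Session Manager:\n%s" % self.sessID)

    def test5DisconnectWithUserCert(self):
        """test5DisconnectWithUserCert: Disconnect as a command line client
        """

        print("\n\t" + self.test5DisconnectWithUserCert.__doc__)
        self.test1Connect()

        # Proxy cert in signature determines ID of session to
        # delete
        self.sm.deleteUserSession(userCert=self.userCert)
        print("User disconnected from Session Manager:\n%s" % self.userCert)

    def test6GetAttCertWithSessID(self):
        """test6GetAttCertWithSessID: make an attribute request using
        a session ID as authentication credential"""

        print("\n\t" + self.test6GetAttCertWithSessID.__doc__)
        self.test1Connect()

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
            sessID=self.sessID,
            aaURI=self.cfg.get('test6GetAttCertWithSessID', 'aauri'))
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

        # Written to file so that later tests can read it back
        attCert.filePath = \
            xpdVars(self.cfg.get('test6GetAttCertWithSessID', 'acoutfilepath'))
        attCert.write()

        # Returned so that test6bGetMappedAttCertWithSessID can carry on
        # with the same Session Manager instance
        return self.sm

    def test6aGetAttCertRefusedWithSessID(self):
        """test6aGetAttCertRefusedWithSessID: make an attribute request using
        a sessID as authentication credential requesting an AC from an
        Attribute Authority where the user is NOT registered"""

        print("\n\t" + self.test6aGetAttCertRefusedWithSessID.__doc__)
        self.test1Connect()

        aaURI = self.cfg.get('test6aGetAttCertRefusedWithSessID', 'aauri')

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
            sessID=self.sessID,
            aaURI=aaURI,
            mapFromTrustedHosts=False)

        # Negative test: an error message is the expected outcome
        if errMsg:
            print("SUCCESS - obtained expected result: %s" % errMsg)
            return

        self.fail("Request allowed from AA where user is NOT registered!")

    def test6bGetMappedAttCertWithSessID(self):
        """test6bGetMappedAttCertWithSessID: make an attribute request using
        a session ID as authentication credential"""

        print("\n\t" + self.test6bGetMappedAttCertWithSessID.__doc__)
        self.test1Connect()

        # Attribute Certificate cached in test 6 can be used to get a mapped
        # AC for this test ...
        self.sm = self.test6GetAttCertWithSessID()

        aaURI = self.cfg.get('test6bGetMappedAttCertWithSessID', 'aauri')

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
            sessID=self.sessID,
            aaURI=aaURI,
            mapFromTrustedHosts=True)
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

    def test6cGetAttCertWithExtAttCertListWithSessID(self):
        """test6cGetAttCertWithExtAttCertListWithSessID: make an attribute
        request using a session ID as authentication credential"""

        print("\n\t" +
              self.test6cGetAttCertWithExtAttCertListWithSessID.__doc__)
        self.test1Connect()

        aaURI = \
            self.cfg.get('test6cGetAttCertWithExtAttCertListWithSessID', 'aauri')

        # Use output from test6GetAttCertWithSessID!
        extACFilePath = \
            xpdVars(self.cfg.get('test6cGetAttCertWithExtAttCertListWithSessID',
                                 'extacfilepath'))

        # Close the file explicitly rather than leaving it to the
        # garbage collector
        extACFile = open(extACFilePath)
        try:
            extAttCert = extACFile.read()
        finally:
            extACFile.close()

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
            sessID=self.sessID,
            aaURI=aaURI,
            extAttCertList=[extAttCert])
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)

    def test7GetAttCertWithUserCert(self):
        """test7GetAttCertWithUserCert: make an attribute request using
        a user cert as authentication credential"""
        print("\n\t" + self.test7GetAttCertWithUserCert.__doc__)
        self.test1Connect()

        # Request an attribute certificate from an Attribute Authority
        # using the userCert returned from connect()

        aaURI = self.cfg.get('test7GetAttCertWithUserCert', 'aauri')
        attCert, errMsg, extAttCertList = self.sm.getAttCert(
            userCert=self.userCert, aaURI=aaURI)
        if errMsg:
            self.fail(errMsg)

        print("Attribute Certificate:\n%s" % attCert)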
261
262
263#_____________________________________________________________________________       
class SessionMgrTestSuite(unittest.TestSuite):
    """Suite that runs a fixed list of SessionMgrTestCase tests in order."""

    def __init__(self):
        print("SessionMgrTestSuite ...")

        # NOTE(review): test6aGetAttCertRefusedWithSessID is defined on
        # SessionMgrTestCase but is not listed here - confirm whether the
        # omission is deliberate.
        testNames = (
            "test1Connect",
            "test2GetSessionStatus",
            "test3ConnectNoCreateServerSess",
            "test4DisconnectWithSessID",
            "test5DisconnectWithUserCert",
            "test6GetAttCertWithSessID",
            "test6bGetMappedAttCertWithSessID",
            "test6cGetAttCertWithExtAttCertListWithSessID",
            "test7GetAttCertWithUserCert",
        )

        # One TestCase instance per named test method
        smTestCases = [SessionMgrTestCase(name) for name in testNames]
        unittest.TestSuite.__init__(self, smTestCases)
281           
282                                                   
if __name__ == "__main__":
    # Run every test* method found in this module.  To run the fixed-order
    # suite instead:
    #     unittest.TextTestRunner(verbosity=2).run(SessionMgrTestSuite())
    unittest.main()