source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py @ 3139

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py@3139
Revision 3139, 10.2 KB checked in by pjkersha, 12 years ago (diff)

Working SessionMgr unit tests with multiple CA support for WS-Security dsig verification and AC verification.

python/ndg.security.test/ndg/security/test/sessionMgr/init.py,
python/ndg.security.test/ndg/security/test/sessionMgr/openssl.conf,
python/ndg.security.test/ndg/security/test/sessionMgr/sessionMgrTest.cfg,
python/ndg.security.test/ndg/security/test/sessionMgr/sessionMgrProperties.xml: files added for SM unit test

python/ndg.security.test/ndg/security/test/sessionMgr/test.py: renamed refs to proxy certs -> user certs.

python/ndg.security.common/ndg/security/common/CredWallet.py: fix to AttAuthorityClient instantiation for sslCACertFilePathList setting

  • Property svn:executable set to *
1#!/usr/bin/env python
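"""Test harness for NDG Session Manager - makes requests for
authentication and authorisation.

The attribute certificate tests (test6*, test7*) send requests to the
Attribute Authority named by the 'aauri' option of their section in
sessionMgrTest.cfg, so that service must be running.  The original note
also required a Simple CA service for an addUser test; no such test is
defined in this module at this revision.

Pass-phrases for the connect tests are read from sessionMgrTest.cfg when
a 'passphrase' option is present and are prompted for with getpass
otherwise.  The file stores them in clear text, so keep only throwaway
test credentials in it.

NERC Data Grid Project
"""
__author__ = "P J Kershaw"
__date__ = "20/11/07"
__copyright__ = "(C) 2007 STFC & NERC"
__license__ = """This software may be distributed under the terms of the Q Public
License, version 1.0 or later."""
__contact__ = "P.J.Kershaw@rl.ac.uk"
__revision__ = '$Id$'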
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20
21from ndg.security.common.X509 import X509CertParse
22from ndg.security.server.SessionMgr import *
23
24
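class SessionMgrTestCase(unittest.TestCase):
    """Unit test case for ndg.security.server.SessionMgr.SessionMgr class.

    SessionMgr manages server side sessions.  setUp builds a fresh
    SessionMgr for every test and most tests start by calling test1Connect
    to obtain a session.

    Settings are read from ./sessionMgrTest.cfg, one section per test
    method.  The 'passphrase' option is optional: when it is absent or
    empty the pass-phrase is prompted for with getpass, which keeps it out
    of a clear-text file on disk.
    """

    # Pass-phrases are cached on the class so that the prompt appears at most
    # once per run, even though test1Connect is re-entered by later tests.
    test1Passphrase = None
    test3Passphrase = None

    def setUp(self):
        """Load the test configuration and create the SessionMgr under test."""
        self.cfg = SafeConfigParser()

        # read() skips files it cannot open and returns the names it parsed,
        # so check the result here rather than failing later with a missing
        # section error that hides the real cause
        if not self.cfg.read("./sessionMgrTest.cfg"):
            raise IOError("Could not read ./sessionMgrTest.cfg from the "
                          "current working directory")

        os.environ['NDGSEC_SM_UNITTEST_DIR'] = os.path.expandvars(
            self.cfg.get('setUp', 'NDGSEC_SM_UNITTEST_DIR'))

        # Initialise the Session Manager client connection
        # Omit traceFile keyword to leave out SOAP debug info
        self.sm = SessionMgr(propFilePath=self.cfg.get('setUp',
                                                       'propFilePath'))

        # Opt-in interactive debugging: set_trace() blocks on stdin, so leave
        # NDGSEC_INT_DEBUG unset for unattended runs
        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

    def _getPassphrase(self, section, cacheAttr):
        """Return the pass-phrase for a connect test, caching it on the class.

        The lookup order is: the value already cached in the class attribute
        named by cacheAttr, then the 'passphrase' option of the given config
        section, then an interactive getpass prompt.

        @param section: config file section, also used in the prompt text
        @param cacheAttr: name of the class attribute holding the cached value
        @return: the pass-phrase
        """
        passphrase = getattr(SessionMgrTestCase, cacheAttr)

        if passphrase is None and self.cfg.has_option(section, 'passphrase'):
            passphrase = self.cfg.get(section, 'passphrase')

        # An empty option value falls through to the prompt as well
        if not passphrase:
            passphrase = getpass.getpass(
                prompt="\n%s pass-phrase for user: " % section)

        setattr(SessionMgrTestCase, cacheAttr, passphrase)
        return passphrase

    def test1Connect(self):
        """test1Connect: make a new session"""

        print "\n\t" + self.test1Connect.__doc__

        passphrase = self._getPassphrase('test1Connect', 'test1Passphrase')
        username = self.cfg.get('test1Connect', 'username')

        userCert, self.userPriKey, self.issuingCert, self.sessID = \
            self.sm.connect(username=username, passphrase=passphrase)
        self.userCert = X509CertParse(userCert)

        # The session ID is accepted as an authentication credential by the
        # tests below, so treat output captured from a run as sensitive
        print "User '%s' connected to Session Manager:\n%s" % \
            (username, self.sessID)

    def test2GetSessionStatus(self):
        """test2GetSessionStatus: check a session is alive"""
        print "\n\t" + self.test2GetSessionStatus.__doc__

        self.test1Connect()

        # TestCase assertions replace the assert statement so that the checks
        # are still made when Python runs with -O
        self.assertTrue(self.sm.getSessionStatus(sessID=self.sessID),
                        "Session is dead")
        print "User connected to Session Manager with sessID=%s" % self.sessID

        # Negative check: an ID that was never issued must not be reported
        # as a live session
        self.assertFalse(self.sm.getSessionStatus(sessID='abc'),
                         "sessID=abc shouldn't exist!")

        print "CORRECT: sessID=abc doesn't exist"

    def test3ConnectNoCreateServerSess(self):
        """test3ConnectNoCreateServerSess: Connect as a non browser client -
        sessID should be None"""

        print "\n\t" + self.test3ConnectNoCreateServerSess.__doc__

        passphrase = self._getPassphrase('test3ConnectNoCreateServerSess',
                                         'test3Passphrase')
        username = self.cfg.get('test3ConnectNoCreateServerSess', 'username')

        self.userCert, self.userPriKey, self.issuingCert, sessID = \
            self.sm.connect(username=username,
                            passphrase=passphrase,
                            createServerSess=False)

        # Expect null session ID
        self.assertFalse(sessID, "Expecting a null session ID!")

        print "User '%s' connected to Session Manager:\n%s" % \
            (username, self.userCert)

    def test4DisconnectWithSessID(self):
        """test4DisconnectWithSessID: disconnect as if acting as a browser client
        """

        print "\n\t" + self.test4DisconnectWithSessID.__doc__
        self.test1Connect()
        self.sm.deleteUserSession(sessID=self.sessID)

        print "User disconnected from Session Manager:\n%s" % self.sessID

    def test5DisconnectWithUserCert(self):
        """test5DisconnectWithUserCert: Disconnect as a command line client
        """

        print "\n\t" + self.test5DisconnectWithUserCert.__doc__
        self.test1Connect()

        # The user certificate (formerly a proxy certificate) determines the
        # ID of the session to delete
        self.sm.deleteUserSession(userCert=self.userCert)
        print "User disconnected from Session Manager:\n%s" % self.userCert

    def test6GetAttCertWithSessID(self):
        """test6GetAttCertWithSessID: make an attribute request using
        a session ID as authentication credential"""

        print "\n\t" + self.test6GetAttCertWithSessID.__doc__
        self.test1Connect()

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
            sessID=self.sessID,
            aaURI=self.cfg.get('test6GetAttCertWithSessID', 'aauri'))
        if errMsg:
            self.fail(errMsg)

        print "Attribute Certificate:\n%s" % attCert

        # Written to disk for re-use; test6c reads an AC back from its
        # 'extacfilepath' setting - presumably the same file, confirm in
        # sessionMgrTest.cfg
        attCert.filePath = \
            self.cfg.get('test6GetAttCertWithSessID', 'acoutfilepath')
        attCert.write()

        # Returned so that test6b can carry on with the same SessionMgr
        return self.sm

    def test6aGetAttCertRefusedWithSessID(self):
        """test6aGetAttCertRefusedWithSessID: make an attribute request using
        a sessID as authentication credential requesting an AC from an
        Attribute Authority where the user is NOT registered"""

        print "\n\t" + self.test6aGetAttCertRefusedWithSessID.__doc__
        self.test1Connect()

        aaURI = self.cfg.get('test6aGetAttCertRefusedWithSessID', 'aauri')

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
            sessID=self.sessID,
            aaURI=aaURI,
            mapFromTrustedHosts=False)

        # Refusal is the passing outcome: an error message must come back
        if errMsg:
            print "SUCCESS - obtained expected result: %s" % errMsg
            return

        self.fail("Request allowed from AA where user is NOT registered!")

    def test6bGetMappedAttCertWithSessID(self):
        """test6bGetMappedAttCertWithSessID: make an attribute request using
        a session ID as authentication credential with mapping from trusted
        hosts enabled"""

        print "\n\t" + self.test6bGetMappedAttCertWithSessID.__doc__
        self.test1Connect()

        # Attribute Certificate cached in test 6 can be used to get a mapped
        # AC for this test ...
        # test6 connects again itself, so self.sessID below is the session it
        # created rather than the one opened above
        self.sm = self.test6GetAttCertWithSessID()

        aaURI = self.cfg.get('test6bGetMappedAttCertWithSessID', 'aauri')

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
            sessID=self.sessID,
            aaURI=aaURI,
            mapFromTrustedHosts=True)
        if errMsg:
            self.fail(errMsg)

        print "Attribute Certificate:\n%s" % attCert

    def test6cGetAttCertWithExtAttCertListWithSessID(self):
        """test6cGetAttCertWithExtAttCertListWithSessID: make an attribute
        request using a session ID as authentication credential and passing
        an external attribute certificate list"""

        print "\n\t" + \
            self.test6cGetAttCertWithExtAttCertListWithSessID.__doc__
        self.test1Connect()

        section = 'test6cGetAttCertWithExtAttCertListWithSessID'
        aaURI = self.cfg.get(section, 'aauri')

        # Use output from test6GetAttCertWithSessID!
        extACFilePath = self.cfg.get(section, 'extacfilepath')

        # try/finally closes the handle even if read() fails; the file's
        # imports do not enable the with statement for older interpreters
        extACFile = open(extACFilePath)
        try:
            extAttCert = extACFile.read()
        finally:
            extACFile.close()

        attCert, errMsg, extAttCertList = self.sm.getAttCert(
            sessID=self.sessID,
            aaURI=aaURI,
            extAttCertList=[extAttCert])
        if errMsg:
            self.fail(errMsg)

        print "Attribute Certificate:\n%s" % attCert

    def test7GetAttCertWithUserCert(self):
        """test7GetAttCertWithUserCert: make an attribute request using
        a user cert as authentication credential"""
        print "\n\t" + self.test7GetAttCertWithUserCert.__doc__
        self.test1Connect()

        # Request an attribute certificate from an Attribute Authority
        # using the userCert returned from connect()
        aaURI = self.cfg.get('test7GetAttCertWithUserCert', 'aauri')
        attCert, errMsg, extAttCertList = self.sm.getAttCert(
            userCert=self.userCert, aaURI=aaURI)
        if errMsg:
            self.fail(errMsg)

        print "Attribute Certificate:\n%s" % attCert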
243
244
245#_____________________________________________________________________________       
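class SessionMgrTestSuite(unittest.TestSuite):
    """Explicit, ordered suite of SessionMgrTestCase tests."""

    def __init__(self):
        """Build one SessionMgrTestCase instance per listed test method."""
        # NOTE(review): test6aGetAttCertRefusedWithSessID, the check that an
        # Attribute Authority refuses an unregistered user, is not listed -
        # confirm whether leaving it out of the suite is intentional
        testNames = (
            "test1Connect",
            "test2GetSessionStatus",
            "test3ConnectNoCreateServerSess",
            "test4DisconnectWithSessID",
            "test5DisconnectWithUserCert",
            "test6GetAttCertWithSessID",
            "test6bGetMappedAttCertWithSessID",
            "test6cGetAttCertWithExtAttCertListWithSessID",
            "test7GetAttCertWithUserCert",
        )

        # The result must not be bound to a local called 'map': assigning to
        # that name makes it local to the whole function, so calling the
        # builtin on the same line raises UnboundLocalError
        tests = [SessionMgrTestCase(name) for name in testNames]
        unittest.TestSuite.__init__(self, tests)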
262           
263                                                   
if __name__ == "__main__":
    # Script entry point: hand control to the unittest command line runner,
    # which collects the test* methods of the TestCase classes defined here
    unittest.main()