source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test_sessionMgr.py @ 4321

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test_sessionMgr.py@4321
Revision 4321, 13.3 KB checked in by pjkersha, 11 years ago (diff)

rename authenservice -> authnservice

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager - makes requests for
3authentication and authorisation.  Attribute Authority services must be running
4for *AttCert* test methods
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "20/11/07"
10__copyright__ = "(C) 2007 STFC & NERC"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "P.J.Kershaw@rl.ac.uk"
15__revision__ = '$Id$'
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20import traceback
21
22from ndg.security.common.utils.ConfigFileParsers import \
23                                                    CaseSensitiveConfigParser
24from ndg.security.common.X509 import X509CertParse
25from ndg.security.server.SessionMgr import *
26
27from os.path import expandvars as xpdVars
28from os.path import join as jnPath
29mkPath = lambda file: jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'], file)
30
31import logging
32logging.basicConfig(level=logging.DEBUG)
33
34
35class SessionMgrTestCase(unittest.TestCase):
36    """Unit test case for ndg.security.server.SessionMgr.SessionMgr class.
37   
38    This class manages server side sessions"""
39   
40    test1Passphrase = None
41    test3Passphrase = None
42   
43    def setUp(self):
44       
45        if 'NDGSEC_INT_DEBUG' in os.environ:
46            import pdb
47            pdb.set_trace()
48       
49        if 'NDGSEC_SM_UNITTEST_DIR' not in os.environ:
50            os.environ['NDGSEC_SM_UNITTEST_DIR'] = \
51                os.path.abspath(os.path.dirname(__file__))
52       
53        self.cfg = CaseSensitiveConfigParser()
54        configFilePath = jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'],
55                                "sessionMgrTest.cfg")
56        self.cfg.read(configFilePath)
57                   
58        # Initialise the Session Manager client connection
59        # Omit traceFile keyword to leave out SOAP debug info
60        self.propFilePath = xpdVars(self.cfg.get('setUp', 'propFilePath'))
61        self.sm = SessionMgr(propFilePath=self.propFilePath)
62
63    def _sessionMgrConnect(self):
64        '''Helper method to set up connections'''
65        print "Connecting to session manager..."
66        username = self.cfg.get('test1Connect', 'username')
67        if SessionMgrTestCase.test1Passphrase is None and \
68           self.cfg.has_option('test1Connect', 'passphrase'):
69            SessionMgrTestCase.test1Passphrase = self.cfg.get('test1Connect', 
70                                                              'passphrase')
71       
72        if not SessionMgrTestCase.test1Passphrase:
73            SessionMgrTestCase.test1Passphrase = getpass.getpass(\
74                prompt="\ntest1Connect pass-phrase for user %s: " % username)
75
76        print "Connecting to session manager as user: %s..." % username
77        userCert, self.userPriKey, self.issuingCert, self.sessID = \
78            self.sm.connect(username=username, 
79                            passphrase=SessionMgrTestCase.test1Passphrase)
80        self.userCert = X509CertParse(userCert)
81       
82        print "User '%s' connected to Session Manager:\n%s" % (username, 
83                                                               self.sessID)
84        creds='\n'.join((self.issuingCert or '',
85                         self.userCert.asPEM().strip(),
86                         self.userPriKey))
87        open(mkPath("user.creds"), "w").write(creds)
88        print "Finished setting up connection"
89       
90                                 
91    def test1Connect2AuthNServiceWithNoUserCertReturned(self):
92       
93        username = self.cfg.get('test1Connect', 'username')
94        if SessionMgrTestCase.test1Passphrase is None and \
95           self.cfg.has_option('test1Connect', 'passphrase'):
96            SessionMgrTestCase.test1Passphrase = self.cfg.get('test1Connect', 
97                                                              'passphrase')
98       
99        if not SessionMgrTestCase.test1Passphrase:
100            SessionMgrTestCase.test1Passphrase = getpass.getpass(
101                prompt="\ntest1Connect pass-phrase for user %s: " % username)
102
103        print "Connecting to session manager as user: %s..." %username
104        userCert, userPriKey, issuingCert, sessID = self.sm.connect(
105                            username=username, 
106                            passphrase=SessionMgrTestCase.test1Passphrase)
107        assert(userCert is None)
108        assert(userPriKey is None)
109        assert(issuingCert is None)
110       
111        print "User '%s' connected to Session Manager:\n%s" % \
112                                                        (username, self.sessID)       
113                                 
114    def test2Connect2AuthNServiceReturningAUserCert(self):
115       
116        username = self.cfg.get('test1Connect', 'username')
117        if SessionMgrTestCase.test1Passphrase is None and \
118           self.cfg.has_option('test1Connect', 'passphrase'):
119            SessionMgrTestCase.test1Passphrase = self.cfg.get('test1Connect', 
120                                                              'passphrase')
121       
122        if not SessionMgrTestCase.test1Passphrase:
123            SessionMgrTestCase.test1Passphrase = getpass.getpass(
124                prompt="\ntest1Connect pass-phrase for user %s: " % username)
125
126        print "Connecting to session manager as user: %s..." %username
127        userCert, self.userPriKey, self.issuingCert, self.sessID = \
128            self.sm.connect(username=username, 
129                            passphrase=SessionMgrTestCase.test1Passphrase)
130        self.userCert = X509CertParse(userCert)
131       
132        print "User '%s' connected to Session Manager:\n%s" % \
133                                                        (username, self.sessID)
134        creds='\n'.join((self.issuingCert or '',
135                         self.userCert.asPEM().strip(),
136                         self.userPriKey))
137        open(mkPath("user.creds"), "w").write(creds)
138   
139           
140    def test2GetSessionStatus(self):
141        """test2GetSessionStatus: check a session is alive"""
142       
143        self._sessionMgrConnect()
144        assert self.sm.getSessionStatus(sessID=self.sessID), "Session is dead"
145        print "User connected to Session Manager with sessID=%s" % self.sessID
146
147        assert not self.sm.getSessionStatus(sessID='abc'), \
148            "sessID=abc shouldn't exist!"
149           
150        print "CORRECT: sessID=abc doesn't exist"
151       
152    def test3ConnectNoCreateServerSess(self):
153        """test3ConnectNoCreateServerSess: Connect as a non browser client -
154        sessID should be None"""
155        section = 'test3ConnectNoCreateServerSess'
156        username = self.cfg.get(section, 'username')
157
158        if SessionMgrTestCase.test3Passphrase is None and \
159           self.cfg.has_option(section, passphrase):
160            SessionMgrTestCase.test3Passphrase = self.cfg.get(section, 
161                                                              'passphrase')
162       
163        if not SessionMgrTestCase.test3Passphrase:
164            SessionMgrTestCase.test3Passphrase = getpass.getpass(prompt=\
165                                            "\ntest3ConnectNoCreateServerSess "
166                                            "pass-phrase for user %s: " % 
167                                            username)
168
169        self.userCert, self.userPriKey, self.issuingCert, sessID = \
170            self.sm.connect(username=username, 
171                            passphrase=SessionMgrTestCase.test3Passphrase,
172                            createServerSess=False)
173       
174        # Expect null session ID
175        assert not sessID, "Expecting a null session ID!"
176         
177        print("User '%s' retrieved creds. from Session Manager:\n%s" % 
178                                                    (username, self.userCert))
179           
180
181    def test4DisconnectWithSessID(self):
182        """test4DisconnectWithSessID: disconnect as if acting as a browser
183        client
184        """
185       
186        self._sessionMgrConnect()       
187        self.sm.deleteUserSession(sessID=self.sessID)
188       
189        print "User disconnected from Session Manager:\n%s" % self.sessID
190           
191
192    def test5DisconnectWithUserCert(self):
193        """test5DisconnectWithUserCert: Disconnect as a command line client
194        """
195       
196        self._sessionMgrConnect()
197       
198        # Proxy cert in signature determines ID of session to
199        # delete
200        self.sm.deleteUserSession(userCert=self.userCert)
201        print "User disconnected from Session Manager:\n%s" % self.userCert
202
203
204    def test6GetAttCertWithSessID(self):
205        """test6GetAttCertWithSessID: make an attribute request using
206        a session ID as authentication credential"""
207
208        self._sessionMgrConnect()
209       
210        section = 'test6GetAttCertWithSessID'
211        attCert, errMsg, extAttCertList = self.sm.getAttCert(\
212                                        sessID=self.sessID, 
213                                        aaURI=self.cfg.get(section, 'aaURI'))
214        if errMsg:
215            self.fail(errMsg)
216           
217        print "Attribute Certificate:\n%s" % attCert
218        attCert.filePath = xpdVars(self.cfg.get(section, 'acoutfilepath')) 
219        attCert.write()
220       
221        return self.sm
222
223
224    def test6aGetAttCertRefusedWithSessID(self):
225        """test6aGetAttCertRefusedWithSessID: make an attribute request using
226        a sessID as authentication credential requesting an AC from an
227        Attribute Authority where the user is NOT registered"""
228
229        self._sessionMgrConnect()
230       
231        aaURI = self.cfg.get('test6aGetAttCertRefusedWithSessID', 'aaURI')
232       
233        attCert, errMsg, extAttCertList=self.sm.getAttCert(sessID=self.sessID, 
234                                                     aaURI=aaURI,
235                                                     mapFromTrustedHosts=False)
236        if errMsg:
237            print "SUCCESS - obtained expected result: %s" % errMsg
238            return
239       
240        self.fail("Request allowed from AA where user is NOT registered!")
241
242
243    def test6bGetMappedAttCertWithSessID(self):
244        """test6bGetMappedAttCertWithSessID: make an attribute request using
245        a session ID as authentication credential"""
246
247        self._sessionMgrConnect()
248       
249        # Attribute Certificate cached in test 6 can be used to get a mapped
250        # AC for this test ...
251        self.sm = self.test6GetAttCertWithSessID()
252
253        aaURI = self.cfg.get('test6bGetMappedAttCertWithSessID', 'aaURI')
254       
255        attCert, errMsg, extAttCertList=self.sm.getAttCert(sessID=self.sessID,
256                                                   aaURI=aaURI,
257                                                   mapFromTrustedHosts=True)
258        if errMsg:
259            self.fail(errMsg)
260           
261        print "Attribute Certificate:\n%s" % attCert 
262
263
264    def test6cGetAttCertWithExtAttCertListWithSessID(self):
265        """test6cGetAttCertWithSessID: make an attribute request using
266        a session ID as authentication credential"""
267       
268        self._sessionMgrConnect()
269        section = 'test6cGetAttCertWithExtAttCertListWithSessID'
270        aaURI = self.cfg.get(section, 'aaURI')
271       
272        # Use output from test6GetAttCertWithSessID!
273        extACFilePath = xpdVars(self.cfg.get(section, 'extACFilePath'))   
274        extAttCert = open(extACFilePath).read()
275       
276        attCert, errMsg, extAttCertList = self.sm.getAttCert(
277                                                   sessID=self.sessID, 
278                                                   aaURI=aaURI,
279                                                   extAttCertList=[extAttCert])
280        if errMsg:
281            self.fail(errMsg)
282         
283        print "Attribute Certificate:\n%s" % attCert 
284
285
286    def test7GetAttCertWithUserCert(self):
287        """test7GetAttCertWithUserCert: make an attribute request using
288        a user cert as authentication credential"""
289        self._sessionMgrConnect()
290
291        # Request an attribute certificate from an Attribute Authority
292        # using the userCert returned from connect()
293       
294        aaURI = self.cfg.get('test7GetAttCertWithUserCert', 'aaURI')
295        attCert, errMsg, extAttCertList = self.sm.getAttCert(
296                                     userCert=self.userCert, aaURI=aaURI)
297        if errMsg:
298            self.fail(errMsg)
299         
300        print "Attribute Certificate:\n%s" % attCert 
301
302
303#_____________________________________________________________________________       
304class SessionMgrTestSuite(unittest.TestSuite):
305   
306    def __init__(self):
307        print "SessionMgrTestSuite ..."
308        smTestCaseMap = map(SessionMgrTestCase,
309                          (
310                            "test1Connect",
311                            "test2GetSessionStatus",
312                            "test3ConnectNoCreateServerSess",
313                            "test4DisconnectWithSessID",
314                            "test5DisconnectWithUserCert",
315                            "test6GetAttCertWithSessID",
316                            "test6bGetMappedAttCertWithSessID",
317                            "test6cGetAttCertWithExtAttCertListWithSessID",
318                            "test7GetAttCertWithUserCert",
319                          ))
320        unittest.TestSuite.__init__(self, smTestCaseMap)
321           
322                                                   
323if __name__ == "__main__":
324#    suite = SessionMgrTestSuite()
325#    unittest.TextTestRunner(verbosity=2).run(suite)
326    unittest.main()       
Note: See TracBrowser for help on using the repository browser.