source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionmanager/test_sessionmanager.py @ 4839

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionmanager/test_sessionmanager.py@4839
Revision 4839, 16.7 KB checked in by pjkersha, 11 years ago (diff)

Changed licence from Q Public to BSD

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager - makes requests for
3authentication and attribute retrieval.  Attribute Authority services must be
4running for *AttCert* test methods.  See README in this directory for details
5
6NERC Data Grid Project
7"""
# Module metadata.  ``__license__`` and ``__contact__`` must be separate
# statements: fused on a single line they are a syntax error.
__author__ = "P J Kershaw"
__date__ = "20/11/07"
__copyright__ = "(C) 2009 Science and Technology Facilities Council"
__license__ = "BSD - see LICENSE file in top-level directory"
__contact__ = "Philip.Kershaw@stfc.ac.uk"
__revision__ = '$Id$'
13import logging
14logging.basicConfig(level=logging.DEBUG)
15
16import unittest
17import os, sys, getpass, re
18from ConfigParser import SafeConfigParser
19
20from os.path import expandvars as xpdVars
21from os.path import join as jnPath
def mkPath(file):
    """Return ``file`` joined onto the unit test directory.

    The directory is read from the ``NDGSEC_SM_UNITTEST_DIR`` environment
    variable each time the function is called, so the variable only has to
    be set before the first call, not before this module is imported.

    @param file: file name or relative path to place under the test directory
    @return: the joined path
    @raise KeyError: ``NDGSEC_SM_UNITTEST_DIR`` is not set in the environment
    """
    return jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'], file)
23
24from ndg.security.test import BaseTestCase
25
26from ndg.security.common.utils.configfileparsers import \
27    CaseSensitiveConfigParser
28from ndg.security.common.X509 import X509CertParse
29from ndg.security.server.sessionmanager import SessionManager, \
30    CredentialWalletAttributeRequestDenied
31from ndg.security.server.attributeauthority import AttributeAuthority
32
33
class SessionManagerTestCase(BaseTestCase):
    """Unit test case for ndg.security.server.sessionmanager.SessionManager
    class.

    This class manages server side sessions.

    Settings are read from sessionMgrTest.cfg in the directory named by the
    NDGSEC_SM_UNITTEST_DIR environment variable.  Pass-phrases are cached as
    class attributes so that a pass-phrase typed at the prompt is asked for
    only once per test run.
    """

    # Pass-phrase shared by every test except test04.  None until it has been
    # read from the config file or entered at the prompt.
    passphrase = None

    # Separate pass-phrase cache for test04ConnectNoCreateServerSess
    test4Passphrase = None

    def setUp(self):
        """Read the test configuration and create the Session Manager under
        test as self.sm"""
        super(SessionManagerTestCase, self).setUp()

        # Setting NDGSEC_INT_DEBUG drops into the debugger before each test
        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        # Default the test directory to the one containing this module
        if 'NDGSEC_SM_UNITTEST_DIR' not in os.environ:
            os.environ['NDGSEC_SM_UNITTEST_DIR'] = \
                os.path.abspath(os.path.dirname(__file__))

        self.cfg = CaseSensitiveConfigParser()
        configFilePath = jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'],
                                "sessionMgrTest.cfg")
        self.cfg.read(configFilePath)

        # Create the Session Manager from its properties file
        # Omit traceFile keyword to leave out SOAP debug info
        self.propFilePath = xpdVars(self.cfg.get('setUp', 'propFilePath'))
        self.sm = SessionManager(propFilePath=self.propFilePath)

    def _cachedPassphrase(self, section, prompt, cacheAttr='passphrase'):
        """Return the pass-phrase held in the class attribute cacheAttr,
        filling the cache first if it is empty.

        The cache is filled from the 'passphrase' option of section when the
        attribute is still None and the option exists; if the result is
        still empty the user is prompted on the terminal.

        @param section: config file section to look the pass-phrase up in
        @param prompt: text displayed if the user has to be prompted
        @param cacheAttr: name of the SessionManagerTestCase class attribute
        used as the cache
        @return: the pass-phrase
        """
        # Always cache on this class, not type(self), so that any subclasses
        # share the same value
        cls = SessionManagerTestCase
        if getattr(cls, cacheAttr) is None and \
           self.cfg.has_option(section, 'passphrase'):
            setattr(cls, cacheAttr, self.cfg.get(section, 'passphrase'))

        if not getattr(cls, cacheAttr):
            setattr(cls, cacheAttr, getpass.getpass(prompt=prompt))

        return getattr(cls, cacheAttr)

    def _switchToUserX509CertAuthN(self):
        """Reconfigure self.sm to use the UserX509CertAuthN authentication
        service from the userx509certauthn module in the unit test directory
        and initialise it.

        @return: the 'userPriKeyPwd' option from the DEFAULT section of the
        config file
        """
        userX509CertFilePath = self.cfg.get('DEFAULT', 'userX509CertFilePath')
        userPriKeyFilePath = self.cfg.get('DEFAULT', 'userPriKeyFilePath')
        userPriKeyPwd = self.cfg.get('DEFAULT', 'userPriKeyPwd')

        self.sm['authNService'] = {
            'moduleFilePath': os.environ['NDGSEC_SM_UNITTEST_DIR'],
            'moduleName': 'userx509certauthn',
            'className': 'UserX509CertAuthN',
            'userX509CertFilePath': userX509CertFilePath,
            'userPriKeyFilePath': userPriKeyFilePath
        }

        self.sm.initAuthNService()
        return userPriKeyPwd

    def _connect(self):
        '''Helper method to set up connections: connect with the DEFAULT
        section username and store the new session ID in self.sessID'''
        print("Connecting to session manager...")
        section = 'DEFAULT'

        username = self.cfg.get(section, 'username')
        passphrase = self._cachedPassphrase(
                            section,
                            "\nPass-phrase for user %s: " % username)

        print("Connecting to session manager as user: %s..." % username)

        # Only the session ID is needed; the 4-way unpacking still checks
        # the shape of the returned tuple
        _cert, _priKey, _issuingCert, self.sessID = \
            self.sm.connect(username=username, passphrase=passphrase)

        print("User '%s' connected to Session Manager:\n%s" % (username,
                                                               self.sessID))
        print("Finished setting up connection")

    def _connect2UserX509CertAuthNService(self):
        '''Same as _connect but Session Manager is using an Authentication
        Service that returns PKI credentials i.e. like MyProxy.

        Sets self.userX509Cert (parsed certificate), self.userPriKey,
        self.issuingCert and self.sessID from the connect call'''

        section = 'DEFAULT'

        print("Connecting to session manager with AuthN service returning "
              "PKI creds...")

        # Change to alternative authentication service
        self._switchToUserX509CertAuthN()

        username = self.cfg.get(section, 'username')
        passphrase = self._cachedPassphrase(
                            section,
                            "\nPass-phrase for user %s: " % username)

        print("Connecting to session manager as user: %s..." % username)
        userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \
            self.sm.connect(username=username, passphrase=passphrase)
        self.userX509Cert = X509CertParse(userX509Cert)

        print("User '%s' connected to Session Manager:\n%s" % (username,
                                                               self.sessID))
        print("Finished setting up connection")

    def test01Connect2AuthNServiceWithNoUserX509CertReturned(self):
        """test01Connect2AuthNServiceWithNoUserX509CertReturned: connect
        using the Session Manager as configured by setUp and check that no
        certificate, private key or issuing certificate is returned"""

        thisSection = 'test01Connect2AuthNServiceWithNoUserX509CertReturned'
        username = self.cfg.get(thisSection, 'username')
        passphrase = self._cachedPassphrase(
                    thisSection,
                    "\ntest1Connect pass-phrase for user %s: " % username)

        print("Connecting to session manager as user: %s..." % username)
        userX509Cert, userPriKey, issuingCert, sessID = self.sm.connect(
                                                    username=username,
                                                    passphrase=passphrase)

        # unittest assertions rather than bare assert statements so that the
        # checks still run under "python -O"
        self.assertTrue(userX509Cert is None,
                        "Expecting no user X.509 certificate")
        self.assertTrue(userPriKey is None,
                        "Expecting no user private key")
        self.assertTrue(issuingCert is None,
                        "Expecting no issuing certificate")

        print("User '%s' connected to Session Manager:\n%s" % (username,
                                                               sessID))

    def test02Connect2AuthNServiceReturningAUserX509Cert(self):
        """test02Connect2AuthNServiceReturningAUserX509Cert: connect via the
        UserX509CertAuthN service and write the returned credentials to the
        file named by the 'outputCredsFilePath' option"""

        section = 'test02Connect2AuthNServiceReturningAUserX509Cert'
        outputCredFilePath = self.cfg.get(section, 'outputCredsFilePath')

        # Change to alternative authentication service
        userPriKeyPwd = self._switchToUserX509CertAuthN()

        print("Connecting to session manager...")
        userX509Cert, self.userPriKey, self.issuingCert, sessID = \
            self.sm.connect(passphrase=userPriKeyPwd)
        self.userX509Cert = X509CertParse(userX509Cert)

        print("Connected to Session Manager:\n%s" % sessID)
        creds = '\n'.join((self.issuingCert or '',
                           self.userX509Cert.asPEM().strip(),
                           self.userPriKey))

        # Context manager guarantees the file is flushed and closed
        with open(mkPath(outputCredFilePath), "w") as credsFile:
            credsFile.write(creds)

    def test03GetSessionStatus(self):
        """test03GetSessionStatus: check a session is alive"""

        self._connect()
        self.assertTrue(self.sm.getSessionStatus(sessID=self.sessID),
                        "Session is dead")
        print("User connected to Session Manager with sessID=%s" %
              self.sessID)

        self.assertFalse(self.sm.getSessionStatus(sessID='abc'),
                         "sessID=abc shouldn't exist!")

        print("CORRECT: sessID=abc doesn't exist")

    def test04ConnectNoCreateServerSess(self):
        """test04ConnectNoCreateServerSess: Connect to retrieve credentials
        only - no session is created.  This makes sense only for an AuthN
        Service that returns user credentials"""
        section = 'test04ConnectNoCreateServerSess'

        # Change to alternative authentication service
        self._switchToUserX509CertAuthN()

        username = self.cfg.get(section, 'username')

        # This test keeps its own pass-phrase cache
        passphrase = self._cachedPassphrase(
                        section,
                        "\n%s pass-phrase for user %s: " % (section, username),
                        cacheAttr='test4Passphrase')

        userX509Cert, userPriKey, issuingCert, sessID = \
            self.sm.connect(username=username,
                            passphrase=passphrase,
                            createServerSess=False)

        # Expect null session ID
        self.assertFalse(sessID, "Expecting a null session ID!")

        print("User '%s' retrieved creds. from Session Manager:\n%s" %
                                                    (username, sessID))

    def test05DisconnectWithSessID(self):
        """test05DisconnectWithSessID: disconnect as if acting as a browser
        client
        """

        self._connect()
        self.sm.deleteUserSession(sessID=self.sessID)

        print("User disconnected from Session Manager:\n%s" % self.sessID)

    def test06DisconnectWithUserX509Cert(self):
        """test06DisconnectWithUserX509Cert: Disconnect based on a user X.509
        cert. credential from an earlier call to connect
        """

        self._connect2UserX509CertAuthNService()

        # User cert DN determines ID of session to delete
        self.sm.deleteUserSession(userX509Cert=self.userX509Cert)
        print("User disconnected from Session Manager:\n%s" %
              self.userX509Cert)

    def test07GetAttCertWithSessID(self):
        """test07GetAttCertWithSessID: make an attribute request using
        a session ID as authentication credential and write the Attribute
        Certificate to the 'acOutputFilePath' file"""

        self._connect()

        section = 'test07GetAttCertWithSessID'
        aaURI = self.cfg.get(section, 'aaURI')
        attCert = self.sm.getAttCert(sessID=self.sessID,
                                     attributeAuthorityURI=aaURI)
        print("Attribute Certificate:\n%s" % attCert)
        attCert.filePath = xpdVars(self.cfg.get(section, 'acOutputFilePath'))
        attCert.write()

    def test08GetAttCertRefusedWithSessID(self):
        """test08GetAttCertRefusedWithSessID: make an attribute request using
        a sessID as authentication credential requesting an AC from an
        Attribute Authority where the user is NOT registered"""

        self._connect()

        aaURI = self.cfg.get('test08GetAttCertRefusedWithSessID', 'aaURI')

        try:
            self.sm.getAttCert(sessID=self.sessID,
                               attributeAuthorityURI=aaURI,
                               mapFromTrustedHosts=False)
        except CredentialWalletAttributeRequestDenied as e:
            print("SUCCESS - obtained expected result: %s" % e)
        else:
            self.fail("Request allowed from AA where user is NOT registered!")

    def test09GetMappedAttCertWithSessID(self):
        """test09GetMappedAttCertWithSessID: make an attribute request using
        a session ID as authentication credential, with mapping from trusted
        hosts enabled"""

        self._connect()

        # Attribute Certificate cached in test 7 can be used to get a mapped
        # AC for this test ...  test07 connects again, so self.sessID below
        # is the session ID assigned by that call
        self.test07GetAttCertWithSessID()

        aaURI = self.cfg.get('test09GetMappedAttCertWithSessID', 'aaURI')

        attCert = self.sm.getAttCert(sessID=self.sessID,
                                     attributeAuthorityURI=aaURI,
                                     mapFromTrustedHosts=True)

        print("Attribute Certificate:\n%s" % attCert)

    def test10GetAttCertWithExtAttCertListWithSessID(self):
        """test10GetAttCertWithExtAttCertListWithSessID: make an attribute
        request using a session ID as authentication credential, passing an
        Attribute Certificate read from file as an external AC"""

        self._connect()
        section = 'test10GetAttCertWithExtAttCertListWithSessID'
        aaURI = self.cfg.get(section, 'aaURI')

        # Use output from test07GetAttCertWithSessID!
        extACFilePath = xpdVars(self.cfg.get(section, 'extACFilePath'))
        with open(extACFilePath) as extACFile:
            extAttCert = extACFile.read()

        attCert = self.sm.getAttCert(sessID=self.sessID,
                                     attributeAuthorityURI=aaURI,
                                     extAttCertList=[extAttCert])
        print("Attribute Certificate:\n%s" % attCert)

    def test11GetAttCertWithUserX509Cert(self):
        """test11GetAttCertWithUserX509Cert: make an attribute request using
        a user cert as authentication credential"""
        self._connect2UserX509CertAuthNService()

        # Request an attribute certificate from an Attribute Authority
        # using the userX509Cert returned from connect()
        aaURI = self.cfg.get('test11GetAttCertWithUserX509Cert', 'aaURI')
        attCert = self.sm.getAttCert(userX509Cert=self.userX509Cert,
                                     attributeAuthorityURI=aaURI)
        print("Attribute Certificate:\n%s" % attCert)

    def test12GetAttCertFromLocalAAInstance(self):
        """test12GetAttCertFromLocalAAInstance: make an attribute request to a
        locally instantiated Attribute Authority and write the Attribute
        Certificate to the 'acOutputFilePath' file"""

        self._connect()

        section = 'test12GetAttCertFromLocalAAInstance'
        aaPropFilePath = self.cfg.get(section, 'aaPropFilePath')
        attributeAuthority = AttributeAuthority(propFilePath=aaPropFilePath)

        attCert = self.sm.getAttCert(sessID=self.sessID,
                                     attributeAuthority=attributeAuthority)
        print("Attribute Certificate:\n%s" % attCert)
        attCert.filePath = xpdVars(self.cfg.get(section, 'acOutputFilePath'))
        attCert.write()
370
class SessionManagerTestSuite(unittest.TestSuite):
    """Test suite holding one SessionManagerTestCase instance per test
    method, added in the numbered order listed in __init__"""

    def __init__(self):
        print("SessionManagerTestSuite ...")

        # Listed explicitly so that the running order is visible here
        testNames = (
            "test01Connect2AuthNServiceWithNoUserX509CertReturned",
            "test02Connect2AuthNServiceReturningAUserX509Cert",
            "test03GetSessionStatus",
            "test04ConnectNoCreateServerSess",
            "test05DisconnectWithSessID",
            "test06DisconnectWithUserX509Cert",
            "test07GetAttCertWithSessID",
            "test08GetAttCertRefusedWithSessID",
            "test09GetMappedAttCertWithSessID",
            "test10GetAttCertWithExtAttCertListWithSessID",
            "test11GetAttCertWithUserX509Cert",
            "test12GetAttCertFromLocalAAInstance",
        )

        # A list (rather than map(), which is lazy on Python 3) so every test
        # case is constructed before the suite is initialised
        smTestCases = [SessionManagerTestCase(name) for name in testNames]
        unittest.TestSuite.__init__(self, smTestCases)
391           
392                                                   
if __name__ == "__main__":
    # Run every test* method found in this module.  To run the explicitly
    # ordered suite with verbose output instead, use:
    #     unittest.TextTestRunner(verbosity=2).run(SessionManagerTestSuite())
    unittest.main()
Note: See TracBrowser for help on using the repository browser.