source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/sessionmanager/test_sessionmanager.py @ 5779

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/sessionmanager/test_sessionmanager.py@5779
Revision 5779, 16.9 KB checked in by pjkersha, 11 years ago (diff)

Integrated automated start-up and shutdown of Paste http servers for unit tests.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager - makes requests for
3authentication and attribute retrieval.  Attribute Authority services must be
4running for *AttCert* test methods.  See README in this directory for details
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "20/11/07"
10__copyright__ = "(C) 2009 Science and Technology Facilities Council"
11__license__ = "BSD - see LICENSE file in top-level directory"
12__contact__ = "Philip.Kershaw@stfc.ac.uk"
13__revision__ = '$Id$'
14import logging
15logging.basicConfig(level=logging.DEBUG)
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20
21from os.path import expandvars as xpdVars
22from os.path import join as jnPath
23mkPath = lambda file: jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'], file)
24
25from ndg.security.test.unit import BaseTestCase
26
27from ndg.security.common.utils.configfileparsers import \
28    CaseSensitiveConfigParser
29from ndg.security.common.X509 import X509CertParse
30from ndg.security.server.sessionmanager import SessionManager, \
31    CredentialWalletAttributeRequestDenied
32from ndg.security.server.attributeauthority import AttributeAuthority
33
34
35class SessionManagerTestCase(BaseTestCase):
36    """Unit test case for ndg.security.server.sessionmanager.SessionManager
37    class.
38   
39    This class manages server side sessions"""
40   
41    passphrase = None
42    test4Passphrase = None
43
44    def __init__(self, *arg, **kw):
45        super(SessionManagerTestCase, self).__init__(*arg, **kw)
46        self.startAttributeAuthorities()
47
48    def setUp(self):
49        super(SessionManagerTestCase, self).setUp()
50       
51        if 'NDGSEC_INT_DEBUG' in os.environ:
52            import pdb
53            pdb.set_trace()
54       
55        if 'NDGSEC_SM_UNITTEST_DIR' not in os.environ:
56            os.environ['NDGSEC_SM_UNITTEST_DIR'] = \
57                os.path.abspath(os.path.dirname(__file__))
58       
59        self.cfg = CaseSensitiveConfigParser()
60        configFilePath = jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'],
61                                "sessionMgrTest.cfg")
62        self.cfg.read(configFilePath)
63                   
64        # Initialise the Session Manager client connection
65        # Omit traceFile keyword to leave out SOAP debug info
66        self.propFilePath = xpdVars(self.cfg.get('setUp', 'propFilePath'))
67        self.sm = SessionManager(propFilePath=self.propFilePath)
68
69    def _connect(self):
70        '''Helper method to set up connections'''
71        print "Connecting to session manager..."
72        section = 'DEFAULT'
73       
74        username = self.cfg.get(section, 'username')
75        if SessionManagerTestCase.passphrase is None and \
76           self.cfg.has_option(section, 'passphrase'):
77            SessionManagerTestCase.passphrase=self.cfg.get(section,
78                                                           'passphrase')
79       
80        if not SessionManagerTestCase.passphrase:
81            SessionManagerTestCase.passphrase = getpass.getpass(
82                            prompt="\nPass-phrase for user %s: " % username)
83
84        print("Connecting to session manager as user: %s..." % username)
85        userX509Cert, userPriKey, issuingCert, self.sessID = \
86            self.sm.connect(username=username, 
87                            passphrase=SessionManagerTestCase.passphrase)
88
89        print("User '%s' connected to Session Manager:\n%s" % (username, 
90                                                               self.sessID))
91        print("Finished setting up connection")
92
93    def _connect2UserX509CertAuthNService(self):
94        '''Same as _connect but Session Manager is using an Authentication
95        Service that returns PKI credentials i.e. like MyProxy'''
96       
97        section = 'DEFAULT'
98
99        print("Connecting to session manager with AuthN service returning "
100              "PKI creds...")
101               
102        # Change to alternative authentication service
103        userX509CertFilePath = self.cfg.get(section, 'userX509CertFilePath')
104        userPriKeyFilePath = self.cfg.get(section, 'userPriKeyFilePath')
105        userPriKeyPwd = self.cfg.get(section, 'userPriKeyPwd')
106                                         
107        self.sm['authNService'] = {
108            'moduleFilePath': os.environ['NDGSEC_SM_UNITTEST_DIR'],
109            'moduleName': 'userx509certauthn',
110            'className': 'UserX509CertAuthN',
111            'userX509CertFilePath': userX509CertFilePath,
112            'userPriKeyFilePath': userPriKeyFilePath
113        }
114
115        self.sm.initAuthNService()
116       
117        username = self.cfg.get(section, 'username')
118        if SessionManagerTestCase.passphrase is None and \
119           self.cfg.has_option(section, 'passphrase'):
120            SessionManagerTestCase.passphrase=self.cfg.get(section, 
121                                                           'passphrase')
122       
123        if not SessionManagerTestCase.passphrase:
124            SessionManagerTestCase.passphrase = getpass.getpass(\
125                prompt="\nPass-phrase for user %s: " % username)
126
127        print("Connecting to session manager as user: %s..." % username)
128        userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \
129            self.sm.connect(username=username, 
130                            passphrase=SessionManagerTestCase.passphrase)
131        self.userX509Cert = X509CertParse(userX509Cert)
132       
133        print("User '%s' connected to Session Manager:\n%s" % (username, 
134                                                               self.sessID))
135        print("Finished setting up connection")
136   
137    def test01Connect2AuthNServiceWithNoUserX509CertReturned(self):
138       
139        thisSection = 'test01Connect2AuthNServiceWithNoUserX509CertReturned'
140        username = self.cfg.get(thisSection, 'username')
141        if SessionManagerTestCase.passphrase is None and \
142           self.cfg.has_option(thisSection, 'passphrase'):
143            SessionManagerTestCase.passphrase=self.cfg.get(thisSection, 
144                                                           'passphrase')
145       
146        if not SessionManagerTestCase.passphrase:
147            SessionManagerTestCase.passphrase = getpass.getpass(
148                prompt="\ntest1Connect pass-phrase for user %s: " % username)
149
150        print "Connecting to session manager as user: %s..." %username
151        userX509Cert, userPriKey, issuingCert, sessID = self.sm.connect(
152                                username=username, 
153                                passphrase=SessionManagerTestCase.passphrase)
154        assert(userX509Cert is None)
155        assert(userPriKey is None)
156        assert(issuingCert is None)
157       
158        print("User '%s' connected to Session Manager:\n%s"%(username, sessID))     
159                                 
160    def test02Connect2AuthNServiceReturningAUserX509Cert(self):
161       
162        section = 'test02Connect2AuthNServiceReturningAUserX509Cert'
163       
164        # Change to alternative authentication service
165        userX509CertFilePath = self.cfg.get('DEFAULT', 'userX509CertFilePath')
166        userPriKeyFilePath = self.cfg.get('DEFAULT', 'userPriKeyFilePath')
167        userPriKeyPwd = self.cfg.get('DEFAULT', 'userPriKeyPwd')
168        outputCredFilePath = self.cfg.get(section, 'outputCredsFilePath')
169                                         
170        self.sm['authNService'] = {
171            'moduleFilePath': os.environ['NDGSEC_SM_UNITTEST_DIR'],
172            'moduleName': 'userx509certauthn',
173            'className': 'UserX509CertAuthN',
174            'userX509CertFilePath': userX509CertFilePath,
175            'userPriKeyFilePath': userPriKeyFilePath
176        }
177
178        self.sm.initAuthNService()
179       
180        print("Connecting to session manager...")
181        userX509Cert, self.userPriKey, self.issuingCert, sessID = self.sm.connect(
182                                                    passphrase=userPriKeyPwd)
183        self.userX509Cert = X509CertParse(userX509Cert)
184       
185        print("Connected to Session Manager:\n%s" % sessID)
186        creds='\n'.join((self.issuingCert or '',
187                         self.userX509Cert.asPEM().strip(),
188                         self.userPriKey))
189        open(mkPath(outputCredFilePath), "w").write(creds)
190   
191           
192    def test03GetSessionStatus(self):
193        """test03GetSessionStatus: check a session is alive"""
194       
195        self._connect()
196        assert self.sm.getSessionStatus(sessID=self.sessID), "Session is dead"
197        print "User connected to Session Manager with sessID=%s" % self.sessID
198
199        assert not self.sm.getSessionStatus(sessID='abc'), \
200            "sessID=abc shouldn't exist!"
201           
202        print "CORRECT: sessID=abc doesn't exist"
203       
204    def test04ConnectNoCreateServerSess(self):
205        """test04ConnectNoCreateServerSess: Connect to retrieve credentials
206        only - no session is created.  This makes sense only for an AuthN
207        Service that returns user credentials"""
208        section = 'test04ConnectNoCreateServerSess'
209       
210        # Change to alternative authentication service
211        userX509CertFilePath = self.cfg.get('DEFAULT', 'userX509CertFilePath')
212        userPriKeyFilePath = self.cfg.get('DEFAULT', 'userPriKeyFilePath')
213        userPriKeyPwd = self.cfg.get('DEFAULT', 'userPriKeyPwd')
214                                         
215        self.sm['authNService'] = {
216            'moduleFilePath': os.environ['NDGSEC_SM_UNITTEST_DIR'],
217            'moduleName': 'userx509certauthn',
218            'className': 'UserX509CertAuthN',
219            'userX509CertFilePath': userX509CertFilePath,
220            'userPriKeyFilePath': userPriKeyFilePath
221        }
222
223        self.sm.initAuthNService()
224       
225       
226        username = self.cfg.get(section, 'username')
227
228        if SessionManagerTestCase.test4Passphrase is None and \
229           self.cfg.has_option(section, 'passphrase'):
230            SessionManagerTestCase.test4Passphrase = self.cfg.get(section, 
231                                                                  'passphrase')
232       
233        if not SessionManagerTestCase.test4Passphrase:
234            SessionManagerTestCase.test4Passphrase = getpass.getpass(prompt=\
235                                            "\n%s pass-phrase for user %s: " % 
236                                            (section, username))
237
238        userX509Cert, userPriKey, issuingCert, sessID = \
239            self.sm.connect(username=username, 
240                            passphrase=SessionManagerTestCase.test4Passphrase,
241                            createServerSess=False)
242       
243        # Expect null session ID
244        assert not sessID, "Expecting a null session ID!"
245         
246        print("User '%s' retrieved creds. from Session Manager:\n%s" % 
247                                                    (username, sessID))
248           
249
250    def test05DisconnectWithSessID(self):
251        """test05DisconnectWithSessID: disconnect as if acting as a browser
252        client
253        """
254       
255        self._connect()       
256        self.sm.deleteUserSession(sessID=self.sessID)
257       
258        print "User disconnected from Session Manager:\n%s" % self.sessID
259           
260
261    def test06DisconnectWithUserX509Cert(self):
262        """test5DisconnectWithUserX509Cert: Disconnect based on a user X.509
263        cert. credential from an earlier call to connect
264        """
265       
266        self._connect2UserX509CertAuthNService()
267       
268        # User cert DN determines ID of session to delete
269        self.sm.deleteUserSession(userX509Cert=self.userX509Cert)
270        print "User disconnected from Session Manager:\n%s" % self.userX509Cert
271
272
273    def test07GetAttCertWithSessID(self):
274        """test07GetAttCertWithSessID: make an attribute request using
275        a session ID as authentication credential"""
276
277        self._connect()
278       
279        section = 'test07GetAttCertWithSessID'
280        aaURI = self.cfg.get(section, 'aaURI')
281        attCert = self.sm.getAttCert(sessID=self.sessID, 
282                                     attributeAuthorityURI=aaURI)
283        print("Attribute Certificate:\n%s" % attCert) 
284        attCert.filePath = xpdVars(self.cfg.get(section, 'acOutputFilePath')) 
285        attCert.write()
286
287
288    def test08GetAttCertRefusedWithSessID(self):
289        """test08GetAttCertRefusedWithSessID: make an attribute request using
290        a sessID as authentication credential requesting an AC from an
291        Attribute Authority where the user is NOT registered"""
292
293        self._connect()
294       
295        aaURI = self.cfg.get('test08GetAttCertRefusedWithSessID', 'aaURI')
296       
297        try:
298            attCert = self.sm.getAttCert(sessID=self.sessID, 
299                                         attributeAuthorityURI=aaURI,
300                                         mapFromTrustedHosts=False)
301        except CredentialWalletAttributeRequestDenied, e:
302            print("SUCCESS - obtained expected result: %s" % e)
303            return
304       
305        self.fail("Request allowed from AA where user is NOT registered!")
306
307
308    def test09GetMappedAttCertWithSessID(self):
309        """test09GetMappedAttCertWithSessID: make an attribute request using
310        a session ID as authentication credential"""
311
312        self._connect()
313       
314        # Attribute Certificate cached in test 6 can be used to get a mapped
315        # AC for this test ...
316        self.test07GetAttCertWithSessID()
317
318        aaURI = self.cfg.get('test09GetMappedAttCertWithSessID', 'aaURI')
319       
320        attCert = self.sm.getAttCert(sessID=self.sessID,
321                                     attributeAuthorityURI=aaURI,
322                                     mapFromTrustedHosts=True)
323           
324        print("Attribute Certificate:\n%s" % attCert) 
325
326
327    def test10GetAttCertWithExtAttCertListWithSessID(self):
328        """test10GetAttCertWithExtAttCertListWithSessID: make an attribute
329        request using a session ID as authentication credential"""
330       
331        self._connect()
332        section = 'test10GetAttCertWithExtAttCertListWithSessID'
333        aaURI = self.cfg.get(section, 'aaURI')
334       
335        # Use output from test6GetAttCertWithSessID!
336        extACFilePath = xpdVars(self.cfg.get(section, 'extACFilePath'))   
337        extAttCert = open(extACFilePath).read()
338       
339        attCert = self.sm.getAttCert(sessID=self.sessID, 
340                                     attributeAuthorityURI=aaURI,
341                                     extAttCertList=[extAttCert])
342        print("Attribute Certificate:\n%s" % attCert) 
343
344
345    def test11GetAttCertWithUserX509Cert(self):
346        """test11GetAttCertWithUserX509Cert: make an attribute request using
347        a user cert as authentication credential"""
348        self._connect2UserX509CertAuthNService()
349
350        # Request an attribute certificate from an Attribute Authority
351        # using the userX509Cert returned from connect()
352       
353        aaURI = self.cfg.get('test11GetAttCertWithUserX509Cert', 'aaURI')
354        attCert = self.sm.getAttCert(userX509Cert=self.userX509Cert, 
355                                     attributeAuthorityURI=aaURI)
356        print("Attribute Certificate:\n%s" % attCert) 
357
358
359    def test12GetAttCertFromLocalAAInstance(self):
360        """test12GetAttCertFromLocalAAInstance: make an attribute request to a
361        locally instantiated Attribute Authority"""
362
363        self._connect()
364       
365        section = 'test12GetAttCertFromLocalAAInstance'
366        aaPropFilePath = self.cfg.get(section, 'aaPropFilePath')
367        attributeAuthority=AttributeAuthority.fromPropertyFile(
368                                                propFilePath=aaPropFilePath)
369       
370        attCert = self.sm.getAttCert(sessID=self.sessID, 
371                                     attributeAuthority=attributeAuthority)           
372        print("Attribute Certificate:\n%s" % attCert) 
373        attCert.filePath = xpdVars(self.cfg.get(section, 'acOutputFilePath')) 
374        attCert.write()
375
376
377class SessionManagerTestSuite(unittest.TestSuite):
378   
379    def __init__(self):
380        print "SessionManagerTestSuite ..."
381        smTestCaseMap = map(SessionManagerTestCase,
382                          (
383                            "test01Connect2AuthNServiceWithNoUserX509CertReturned",
384                            "test02Connect2AuthNServiceReturningAUserX509Cert",
385                            "test03GetSessionStatus",
386                            "test04ConnectNoCreateServerSess",
387                            "test05DisconnectWithSessID",
388                            "test06DisconnectWithUserX509Cert",
389                            "test07GetAttCertWithSessID",
390                            "test08GetAttCertRefusedWithSessID",
391                            "test09GetMappedAttCertWithSessID",
392                            "test10GetAttCertWithExtAttCertListWithSessID",
393                            "test11GetAttCertWithUserX509Cert",
394                            "test12GetAttCertFromLocalAAInstance",
395                          ))
396        unittest.TestSuite.__init__(self, smTestCaseMap)
397           
398                                                   
399if __name__ == "__main__":
400#    suite = SessionManagerTestSuite()
401#    unittest.TextTestRunner(verbosity=2).run(suite)
402    unittest.main()       
Note: See TracBrowser for help on using the repository browser.