source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionmanagerclient/test_sessionmanagerclient.py @ 4692

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionmanagerclient/test_sessionmanagerclient.py@4692
Revision 4692, 13.5 KB checked in by pjkersha, 11 years ago (diff)

Refactoring of SSO service to enable use of local AA and SM instances via keys to environ.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager SOAP client interface - makes requests
3for authentication and attribute retrieval.  Test Session Manager and Attribute
4Authority services must be running for *AttCert* tests.  See README in this
5directory
6
7NERC Data Grid Project
8"""
9__author__ = "P J Kershaw"
10__date__ = "23/02/06"
11__copyright__ = "(C) 2007 STFC"
12__license__ = \
13"""This software may be distributed under the terms of the Q Public
14License, version 1.0 or later."""
15__contact__ = "Philip.Kershaw@stfc.ac.uk"
16__revision__ = '$Id$'
17import logging
18logging.basicConfig(level=logging.DEBUG)
19log = logging.getLogger(__name__)
20
21import unittest
22import os
23import sys
24import getpass
25import re
26
27from os.path import expandvars as xpdVars
28from os.path import join as jnPath
def mkPath(file):
    """Return the path of a file located in the unit test directory.

    The directory is read from the NDGSEC_SMCLNT_UNITTEST_DIR environment
    variable at call time, so a KeyError is raised if it has not been set.

    @param file: file name (or relative path) to join onto the test
    directory.  The name shadows the Python 2 builtin but is kept so that
    existing keyword callers continue to work.
    @return: the joined path
    """
    return jnPath(os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'], file)
30
31from ndg.security.common.sessionmanager import SessionManagerClient, \
32    AttributeRequestDenied
33   
34from ndg.security.common.X509 import X509CertParse, X509CertRead
35from ndg.security.common.wssecurity.dom import SignatureHandler as SigHdlr
36from ndg.security.common.utils.configfileparsers import \
37    CaseSensitiveConfigParser
38
39
40class SessionManagerClientTestCase(unittest.TestCase):
41    '''Unit tests for ndg.security.common.sessionmanager.SessionManagerClient
42    - SOAP Session Manager client interface
43    '''
44    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
45       
46    test01Passphrase = None
47    test03Passphrase = None
48
49    def _getCertChainFromProxyCertFile(self, certChainFilePath):
50        '''Read user cert and user cert from a single PEM file and put in
51        a list ready for input into SignatureHandler'''               
52        certChainFileTxt = open(certChainFilePath).read()
53       
54        pemPatRE = re.compile(SessionManagerClientTestCase.pemPat, re.S)
55        x509CertList = pemPatRE.findall(certChainFileTxt)
56       
57        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
58                            x509CertList]
59   
60        # Expecting user cert first - move this to the end.  This will
61        # be the cert used to verify the message signature
62        signingCertChain.reverse()
63       
64        return signingCertChain
65
66
67       
68    def setUp(self):
69
70        if 'NDGSEC_INT_DEBUG' in os.environ:
71            import pdb
72            pdb.set_trace()
73       
74        if 'NDGSEC_SMCLNT_UNITTEST_DIR' not in os.environ:
75            os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'] = \
76                os.path.abspath(os.path.dirname(__file__))
77
78        self.cfgParser = CaseSensitiveConfigParser()
79        cfgFilePath = jnPath(os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'],
80                                'sessionMgrClientTest.cfg')
81        self.cfgParser.read(cfgFilePath)
82       
83        self.cfg = {}
84        for section in self.cfgParser.sections():
85            self.cfg[section] = dict(self.cfgParser.items(section))
86
87        try:
88            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
89                         self.cfg['setUp']['sslCACertFilePathList'].split()]
90        except KeyError:
91            sslCACertList = []
92           
93        # Instantiate WS proxy
94        self.clnt = SessionManagerClient(uri=self.cfg['setUp']['uri'],
95                        sslPeerCertCN=self.cfg['setUp'].get('sslPeerCertCN'),
96                        sslCACertList=sslCACertList,
97                        cfgFileSection='wsse',
98                        cfg=self.cfgParser) 
99               
100        self.sessID = None
101        self.userX509Cert = None
102        self.userPriKey = None
103        self.issuingCert = None
104       
105
106    def test01Connect(self):
107        """test01Connect: Connect as if acting as a browser client -
108        a session ID is returned"""
109       
110        username = self.cfg['test01Connect']['username']
111       
112        if SessionManagerClientTestCase.test01Passphrase is None:
113            SessionManagerClientTestCase.test01Passphrase = \
114                                    self.cfg['test01Connect'].get('passphrase')
115       
116        if not SessionManagerClientTestCase.test01Passphrase:
117            SessionManagerClientTestCase.test01Passphrase = getpass.getpass(\
118                prompt="\ntest01Connect pass-phrase for user %s: " % username)
119
120        self.userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \
121            self.clnt.connect(self.cfg['test01Connect']['username'], 
122                    passphrase=SessionManagerClientTestCase.test01Passphrase)
123
124        print("User '%s' connected to Session Manager:\n%s" % (username, 
125                                                               self.sessID))
126           
127           
128    def test02GetSessionStatus(self):
129        """test02GetSessionStatus: check a session is alive"""
130        print "\n\t" + self.test02GetSessionStatus.__doc__
131       
132        self.test01Connect()
133        assert self.clnt.getSessionStatus(sessID=self.sessID),"Session is dead"
134               
135        print("User connected to Session Manager with sessID=%s" % self.sessID)
136
137        assert not self.clnt.getSessionStatus(sessID='abc'), \
138                                                "sessID=abc shouldn't exist!"
139           
140        print "CORRECT: sessID=abc doesn't exist"
141
142
143    def test03ConnectNoCreateServerSess(self):
144        """test03ConnectNoCreateServerSess: Connect without creating a session -
145        sessID should be None.  This only indicates that the username/password
146        are correct.  To be of practical use the AuthNService plugin at
147        the Session Manager needs to return X.509 credentials e.g.
148        with MyProxy plugin."""
149
150        username = self.cfg['test03ConnectNoCreateServerSess']['username']
151       
152        if SessionManagerClientTestCase.test03Passphrase is None:
153            SessionManagerClientTestCase.test03Passphrase = \
154                self.cfg['test03ConnectNoCreateServerSess'].get('passphrase')
155               
156        if not SessionManagerClientTestCase.test03Passphrase:
157            prompt="\ntest03ConnectNoCreateServerSess pass-phrase for user %s: "
158            SessionManagerClientTestCase.test03Passphrase = getpass.getpass(\
159                                                    prompt=prompt % username)
160           
161        userX509Cert, userPriKey,issuingCert, sessID = \
162            self.clnt.connect(username, 
163                      passphrase=SessionManagerClientTestCase.test03Passphrase,
164                      createServerSess=False)
165       
166        # Expect null session ID
167        assert(not sessID)
168         
169        print("Successfully authenticated")
170           
171
172    def test04DisconnectWithSessID(self):
173        """test04DisconnectWithSessID: disconnect as if acting as a browser
174        client
175        """
176       
177        print "\n\t" + self.test04DisconnectWithSessID.__doc__
178        self.test01Connect()
179       
180        self.clnt.disconnect(sessID=self.sessID)
181       
182        print("User disconnected from Session Manager:\n%s" % self.sessID)
183           
184
185    def test05DisconnectWithUserX509Cert(self):
186        """test05DisconnectWithUserX509Cert: Disconnect as a command line client
187        """
188       
189        print "\n\t" + self.test05DisconnectWithUserX509Cert.__doc__
190        self.test01Connect()
191       
192        # Use user cert / private key just obtained from connect call for
193        # signature generation
194        if self.issuingCert:
195            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
196            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
197            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
198                                                           self.userX509Cert)
199            self.clnt.signatureHandler.signingCert = None
200        else:
201            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
202            self.clnt.signatureHandler.signingPriKeyPwd = \
203                SessionManagerClientTestCase.test01Passphrase
204            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
205            self.clnt.signatureHandler.signingCertChain = ()
206            self.clnt.signatureHandler.signingCert = self.userX509Cert
207           
208        # user X.509 cert in signature determines ID of session to delete
209        self.clnt.disconnect()
210        print("User disconnected from Session Manager:\n%s"%self.userX509Cert)
211
212
213    def test06GetAttCertWithSessID(self):
214        """test06GetAttCertWithSessID: make an attribute request using
215        a session ID as authentication credential"""
216
217        print "\n\t" + self.test06GetAttCertWithSessID.__doc__
218        thisSection = self.cfg['test06GetAttCertWithSessID']     
219        self.test01Connect()
220       
221        attCert = self.clnt.getAttCert(sessID=self.sessID, 
222                                       attributeAuthorityURI=thisSection['aaURI'])
223       
224        print "Attribute Certificate:\n%s" % attCert
225        attCert.filePath = xpdVars(thisSection['acOutFilePath']) 
226        attCert.write()
227
228
229    def test07GetAttCertRefusedWithSessID(self):
230        """test07GetAttCertRefusedWithSessID: make an attribute request using
231        a sessID as authentication credential requesting an AC from an
232        Attribute Authority where the user is NOT registered"""
233
234        print "\n\t" + self.test07GetAttCertRefusedWithSessID.__doc__       
235        self.test01Connect()
236       
237        aaURI = self.cfg['test07GetAttCertRefusedWithSessID']['aaURI']
238       
239        try:
240            attCert = self.clnt.getAttCert(sessID=self.sessID, 
241                                           attributeAuthorityURI=aaURI,
242                                           mapFromTrustedHosts=False)
243        except AttributeRequestDenied, e:
244            print "SUCCESS - obtained expected result: %s" % e
245            return
246       
247        self.fail("Request allowed from AA where user is NOT registered!")
248
249
250    def test08GetMappedAttCertWithSessID(self):
251        """test08GetMappedAttCertWithSessID: make an attribute request using
252        a session ID as authentication credential"""
253
254        print "\n\t" + self.test08GetMappedAttCertWithSessID.__doc__       
255        self.test01Connect()
256       
257        aaURI = self.cfg['test08GetMappedAttCertWithSessID']['aaURI']
258       
259        attCert=self.clnt.getAttCert(sessID=self.sessID, attributeAuthorityURI=aaURI)
260       
261        print "Attribute Certificate:\n%s" % attCert 
262
263
264    def test09GetAttCertWithExtAttCertListWithSessID(self):
265        """test09GetAttCertWithExtAttCertListWithSessID: make an attribute
266        request usinga session ID as authentication credential"""
267       
268        print "\n\t"+self.test09GetAttCertWithExtAttCertListWithSessID.__doc__       
269        self.test01Connect()
270        thisSection = self.cfg['test09GetAttCertWithExtAttCertListWithSessID']
271       
272        aaURI = thisSection['aaURI']
273       
274        # Use output from test06GetAttCertWithSessID!
275        extACFilePath = xpdVars(thisSection['extACFilePath'])
276        extAttCert = open(extACFilePath).read()
277       
278        attCert = self.clnt.getAttCert(sessID=self.sessID, 
279                                       attributeAuthorityURI=aaURI,
280                                       extAttCertList=[extAttCert])
281         
282        print("Attribute Certificate:\n%s" % attCert) 
283
284
285    def test10GetAttCertWithUserX509Cert(self):
286        """test10GetAttCertWithUserX509Cert: make an attribute request using
287        a user cert as authentication credential"""
288        print "\n\t" + self.test10GetAttCertWithUserX509Cert.__doc__
289        self.test01Connect()
290
291        if self.issuingCert:
292            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
293            self.clnt.signatureHandler.signingPriKeyPwd = \
294                                SessionManagerClientTestCase.test01Passphrase
295            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
296            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
297                                                           self.userX509Cert)
298            self.clnt.signatureHandler.signingCert = None
299        else:
300            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
301            self.clnt.signatureHandler.signingPriKeyPwd = \
302                                SessionManagerClientTestCase.test01Passphrase
303            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
304            self.clnt.signatureHandler.signingCertChain = ()
305            self.clnt.signatureHandler.signingCert = self.userX509Cert
306       
307        # Request an attribute certificate from an Attribute Authority
308        # using the userX509Cert returned from connect()
309       
310        aaURI = self.cfg['test10GetAttCertWithUserX509Cert']['aaURI']
311        attCert = self.clnt.getAttCert(attributeAuthorityURI=aaURI)
312         
313        print("Attribute Certificate:\n%s" % attCert) 
314           
315           
class SessionManagerClientTestSuite(unittest.TestSuite):
    """Test suite running a fixed selection of the
    SessionManagerClientTestCase tests in the order listed"""

    def __init__(self):
        # NOTE(review): test07GetAttCertRefusedWithSessID is not in this
        # list - confirm whether the omission is intentional
        testNames = (
            "test01Connect",
            "test02GetSessionStatus",
            "test03ConnectNoCreateServerSess",
            "test04DisconnectWithSessID",
            "test05DisconnectWithUserX509Cert",
            "test06GetAttCertWithSessID",
            "test08GetMappedAttCertWithSessID",
            "test09GetAttCertWithExtAttCertListWithSessID",
            "test10GetAttCertWithUserX509Cert",
        )

        # The list must not be bound to the name 'map': doing so makes
        # 'map' local to this method and calling the builtin then raises
        # UnboundLocalError
        tests = [SessionManagerClientTestCase(testName)
                 for testName in testNames]
        unittest.TestSuite.__init__(self, tests)
332           
333                                                   
334if __name__ == "__main__":
335    unittest.main()       
Note: See TracBrowser for help on using the repository browser.