source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/test_combinedservices.py @ 4680

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/test_combinedservices.py@4680
Revision 4680, 17.8 KB checked in by pjkersha, 11 years ago (diff)

Global replace to fix copyright from STFC & NERC to STFC alone because it's not possible to have copyright held by two orgs.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager SOAP client interface - makes requests
3for authentication and attribute retrieval.  Test Session Manager and Attribute
4Authority services must be running for *AttCert* tests.  See README in this
5directory
6
7NERC Data Grid Project
8"""
9__author__ = "P J Kershaw"
10__date__ = "23/02/06"
11__copyright__ = "(C) 2007 STFC"
12__license__ = \
13"""This software may be distributed under the terms of the Q Public
14License, version 1.0 or later."""
15__contact__ = "Philip.Kershaw@stfc.ac.uk"
16__revision__ = '$Id: test_sessionmanagerclient.py 4437 2008-11-18 12:34:25Z pjkersha $'
17import logging
18logging.basicConfig(level=logging.DEBUG)
19
20import unittest
21import os
22import sys
23import getpass
24import re
25import base64
26import urllib2
27
28from os.path import expandvars as xpdVars
29from os.path import join as jnPath
def mkPath(file):
    """Return ``file`` joined onto the combined services unit test directory.

    The directory is read from the NDGSEC_COMBINED_SRVS_UNITTEST_DIR
    environment variable each time the function is called, so a KeyError is
    raised if the variable has not been set.

    The parameter keeps its original name ``file`` (which shadows the
    Python 2 builtin) so that existing keyword-style calls continue to work.
    """
    return jnPath(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'], file)
32
33from ndg.security.common.sessionmanager import SessionManagerClient, \
34    AttributeRequestDenied
35   
36from ndg.security.common.X509 import X509CertParse, X509CertRead
37from ndg.security.common.wssecurity.dom import SignatureHandler as SigHdlr
38from ndg.security.common.utils.configfileparsers import \
39    CaseSensitiveConfigParser
40
41
42class CombinedServicesTestCase(unittest.TestCase):
43    '''Unit tests for ndg.security.common.sessionmanager.SessionManagerClient
44    - SOAP Session Manager client interface
45    '''
46    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
47       
48    test01Passphrase = None
49    test03Passphrase = None
50
51    def _getCertChainFromProxyCertFile(self, certChainFilePath):
52        '''Read user cert and user cert from a single PEM file and put in
53        a list ready for input into SignatureHandler'''               
54        certChainFileTxt = open(certChainFilePath).read()
55       
56        pemPatRE = re.compile(CombinedServicesTestCase.pemPat, re.S)
57        x509CertList = pemPatRE.findall(certChainFileTxt)
58       
59        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
60                            x509CertList]
61   
62        # Expecting user cert first - move this to the end.  This will
63        # be the cert used to verify the message signature
64        signingCertChain.reverse()
65       
66        return signingCertChain
67
68    def _httpBasicAuthReq(self, *args):
69        """Utility for making a client request to the WSGI test application
70        using HTTP Basic Authentication"""
71        req = urllib2.Request(args[0])
72       
73        # username and password are optional args 2 and 3
74        if len(args) == 3:
75            base64String = base64.encodestring('%s:%s'%(args[1:]))[:-1]
76            authHeader =  "Basic %s" % base64String
77            req.add_header("Authorization", authHeader)
78           
79        handle = urllib2.urlopen(req)
80           
81        return handle.read()
82       
83
84    def setUp(self):
85
86        if 'NDGSEC_INT_DEBUG' in os.environ:
87            import pdb
88            pdb.set_trace()
89       
90        if 'NDGSEC_COMBINED_SRVS_UNITTEST_DIR' not in os.environ:
91            os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'] = \
92                os.path.abspath(os.path.dirname(__file__))
93
94        self.cfgParser = CaseSensitiveConfigParser()
95        cfgFilePath = jnPath(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'],
96                             'test_combinedservices.cfg')
97        self.cfgParser.read(cfgFilePath)
98       
99        self.cfg = {}
100        for section in self.cfgParser.sections():
101            self.cfg[section] = dict(self.cfgParser.items(section))
102
103        try:
104            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
105                         self.cfg['setUp']['sslCACertFilePathList'].split()]
106        except KeyError:
107            sslCACertList = []
108           
109        # Instantiate WS proxy
110        self.clnt = SessionManagerClient(uri=self.cfg['setUp']['uri'],
111                        sslPeerCertCN=self.cfg['setUp'].get('sslPeerCertCN'),
112                        sslCACertList=sslCACertList,
113                        cfgFileSection='wsse',
114                        cfg=self.cfgParser) 
115               
116        self.sessID = None
117        self.userX509Cert = None
118        self.userPriKey = None
119        self.issuingCert = None
120       
121
122    def test01Connect(self):
123        """test01Connect: Connect as if acting as a browser client -
124        a session ID is returned"""
125       
126        username = self.cfg['test01Connect']['username']
127       
128        if CombinedServicesTestCase.test01Passphrase is None:
129            CombinedServicesTestCase.test01Passphrase = \
130                                    self.cfg['test01Connect'].get('passphrase')
131       
132        if not CombinedServicesTestCase.test01Passphrase:
133            CombinedServicesTestCase.test01Passphrase = getpass.getpass(\
134                prompt="\ntest01Connect pass-phrase for user %s: " % username)
135
136        self.userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \
137            self.clnt.connect(self.cfg['test01Connect']['username'], 
138                    passphrase=CombinedServicesTestCase.test01Passphrase)
139
140        print("User '%s' connected to Session Manager:\n%s" % (username, 
141                                                               self.sessID))
142           
143           
144    def test02GetSessionStatus(self):
145        """test02GetSessionStatus: check a session is alive"""
146        print "\n\t" + self.test02GetSessionStatus.__doc__
147       
148        self.test01Connect()
149        assert self.clnt.getSessionStatus(sessID=self.sessID),"Session is dead"
150               
151        print("User connected to Session Manager with sessID=%s" % self.sessID)
152
153        assert not self.clnt.getSessionStatus(sessID='abc'), \
154                                                "sessID=abc shouldn't exist!"
155           
156        print "CORRECT: sessID=abc doesn't exist"
157
158
159    def test03ConnectNoCreateServerSess(self):
160        """test03ConnectNoCreateServerSess: Connect without creating a session -
161        sessID should be None.  This only indicates that the username/password
162        are correct.  To be of practical use the AuthNService plugin at
163        the Session Manager needs to return X.509 credentials e.g.
164        with MyProxy plugin."""
165
166        username = self.cfg['test03ConnectNoCreateServerSess']['username']
167       
168        if CombinedServicesTestCase.test03Passphrase is None:
169            CombinedServicesTestCase.test03Passphrase = \
170                self.cfg['test03ConnectNoCreateServerSess'].get('passphrase')
171               
172        if not CombinedServicesTestCase.test03Passphrase:
173            prompt="\ntest03ConnectNoCreateServerSess pass-phrase for user %s: "
174            CombinedServicesTestCase.test03Passphrase = getpass.getpass(\
175                                                    prompt=prompt % username)
176           
177        userX509Cert, userPriKey,issuingCert, sessID = \
178            self.clnt.connect(username, 
179                      passphrase=CombinedServicesTestCase.test03Passphrase,
180                      createServerSess=False)
181       
182        # Expect null session ID
183        assert(not sessID)
184         
185        print("Successfully authenticated")
186           
187
188    def test04DisconnectWithSessID(self):
189        """test04DisconnectWithSessID: disconnect as if acting as a browser
190        client
191        """
192       
193        print "\n\t" + self.test04DisconnectWithSessID.__doc__
194        self.test01Connect()
195       
196        self.clnt.disconnect(sessID=self.sessID)
197       
198        print("User disconnected from Session Manager:\n%s" % self.sessID)
199           
200
201    def test05DisconnectWithUserX509Cert(self):
202        """test05DisconnectWithUserX509Cert: Disconnect as a command line client
203        """
204       
205        print "\n\t" + self.test05DisconnectWithUserX509Cert.__doc__
206        self.test01Connect()
207       
208        # Use user cert / private key just obtained from connect call for
209        # signature generation
210        if self.issuingCert:
211            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
212            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
213            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
214                                                           self.userX509Cert)
215            self.clnt.signatureHandler.signingCert = None
216        else:
217            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
218            self.clnt.signatureHandler.signingPriKeyPwd = \
219                CombinedServicesTestCase.test01Passphrase
220            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
221            self.clnt.signatureHandler.signingCertChain = ()
222            self.clnt.signatureHandler.signingCert = self.userX509Cert
223           
224        # user X.509 cert in signature determines ID of session to delete
225        self.clnt.disconnect()
226        print("User disconnected from Session Manager:\n%s"%self.userX509Cert)
227
228
229    def test06GetAttCertWithSessID(self):
230        """test06GetAttCertWithSessID: make an attribute request using
231        a session ID as authentication credential"""
232
233        print "\n\t" + self.test06GetAttCertWithSessID.__doc__
234        thisSection = self.cfg['test06GetAttCertWithSessID']     
235        self.test01Connect()
236       
237        attCert = self.clnt.getAttCert(sessID=self.sessID, 
238                                       attAuthorityURI=thisSection['aaURI'])
239       
240        print "Attribute Certificate:\n%s" % attCert
241        attCert.filePath = xpdVars(thisSection['acOutFilePath']) 
242        attCert.write() 
243
244
245    def test07GetAttCertWithUserX509Cert(self):
246        """test07GetAttCertWithUserX509Cert: make an attribute request using
247        a user cert as authentication credential"""
248        print "\n\t" + self.test07GetAttCertWithUserX509Cert.__doc__
249        self.test01Connect()
250
251        if self.issuingCert:
252            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
253            self.clnt.signatureHandler.signingPriKeyPwd = \
254                                CombinedServicesTestCase.test01Passphrase
255            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
256            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
257                                                           self.userX509Cert)
258            self.clnt.signatureHandler.signingCert = None
259        else:
260            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
261            self.clnt.signatureHandler.signingPriKeyPwd = \
262                                CombinedServicesTestCase.test01Passphrase
263            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
264            self.clnt.signatureHandler.signingCertChain = ()
265            self.clnt.signatureHandler.signingCert = self.userX509Cert
266       
267        # Request an attribute certificate from an Attribute Authority
268        # using the userX509Cert returned from connect()
269       
270        aaURI = self.cfg['test07GetAttCertWithUserX509Cert']['aaURI']
271        attCert = self.clnt.getAttCert(attAuthorityURI=aaURI)
272         
273        print("Attribute Certificate:\n%s" % attCert) 
274
275
276    def test08GetAttCertFromLocalAttributeAuthority(self):
277        """test08GetAttCertFromLocalAttributeAuthority: query the Attribute
278        Authority running in the same server instance as the Session Manager"""
279
280        print "\n\t" + self.test08GetAttCertFromLocalAttributeAuthority.__doc__
281        self.test01Connect()
282       
283        attCert = self.clnt.getAttCert(sessID=self.sessID)
284       
285        print "Attribute Certificate:\n%s" % attCert
286
287
288    def test09WSGILocalSessionManagerInstanceConnect(self):
289        """test09WSGILocalSessionManagerInstanceConnect: test a WSGI app
290        calling a Session Manager WSGI instance local to the server"""
291       
292        # Make a client connection to the WSGI app - authenticate with WSGI
293        # basic auth.  The WSGI app calls a Session Manager WSGI running in
294        # the same code stack
295        thisSection = self.cfg['test09WSGILocalSessionManagerInstanceConnect']
296        url = thisSection['url']
297        username = thisSection['username']
298        password = thisSection['passphrase']
299        print("WSGI app connecting to local Session Manager instance: %s" %
300              self._httpBasicAuthReq(url, username, password))
301
302
303    def test10WSGILocalSessionManagerInstanceGetSessionStatus(self):
304        """test10WSGILocalSessionManagerInstanceGetSessionStatus: test a WSGI
305        app calling a Session Manager WSGI instance local to the server"""
306       
307        # Make a client connection to the WSGI app - authenticate with WSGI
308        # basic auth
309        thisSection = self.cfg[
310                    'test10WSGILocalSessionManagerInstanceGetSessionStatus']
311        url = thisSection['url']
312        username = thisSection['username']
313        password = thisSection['passphrase']
314        print("WSGI app connecting to local Session Manager instance: %s" %
315              self._httpBasicAuthReq(url, username, password))
316
317
318    def test11WSGILocalSessionManagerInstanceDisconnect(self):
319        """test11WSGILocalSessionManagerInstanceDisconnect: test a WSGI app
320        calling a Session Manager WSGI instance local to the server"""
321       
322        # Make a client connection to the WSGI app - authenticate with WSGI
323        # basic auth
324        thisSection=self.cfg['test11WSGILocalSessionManagerInstanceDisconnect']
325        url = thisSection['url']
326        username = thisSection['username']
327        password = thisSection['passphrase']
328        print("WSGI app connecting to local Session Manager instance: %s" %
329              self._httpBasicAuthReq(url, username, password))     
330
331
332    def test12WSGILocalSessionManagerInstanceGetAttCert(self):
333        """test12WSGILocalSessionManagerInstanceGetAttCert: test a WSGI app
334        calling a Session Manager WSGI instance local to the server"""
335       
336        # Make a client connection to the WSGI app - authenticate with WSGI
337        # basic auth
338        thisSection=self.cfg['test12WSGILocalSessionManagerInstanceGetAttCert']
339        args = (thisSection['url'], thisSection['username'],
340                thisSection['passphrase'])
341       
342        print("WSGI app connecting to local Session Manager instance: %s" %
343              self._httpBasicAuthReq(*args))       
344       
345
346    def test13WSGILocalAttributeAuthorityInstanceGetHostInfo(self):
347        """test13WSGILocalAttributeAuthorityInstanceGetHostInfo: test a WSGI
348        app calling a Attribute Authority WSGI instance local to the server"""
349       
350        # Make a client connection to the WSGI app - authenticate with WSGI
351        # basic auth
352        thisSection = self.cfg[
353                        'test13WSGILocalAttributeAuthorityInstanceGetHostInfo']
354       
355        print("WSGI app connecting to local Attribute Authority instance: %s" %
356              self._httpBasicAuthReq(thisSection['url']))       
357       
358
359    def test14WSGILocalAttributeAuthorityInstanceGetTrustedHostInfo(self):
360        """test14WSGILocalAttributeAuthorityInstanceGetTrustedHostInfo: test a
361        WSGI app calling a Attribute Authority WSGI instance local to the
362        server"""
363       
364        # Make a client connection to the WSGI app - authenticate with WSGI
365        # basic auth
366        thisSection = self.cfg[
367                'test14WSGILocalAttributeAuthorityInstanceGetTrustedHostInfo']
368       
369        print("WSGI app connecting to local Attribute Authority instance: %s" %
370            self._httpBasicAuthReq(thisSection['url']+'?'+thisSection['role']))       
371       
372
373    def test15WSGILocalAttributeAuthorityInstanceGetAllHostsInfo(self):
374        """test15WSGILocalAttributeAuthorityInstanceGetAllHostsInfo: test a
375        WSGI app calling a Attribute Authority WSGI instance local to the
376        server"""
377       
378        # Make a client connection to the WSGI app - authenticate with WSGI
379        # basic auth
380        thisSection = self.cfg[
381                    'test15WSGILocalAttributeAuthorityInstanceGetAllHostsInfo']
382       
383        print("WSGI app connecting to local Attribute Authority instance: %s" %
384              self._httpBasicAuthReq(thisSection['url']))       
385
386
387    def test16WSGILocalAttributeAuthorityInstanceGetAttCert(self):
388        """test16WSGILocalAttributeAuthorityInstanceGetAttCert: test a WSGI app
389        calling a Attribute Authority WSGI instance local to the server"""
390       
391        # Make a client connection to the WSGI app - authenticate with WSGI
392        # basic auth
393        thisSection = self.cfg[
394                        'test16WSGILocalAttributeAuthorityInstanceGetAttCert']
395        args = (thisSection['url'], thisSection['username'],
396                thisSection['passphrase'])
397       
398        print("WSGI app connecting to local Attribute Authority instance: %s" %
399              self._httpBasicAuthReq(*args))       
400
401
class CombinedServicesTestSuite(unittest.TestSuite):
    """Suite made up of every CombinedServicesTestCase test, in numbered
    order"""

    def __init__(self):
        """Create one CombinedServicesTestCase instance per test method and
        add them all to the suite"""
        testNames = (
            "test01Connect",
            "test02GetSessionStatus",
            "test03ConnectNoCreateServerSess",
            "test04DisconnectWithSessID",
            "test05DisconnectWithUserX509Cert",
            "test06GetAttCertWithSessID",
            "test07GetAttCertWithUserX509Cert",
            "test08GetAttCertFromLocalAttributeAuthority",
            "test09WSGILocalSessionManagerInstanceConnect",
            "test10WSGILocalSessionManagerInstanceGetSessionStatus",
            "test11WSGILocalSessionManagerInstanceDisconnect",
            "test12WSGILocalSessionManagerInstanceGetAttCert",
            "test13WSGILocalAttributeAuthorityInstanceGetHostInfo",
            "test14WSGILocalAttributeAuthorityInstanceGetTrustedHostInfo",
            "test15WSGILocalAttributeAuthorityInstanceGetAllHostsInfo",
            "test16WSGILocalAttributeAuthorityInstanceGetAttCert",
            )

        # The list must not be bound to the name "map": assigning to it
        # inside this method would make it a local variable and the call to
        # the builtin of the same name would fail with UnboundLocalError
        tests = [CombinedServicesTestCase(testName) for testName in testNames]
        unittest.TestSuite.__init__(self, tests)
424           
425                                                   
# Run every test in this module when it is executed as a script
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.