source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/test_combinedservices.py @ 4692

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/test_combinedservices.py@4692
Revision 4692, 17.1 KB checked in by pjkersha, 11 years ago (diff)

Refactoring of SSO service to enable use of local AA and SM instances via keys to environ.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager SOAP client interface - makes requests
3for authentication and attribute retrieval.  Test Session Manager and Attribute
4Authority services must be running for *AttCert* tests.  See README in this
5directory
6
7NERC Data Grid Project
8"""
9__author__ = "P J Kershaw"
10__date__ = "23/02/06"
11__copyright__ = "(C) 2007 STFC"
12__license__ = \
13"""This software may be distributed under the terms of the Q Public
14License, version 1.0 or later."""
15__contact__ = "Philip.Kershaw@stfc.ac.uk"
16__revision__ = '$Id: test_sessionmanagerclient.py 4437 2008-11-18 12:34:25Z pjkersha $'
17import logging
18
19
20import unittest
21import os
22import sys
23import getpass
24import re
25import base64
26import urllib2
27
28from os.path import expandvars as xpdVars
29from os.path import join as jnPath
def mkPath(file):
    """Return the path of *file* inside the combined services unit test
    directory.

    The directory is read from the NDGSEC_COMBINED_SRVS_UNITTEST_DIR
    environment variable on every call; a KeyError is raised if the variable
    is not set.

    @param file: file name (or relative path) to append to the test directory.
    The parameter keeps its original name so that keyword callers continue to
    work even though it shadows the Python 2 builtin of the same name.
    @return: the joined path
    """
    return jnPath(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'], file)
32
33from ndg.security.common.sessionmanager import SessionManagerClient, \
34    AttributeRequestDenied
35   
36from ndg.security.common.X509 import X509CertParse, X509CertRead
37from ndg.security.common.wssecurity.dom import SignatureHandler as SigHdlr
38from ndg.security.common.utils.configfileparsers import \
39    CaseSensitiveConfigParser
40
41
42class CombinedServicesTestCase(unittest.TestCase):
43    '''Unit tests for ndg.security.common.sessionmanager.SessionManagerClient
44    - SOAP Session Manager client interface
45    '''
46    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
47       
48    test01Passphrase = None
49    test03Passphrase = None
50
51    def _getCertChainFromProxyCertFile(self, certChainFilePath):
52        '''Read user cert and user cert from a single PEM file and put in
53        a list ready for input into SignatureHandler'''               
54        certChainFileTxt = open(certChainFilePath).read()
55       
56        pemPatRE = re.compile(CombinedServicesTestCase.pemPat, re.S)
57        x509CertList = pemPatRE.findall(certChainFileTxt)
58       
59        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
60                            x509CertList]
61   
62        # Expecting user cert first - move this to the end.  This will
63        # be the cert used to verify the message signature
64        signingCertChain.reverse()
65       
66        return signingCertChain
67
68    def _httpBasicAuthReq(self, *args):
69        """Utility for making a client request to the WSGI test application
70        using HTTP Basic Authentication"""
71        req = urllib2.Request(args[0])
72       
73        # username and password are optional args 2 and 3
74        if len(args) == 3:
75            base64String = base64.encodestring('%s:%s'%(args[1:]))[:-1]
76            authHeader =  "Basic %s" % base64String
77            req.add_header("Authorization", authHeader)
78           
79        handle = urllib2.urlopen(req)
80           
81        return handle.read()
82       
83
84    def setUp(self):
85
86        if 'NDGSEC_INT_DEBUG' in os.environ:
87            import pdb
88            pdb.set_trace()
89       
90        if 'NDGSEC_COMBINED_SRVS_UNITTEST_DIR' not in os.environ:
91            os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'] = \
92                os.path.abspath(os.path.dirname(__file__))
93
94        self.cfgParser = CaseSensitiveConfigParser()
95        cfgFilePath = jnPath(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'],
96                             'test_combinedservices.cfg')
97        self.cfgParser.read(cfgFilePath)
98       
99        self.cfg = {}
100        for section in self.cfgParser.sections():
101            self.cfg[section] = dict(self.cfgParser.items(section))
102
103        try:
104            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
105                         self.cfg['setUp']['sslCACertFilePathList'].split()]
106        except KeyError:
107            sslCACertList = []
108       
109        # Set logging
110        try:
111            logLevel = getattr(logging, self.cfg['setUp']['logLevel'])
112        except AttributeError:
113            raise AttributeError("logLevel=%s not recognised, try one of: "
114                                 "CRITICAL, ERROR, WARNING, INFO, DEBUG or "
115                                 "NOTSET" % self.cfg['setUp']['logLevel'])
116           
117        logging.basicConfig(level=logLevel)
118       
119        # Instantiate WS proxy
120        self.clnt = SessionManagerClient(uri=self.cfg['setUp']['uri'],
121                        sslPeerCertCN=self.cfg['setUp'].get('sslPeerCertCN'),
122                        sslCACertList=sslCACertList,
123                        cfgFileSection='wsse',
124                        cfg=self.cfgParser) 
125               
126        self.sessID = None
127        self.userX509Cert = None
128        self.userPriKey = None
129        self.issuingCert = None
130       
131
132    def test01Connect(self):
133        """test01Connect: Connect as if acting as a browser client -
134        a session ID is returned"""
135       
136        username = self.cfg['test01Connect']['username']
137       
138        if CombinedServicesTestCase.test01Passphrase is None:
139            CombinedServicesTestCase.test01Passphrase = \
140                                    self.cfg['test01Connect'].get('passphrase')
141       
142        if not CombinedServicesTestCase.test01Passphrase:
143            CombinedServicesTestCase.test01Passphrase = getpass.getpass(\
144                prompt="\ntest01Connect pass-phrase for user %s: " % username)
145
146        self.userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \
147            self.clnt.connect(self.cfg['test01Connect']['username'], 
148                    passphrase=CombinedServicesTestCase.test01Passphrase)
149
150        print("User '%s' connected to Session Manager:\n%s" % (username, 
151                                                               self.sessID))
152           
153           
154    def test02GetSessionStatus(self):
155        """test02GetSessionStatus: check a session is alive"""
156        print "\n\t" + self.test02GetSessionStatus.__doc__
157       
158        self.test01Connect()
159        assert self.clnt.getSessionStatus(sessID=self.sessID),"Session is dead"
160               
161        print("User connected to Session Manager with sessID=%s" % self.sessID)
162
163        assert not self.clnt.getSessionStatus(sessID='abc'), \
164                                                "sessID=abc shouldn't exist!"
165           
166        print "CORRECT: sessID=abc doesn't exist"
167
168
169    def test03ConnectNoCreateServerSess(self):
170        """test03ConnectNoCreateServerSess: Connect without creating a session -
171        sessID should be None.  This only indicates that the username/password
172        are correct.  To be of practical use the AuthNService plugin at
173        the Session Manager needs to return X.509 credentials e.g.
174        with MyProxy plugin."""
175
176        username = self.cfg['test03ConnectNoCreateServerSess']['username']
177       
178        if CombinedServicesTestCase.test03Passphrase is None:
179            CombinedServicesTestCase.test03Passphrase = \
180                self.cfg['test03ConnectNoCreateServerSess'].get('passphrase')
181               
182        if not CombinedServicesTestCase.test03Passphrase:
183            prompt="\ntest03ConnectNoCreateServerSess pass-phrase for user %s: "
184            CombinedServicesTestCase.test03Passphrase = getpass.getpass(\
185                                                    prompt=prompt % username)
186           
187        userX509Cert, userPriKey,issuingCert, sessID = \
188            self.clnt.connect(username, 
189                      passphrase=CombinedServicesTestCase.test03Passphrase,
190                      createServerSess=False)
191       
192        # Expect null session ID
193        assert(not sessID)
194         
195        print("Successfully authenticated")
196           
197
198    def test04DisconnectWithSessID(self):
199        """test04DisconnectWithSessID: disconnect as if acting as a browser
200        client
201        """
202       
203        print "\n\t" + self.test04DisconnectWithSessID.__doc__
204        self.test01Connect()
205       
206        self.clnt.disconnect(sessID=self.sessID)
207       
208        print("User disconnected from Session Manager:\n%s" % self.sessID)
209           
210
211    def test05DisconnectWithUserX509Cert(self):
212        """test05DisconnectWithUserX509Cert: Disconnect as a command line client
213        """
214       
215        print "\n\t" + self.test05DisconnectWithUserX509Cert.__doc__
216        self.test01Connect()
217       
218        # Use user cert / private key just obtained from connect call for
219        # signature generation
220        if self.issuingCert:
221            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
222            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
223            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
224                                                           self.userX509Cert)
225            self.clnt.signatureHandler.signingCert = None
226        else:
227            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
228            self.clnt.signatureHandler.signingPriKeyPwd = \
229                CombinedServicesTestCase.test01Passphrase
230            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
231            self.clnt.signatureHandler.signingCertChain = ()
232            self.clnt.signatureHandler.signingCert = self.userX509Cert
233           
234        # user X.509 cert in signature determines ID of session to delete
235        self.clnt.disconnect()
236        print("User disconnected from Session Manager:\n%s"%self.userX509Cert)
237
238
239    def test06GetAttCertWithSessID(self):
240        """test06GetAttCertWithSessID: make an attribute request using
241        a session ID as authentication credential"""
242
243        print "\n\t" + self.test06GetAttCertWithSessID.__doc__
244        thisSection = self.cfg['test06GetAttCertWithSessID']     
245        self.test01Connect()
246       
247        attCert = self.clnt.getAttCert(sessID=self.sessID, 
248                                       attributeAuthorityURI=thisSection['aaURI'])
249       
250        print "Attribute Certificate:\n%s" % attCert
251        attCert.filePath = xpdVars(thisSection['acOutFilePath']) 
252        attCert.write() 
253
254
255    def test07GetAttCertWithUserX509Cert(self):
256        """test07GetAttCertWithUserX509Cert: make an attribute request using
257        a user cert as authentication credential"""
258        print "\n\t" + self.test07GetAttCertWithUserX509Cert.__doc__
259        self.test01Connect()
260
261        if self.issuingCert:
262            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
263            self.clnt.signatureHandler.signingPriKeyPwd = \
264                                CombinedServicesTestCase.test01Passphrase
265            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
266            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
267                                                           self.userX509Cert)
268            self.clnt.signatureHandler.signingCert = None
269        else:
270            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
271            self.clnt.signatureHandler.signingPriKeyPwd = \
272                                CombinedServicesTestCase.test01Passphrase
273            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
274            self.clnt.signatureHandler.signingCertChain = ()
275            self.clnt.signatureHandler.signingCert = self.userX509Cert
276       
277        # Request an attribute certificate from an Attribute Authority
278        # using the userX509Cert returned from connect()
279       
280        aaURI = self.cfg['test07GetAttCertWithUserX509Cert']['aaURI']
281        attCert = self.clnt.getAttCert(attributeAuthorityURI=aaURI)
282         
283        print("Attribute Certificate:\n%s" % attCert) 
284
285
286    def test08GetAttCertFromLocalAttributeAuthority(self):
287        """test08GetAttCertFromLocalAttributeAuthority: query the Attribute
288        Authority running in the same server instance as the Session Manager"""
289
290        print "\n\t" + self.test08GetAttCertFromLocalAttributeAuthority.__doc__
291        self.test01Connect()
292       
293        attCert = self.clnt.getAttCert(sessID=self.sessID)
294       
295        print "Attribute Certificate:\n%s" % attCert
296
297
298    def test09WSGILocalSessionManagerInstanceConnect(self):
299        """test09WSGILocalSessionManagerInstanceConnect: test a WSGI app
300        calling a Session Manager WSGI instance local to the server"""
301       
302        # Make a client connection to the WSGI app - authenticate with WSGI
303        # basic auth.  The WSGI app calls a Session Manager WSGI running in
304        # the same code stack
305        thisSection = self.cfg['test09WSGILocalSessionManagerInstanceConnect']
306        url = thisSection['url']
307        username = thisSection['username']
308        password = thisSection['passphrase']
309        print("WSGI app connecting to local Session Manager instance: %s" %
310              self._httpBasicAuthReq(url, username, password))
311
312
313    def test10WSGILocalSessionManagerInstanceGetSessionStatus(self):
314        """test10WSGILocalSessionManagerInstanceGetSessionStatus: test a WSGI
315        app calling a Session Manager WSGI instance local to the server"""
316       
317        # Make a client connection to the WSGI app - authenticate with WSGI
318        # basic auth
319        thisSection = self.cfg[
320                    'test10WSGILocalSessionManagerInstanceGetSessionStatus']
321        url = thisSection['url']
322        username = thisSection['username']
323        password = thisSection['passphrase']
324        print("WSGI app connecting to local Session Manager instance: %s" %
325              self._httpBasicAuthReq(url, username, password))
326
327
328    def test11WSGILocalSessionManagerInstanceDisconnect(self):
329        """test11WSGILocalSessionManagerInstanceDisconnect: test a WSGI app
330        calling a Session Manager WSGI instance local to the server"""
331       
332        # Make a client connection to the WSGI app - authenticate with WSGI
333        # basic auth
334        thisSection=self.cfg['test11WSGILocalSessionManagerInstanceDisconnect']
335        url = thisSection['url']
336        username = thisSection['username']
337        password = thisSection['passphrase']
338        print("WSGI app connecting to local Session Manager instance: %s" %
339              self._httpBasicAuthReq(url, username, password))     
340
341
342    def test12WSGILocalSessionManagerInstanceGetAttCert(self):
343        """test12WSGILocalSessionManagerInstanceGetAttCert: test a WSGI app
344        calling a Session Manager WSGI instance local to the server"""
345       
346        # Make a client connection to the WSGI app - authenticate with WSGI
347        # basic auth
348        thisSection=self.cfg['test12WSGILocalSessionManagerInstanceGetAttCert']
349        args = (thisSection['url'], thisSection['username'],
350                thisSection['passphrase'])
351       
352        print("WSGI app connecting to local Session Manager instance: %s" %
353              self._httpBasicAuthReq(*args))       
354       
355
356    def test13WSGILocalAttributeAuthorityInstanceGetHostInfo(self):
357        """test13WSGILocalAttributeAuthorityInstanceGetHostInfo: test a WSGI
358        app calling a Attribute Authority WSGI instance local to the server"""
359       
360        # Make a client connection to the WSGI app - authenticate with WSGI
361        # basic auth
362        thisSection = self.cfg[
363                        'test13WSGILocalAttributeAuthorityInstanceGetHostInfo']
364       
365        print("WSGI app connecting to local Attribute Authority instance: %s" %
366              self._httpBasicAuthReq(thisSection['url']))       
367       
368
369    def test14WSGILocalAttributeAuthorityInstanceGetTrustedHostInfo(self):
370        """test14WSGILocalAttributeAuthorityInstanceGetTrustedHostInfo: test a
371        WSGI app calling a Attribute Authority WSGI instance local to the
372        server"""
373       
374        # Make a client connection to the WSGI app - authenticate with WSGI
375        # basic auth
376        thisSection = self.cfg[
377                'test14WSGILocalAttributeAuthorityInstanceGetTrustedHostInfo']
378       
379        print("WSGI app connecting to local Attribute Authority instance: %s" %
380            self._httpBasicAuthReq(thisSection['url']+'?'+thisSection['role']))       
381       
382
383    def test15WSGILocalAttributeAuthorityInstanceGetAllHostsInfo(self):
384        """test15WSGILocalAttributeAuthorityInstanceGetAllHostsInfo: test a
385        WSGI app calling a Attribute Authority WSGI instance local to the
386        server"""
387       
388        # Make a client connection to the WSGI app - authenticate with WSGI
389        # basic auth
390        thisSection = self.cfg[
391                    'test15WSGILocalAttributeAuthorityInstanceGetAllHostsInfo']
392       
393        print("WSGI app connecting to local Attribute Authority instance: %s" %
394              self._httpBasicAuthReq(thisSection['url']))       
395
396
397    def test16WSGILocalAttributeAuthorityInstanceGetAttCert(self):
398        """test16WSGILocalAttributeAuthorityInstanceGetAttCert: test a WSGI app
399        calling a Attribute Authority WSGI instance local to the server"""
400       
401        # Make a client connection to the WSGI app - authenticate with WSGI
402        # basic auth
403        thisSection = self.cfg[
404                        'test16WSGILocalAttributeAuthorityInstanceGetAttCert']
405        args = (thisSection['url'], thisSection['username'],
406                thisSection['passphrase'])
407       
408        print("WSGI app connecting to local Attribute Authority instance: %s" %
409              self._httpBasicAuthReq(*args))       
410
411
if __name__ == "__main__":
    # Run every test case defined in this module when executed as a script
    unittest.main()
Note: See TracBrowser for help on using the repository browser.