source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/test_combinedservices.py @ 4521

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/test_combinedservices.py@4522
Revision 4521, 17.7 KB checked in by pjkersha, 11 years ago (diff)

Completed tests running Attribute Authority and Session Manager in the same WSGI stack:

  • ndg.security.server.wsgi.utils.attributeauthorityclient.WSGIAttributeAuthorityClient: completed this class and tested in combinedservices unit tests. This class enables WSGI apps to access an AttributeAuthority? WSGI app running in the same stack or else make a callout to a remote SOAP service.
  • ndg.security.server.wsgi.wssecurity: improved config set-up
  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager SOAP client interface - makes requests
3for authentication and attribute retrieval.  Test Session Manager and Attribute
4Authority services must be running for *AttCert* tests.  See README in this
5directory
6
7NERC Data Grid Project
8"""
9__author__ = "P J Kershaw"
10__date__ = "23/02/06"
11__copyright__ = "(C) 2007 STFC & NERC"
12__license__ = \
13"""This software may be distributed under the terms of the Q Public
14License, version 1.0 or later."""
15__contact__ = "Philip.Kershaw@stfc.ac.uk"
16__revision__ = '$Id: test_sessionmanagerclient.py 4437 2008-11-18 12:34:25Z pjkersha $'
17
18import unittest
19import os
20import sys
21import getpass
22import re
23import base64
24import urllib2
25
26from os.path import expandvars as xpdVars
27from os.path import join as jnPath
28mkPath = lambda file: jnPath(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'], 
29                             file)
30
31from ndg.security.common.sessionmanager import SessionManagerClient, \
32    AttributeRequestDenied
33   
34from ndg.security.common.X509 import X509CertParse, X509CertRead
35from ndg.security.common.wssecurity.dom import SignatureHandler as SigHdlr
36from ndg.security.common.utils.ConfigFileParsers import \
37    CaseSensitiveConfigParser
38
39
40class CombinedServicesTestCase(unittest.TestCase):
41    '''Unit tests for ndg.security.common.sessionmanager.SessionManagerClient
42    - SOAP Session Manager client interface
43    '''
44    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
45       
46    test01Passphrase = None
47    test03Passphrase = None
48
49    def _getCertChainFromProxyCertFile(self, certChainFilePath):
50        '''Read user cert and user cert from a single PEM file and put in
51        a list ready for input into SignatureHandler'''               
52        certChainFileTxt = open(certChainFilePath).read()
53       
54        pemPatRE = re.compile(CombinedServicesTestCase.pemPat, re.S)
55        x509CertList = pemPatRE.findall(certChainFileTxt)
56       
57        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
58                            x509CertList]
59   
60        # Expecting user cert first - move this to the end.  This will
61        # be the cert used to verify the message signature
62        signingCertChain.reverse()
63       
64        return signingCertChain
65
66    def _httpBasicAuthReq(self, *args):
67        """Utility for making a client request to the WSGI test application
68        using HTTP Basic Authentication"""
69        req = urllib2.Request(args[0])
70       
71        # username and password are optional args 2 and 3
72        if len(args) == 3:
73            base64String = base64.encodestring('%s:%s'%(args[1:]))[:-1]
74            authHeader =  "Basic %s" % base64String
75            req.add_header("Authorization", authHeader)
76           
77        handle = urllib2.urlopen(req)
78           
79        return handle.read()
80       
81
82    def setUp(self):
83
84        if 'NDGSEC_INT_DEBUG' in os.environ:
85            import pdb
86            pdb.set_trace()
87       
88        if 'NDGSEC_COMBINED_SRVS_UNITTEST_DIR' not in os.environ:
89            os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'] = \
90                os.path.abspath(os.path.dirname(__file__))
91
92        self.cfgParser = CaseSensitiveConfigParser()
93        cfgFilePath = jnPath(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'],
94                             'test_combinedservices.cfg')
95        self.cfgParser.read(cfgFilePath)
96       
97        self.cfg = {}
98        for section in self.cfgParser.sections():
99            self.cfg[section] = dict(self.cfgParser.items(section))
100
101        try:
102            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
103                         self.cfg['setUp']['sslCACertFilePathList'].split()]
104        except KeyError:
105            sslCACertList = []
106           
107        # Instantiate WS proxy
108        self.clnt = SessionManagerClient(uri=self.cfg['setUp']['uri'],
109                        sslPeerCertCN=self.cfg['setUp'].get('sslPeerCertCN'),
110                        sslCACertList=sslCACertList,
111                        cfgFileSection='wsse',
112                        cfg=self.cfgParser) 
113               
114        self.sessID = None
115        self.userX509Cert = None
116        self.userPriKey = None
117        self.issuingCert = None
118       
119
120    def test01Connect(self):
121        """test01Connect: Connect as if acting as a browser client -
122        a session ID is returned"""
123       
124        username = self.cfg['test01Connect']['username']
125       
126        if CombinedServicesTestCase.test01Passphrase is None:
127            CombinedServicesTestCase.test01Passphrase = \
128                                    self.cfg['test01Connect'].get('passphrase')
129       
130        if not CombinedServicesTestCase.test01Passphrase:
131            CombinedServicesTestCase.test01Passphrase = getpass.getpass(\
132                prompt="\ntest01Connect pass-phrase for user %s: " % username)
133
134        self.userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \
135            self.clnt.connect(self.cfg['test01Connect']['username'], 
136                    passphrase=CombinedServicesTestCase.test01Passphrase)
137
138        print("User '%s' connected to Session Manager:\n%s" % (username, 
139                                                               self.sessID))
140           
141           
142    def test02GetSessionStatus(self):
143        """test02GetSessionStatus: check a session is alive"""
144        print "\n\t" + self.test02GetSessionStatus.__doc__
145       
146        self.test01Connect()
147        assert self.clnt.getSessionStatus(sessID=self.sessID),"Session is dead"
148               
149        print("User connected to Session Manager with sessID=%s" % self.sessID)
150
151        assert not self.clnt.getSessionStatus(sessID='abc'), \
152                                                "sessID=abc shouldn't exist!"
153           
154        print "CORRECT: sessID=abc doesn't exist"
155
156
157    def test03ConnectNoCreateServerSess(self):
158        """test03ConnectNoCreateServerSess: Connect without creating a session -
159        sessID should be None.  This only indicates that the username/password
160        are correct.  To be of practical use the AuthNService plugin at
161        the Session Manager needs to return X.509 credentials e.g.
162        with MyProxy plugin."""
163
164        username = self.cfg['test03ConnectNoCreateServerSess']['username']
165       
166        if CombinedServicesTestCase.test03Passphrase is None:
167            CombinedServicesTestCase.test03Passphrase = \
168                self.cfg['test03ConnectNoCreateServerSess'].get('passphrase')
169               
170        if not CombinedServicesTestCase.test03Passphrase:
171            prompt="\ntest03ConnectNoCreateServerSess pass-phrase for user %s: "
172            CombinedServicesTestCase.test03Passphrase = getpass.getpass(\
173                                                    prompt=prompt % username)
174           
175        userX509Cert, userPriKey,issuingCert, sessID = \
176            self.clnt.connect(username, 
177                      passphrase=CombinedServicesTestCase.test03Passphrase,
178                      createServerSess=False)
179       
180        # Expect null session ID
181        assert(not sessID)
182         
183        print("Successfully authenticated")
184           
185
186    def test04DisconnectWithSessID(self):
187        """test04DisconnectWithSessID: disconnect as if acting as a browser
188        client
189        """
190       
191        print "\n\t" + self.test04DisconnectWithSessID.__doc__
192        self.test01Connect()
193       
194        self.clnt.disconnect(sessID=self.sessID)
195       
196        print("User disconnected from Session Manager:\n%s" % self.sessID)
197           
198
199    def test05DisconnectWithUserX509Cert(self):
200        """test05DisconnectWithUserX509Cert: Disconnect as a command line client
201        """
202       
203        print "\n\t" + self.test05DisconnectWithUserX509Cert.__doc__
204        self.test01Connect()
205       
206        # Use user cert / private key just obtained from connect call for
207        # signature generation
208        if self.issuingCert:
209            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
210            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
211            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
212                                                           self.userX509Cert)
213            self.clnt.signatureHandler.signingCert = None
214        else:
215            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
216            self.clnt.signatureHandler.signingPriKeyPwd = \
217                CombinedServicesTestCase.test01Passphrase
218            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
219            self.clnt.signatureHandler.signingCertChain = ()
220            self.clnt.signatureHandler.signingCert = self.userX509Cert
221           
222        # user X.509 cert in signature determines ID of session to delete
223        self.clnt.disconnect()
224        print("User disconnected from Session Manager:\n%s"%self.userX509Cert)
225
226
227    def test06GetAttCertWithSessID(self):
228        """test06GetAttCertWithSessID: make an attribute request using
229        a session ID as authentication credential"""
230
231        print "\n\t" + self.test06GetAttCertWithSessID.__doc__
232        thisSection = self.cfg['test06GetAttCertWithSessID']     
233        self.test01Connect()
234       
235        attCert = self.clnt.getAttCert(sessID=self.sessID, 
236                                       attAuthorityURI=thisSection['aaURI'])
237       
238        print "Attribute Certificate:\n%s" % attCert
239        attCert.filePath = xpdVars(thisSection['acOutFilePath']) 
240        attCert.write() 
241
242
243    def test07GetAttCertWithUserX509Cert(self):
244        """test07GetAttCertWithUserX509Cert: make an attribute request using
245        a user cert as authentication credential"""
246        print "\n\t" + self.test07GetAttCertWithUserX509Cert.__doc__
247        self.test01Connect()
248
249        if self.issuingCert:
250            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
251            self.clnt.signatureHandler.signingPriKeyPwd = \
252                                CombinedServicesTestCase.test01Passphrase
253            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
254            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
255                                                           self.userX509Cert)
256            self.clnt.signatureHandler.signingCert = None
257        else:
258            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
259            self.clnt.signatureHandler.signingPriKeyPwd = \
260                                CombinedServicesTestCase.test01Passphrase
261            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
262            self.clnt.signatureHandler.signingCertChain = ()
263            self.clnt.signatureHandler.signingCert = self.userX509Cert
264       
265        # Request an attribute certificate from an Attribute Authority
266        # using the userX509Cert returned from connect()
267       
268        aaURI = self.cfg['test07GetAttCertWithUserX509Cert']['aaURI']
269        attCert = self.clnt.getAttCert(attAuthorityURI=aaURI)
270         
271        print("Attribute Certificate:\n%s" % attCert) 
272
273
274    def test08GetAttCertFromLocalAttributeAuthority(self):
275        """test08GetAttCertFromLocalAttributeAuthority: query the Attribute
276        Authority running in the same server instance as the Session Manager"""
277
278        print "\n\t" + self.test08GetAttCertFromLocalAttributeAuthority.__doc__
279        self.test01Connect()
280       
281        attCert = self.clnt.getAttCert(sessID=self.sessID)
282       
283        print "Attribute Certificate:\n%s" % attCert
284
285
286    def test09WSGILocalSessionManagerInstanceConnect(self):
287        """test09WSGILocalSessionManagerInstanceConnect: test a WSGI app
288        calling a Session Manager WSGI instance local to the server"""
289       
290        # Make a client connection to the WSGI app - authenticate with WSGI
291        # basic auth.  The WSGI app calls a Session Manager WSGI running in
292        # the same code stack
293        thisSection = self.cfg['test09WSGILocalSessionManagerInstanceConnect']
294        url = thisSection['url']
295        username = thisSection['username']
296        password = thisSection['passphrase']
297        print("WSGI app connecting to local Session Manager instance: %s" %
298              self._httpBasicAuthReq(url, username, password))
299
300
301    def test10WSGILocalSessionManagerInstanceGetSessionStatus(self):
302        """test10WSGILocalSessionManagerInstanceGetSessionStatus: test a WSGI
303        app calling a Session Manager WSGI instance local to the server"""
304       
305        # Make a client connection to the WSGI app - authenticate with WSGI
306        # basic auth
307        thisSection = self.cfg[
308                    'test10WSGILocalSessionManagerInstanceGetSessionStatus']
309        url = thisSection['url']
310        username = thisSection['username']
311        password = thisSection['passphrase']
312        print("WSGI app connecting to local Session Manager instance: %s" %
313              self._httpBasicAuthReq(url, username, password))
314
315
316    def test11WSGILocalSessionManagerInstanceDisconnect(self):
317        """test11WSGILocalSessionManagerInstanceDisconnect: test a WSGI app
318        calling a Session Manager WSGI instance local to the server"""
319       
320        # Make a client connection to the WSGI app - authenticate with WSGI
321        # basic auth
322        thisSection=self.cfg['test11WSGILocalSessionManagerInstanceDisconnect']
323        url = thisSection['url']
324        username = thisSection['username']
325        password = thisSection['passphrase']
326        print("WSGI app connecting to local Session Manager instance: %s" %
327              self._httpBasicAuthReq(url, username, password))     
328
329
330    def test12WSGILocalSessionManagerInstanceGetAttCert(self):
331        """test12WSGILocalSessionManagerInstanceGetAttCert: test a WSGI app
332        calling a Session Manager WSGI instance local to the server"""
333       
334        # Make a client connection to the WSGI app - authenticate with WSGI
335        # basic auth
336        thisSection=self.cfg['test12WSGILocalSessionManagerInstanceGetAttCert']
337        args = (thisSection['url'], thisSection['username'],
338                thisSection['passphrase'])
339       
340        print("WSGI app connecting to local Session Manager instance: %s" %
341              self._httpBasicAuthReq(*args))       
342       
343
344    def test13WSGILocalAttributeAuthorityInstanceGetHostInfo(self):
345        """test13WSGILocalAttributeAuthorityInstanceGetHostInfo: test a WSGI
346        app calling a Attribute Authority WSGI instance local to the server"""
347       
348        # Make a client connection to the WSGI app - authenticate with WSGI
349        # basic auth
350        thisSection = self.cfg[
351                        'test13WSGILocalAttributeAuthorityInstanceGetHostInfo']
352       
353        print("WSGI app connecting to local Attribute Authority instance: %s" %
354              self._httpBasicAuthReq(thisSection['url']))       
355       
356
357    def test14WSGILocalAttributeAuthorityInstanceGetTrustedHostInfo(self):
358        """test14WSGILocalAttributeAuthorityInstanceGetTrustedHostInfo: test a
359        WSGI app calling a Attribute Authority WSGI instance local to the
360        server"""
361       
362        # Make a client connection to the WSGI app - authenticate with WSGI
363        # basic auth
364        thisSection = self.cfg[
365                'test14WSGILocalAttributeAuthorityInstanceGetTrustedHostInfo']
366       
367        print("WSGI app connecting to local Attribute Authority instance: %s" %
368            self._httpBasicAuthReq(thisSection['url']+'?'+thisSection['role']))       
369       
370
371    def test15WSGILocalAttributeAuthorityInstanceGetAllHostsInfo(self):
372        """test15WSGILocalAttributeAuthorityInstanceGetAllHostsInfo: test a
373        WSGI app calling a Attribute Authority WSGI instance local to the
374        server"""
375       
376        # Make a client connection to the WSGI app - authenticate with WSGI
377        # basic auth
378        thisSection = self.cfg[
379                    'test15WSGILocalAttributeAuthorityInstanceGetAllHostsInfo']
380       
381        print("WSGI app connecting to local Attribute Authority instance: %s" %
382              self._httpBasicAuthReq(thisSection['url']))       
383
384
385    def test16WSGILocalAttributeAuthorityInstanceGetAttCert(self):
386        """test16WSGILocalAttributeAuthorityInstanceGetAttCert: test a WSGI app
387        calling a Attribute Authority WSGI instance local to the server"""
388       
389        # Make a client connection to the WSGI app - authenticate with WSGI
390        # basic auth
391        thisSection = self.cfg[
392                        'test16WSGILocalAttributeAuthorityInstanceGetAttCert']
393        args = (thisSection['url'], thisSection['username'],
394                thisSection['passphrase'])
395       
396        print("WSGI app connecting to local Attribute Authority instance: %s" %
397              self._httpBasicAuthReq(*args))       
398
399
400class CombinedServicesTestSuite(unittest.TestSuite):
401   
402    def __init__(self):
403        map = map(CombinedServicesTestCase,
404            (
405            "test01Connect",
406            "test02GetSessionStatus",
407            "test03ConnectNoCreateServerSess",
408            "test04DisconnectWithSessID",
409            "test05DisconnectWithUserX509Cert",
410            "test06GetAttCertWithSessID",
411            "test07GetAttCertWithUserX509Cert",
412            "test08GetAttCertFromLocalAttributeAuthority",
413            "test09WSGILocalSessionManagerInstanceConnect",
414            "test10WSGILocalSessionManagerInstanceGetSessionStatus",
415            "test11WSGILocalSessionManagerInstanceDisconnect",
416            "test12WSGILocalSessionManagerInstanceGetAttCert",
417            "test13WSGILocalAttributeAuthorityInstanceGetHostInfo",
418            "test14WSGILocalAttributeAuthorityInstanceGetTrustedHostInfo",
419            "test15WSGILocalAttributeAuthorityInstanceGetAllHostsInfo"
420            ))
421        unittest.TestSuite.__init__(self, map)
422           
423                                                   
424if __name__ == "__main__":
425    unittest.main()       
Note: See TracBrowser for help on using the repository browser.