source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/test_combinedservices.py @ 4739

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/test_combinedservices.py@4739
Revision 4739, 17.2 KB checked in by pjkersha, 11 years ago (diff)

Refactored x509, xmlsec, XMLSecDoc and combinedservices unit tests separating out test files into the config dir.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager SOAP client interface - makes requests
3for authentication and attribute retrieval.  Test Session Manager and Attribute
4Authority services must be running for *AttCert* tests.  See README in this
5directory
6
7NERC Data Grid Project
8"""
9__author__ = "P J Kershaw"
10__date__ = "23/02/06"
11__copyright__ = "(C) 2007 STFC"
12__license__ = \
13"""This software may be distributed under the terms of the Q Public
14License, version 1.0 or later."""
15__contact__ = "Philip.Kershaw@stfc.ac.uk"
16__revision__ = '$Id: test_sessionmanagerclient.py 4437 2008-11-18 12:34:25Z pjkersha $'
17import logging
18
19
20import unittest
21import os
22import sys
23import getpass
24import re
25import base64
26import urllib2
27
28from os.path import expandvars as xpdVars
29from os.path import join as jnPath
30mkPath = lambda file: jnPath(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'], 
31                             file)
32
33from ndg.security.common.sessionmanager import SessionManagerClient, \
34    AttributeRequestDenied
35
36from ndg.security.test import BaseTestCase   
37from ndg.security.common.X509 import X509CertParse, X509CertRead
38from ndg.security.common.wssecurity.dom import SignatureHandler as SigHdlr
39from ndg.security.common.utils.configfileparsers import \
40    CaseSensitiveConfigParser
41
42
43class CombinedServicesTestCase(BaseTestCase):
44    '''Unit tests for ndg.security.common.sessionmanager.SessionManagerClient
45    - SOAP Session Manager client interface
46    '''
47    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
48       
49    test01Passphrase = None
50    test03Passphrase = None
51
52    def _getCertChainFromProxyCertFile(self, certChainFilePath):
53        '''Read user cert and user cert from a single PEM file and put in
54        a list ready for input into SignatureHandler'''               
55        certChainFileTxt = open(certChainFilePath).read()
56       
57        pemPatRE = re.compile(CombinedServicesTestCase.pemPat, re.S)
58        x509CertList = pemPatRE.findall(certChainFileTxt)
59       
60        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
61                            x509CertList]
62   
63        # Expecting user cert first - move this to the end.  This will
64        # be the cert used to verify the message signature
65        signingCertChain.reverse()
66       
67        return signingCertChain
68
69    def _httpBasicAuthReq(self, *args):
70        """Utility for making a client request to the WSGI test application
71        using HTTP Basic Authentication"""
72        req = urllib2.Request(args[0])
73       
74        # username and password are optional args 2 and 3
75        if len(args) == 3:
76            base64String = base64.encodestring('%s:%s'%(args[1:]))[:-1]
77            authHeader =  "Basic %s" % base64String
78            req.add_header("Authorization", authHeader)
79           
80        handle = urllib2.urlopen(req)
81           
82        return handle.read()
83       
84
85    def setUp(self):
86        super(CombinedServicesTestCase, self).setUp()
87
88        if 'NDGSEC_INT_DEBUG' in os.environ:
89            import pdb
90            pdb.set_trace()
91       
92        if 'NDGSEC_COMBINED_SRVS_UNITTEST_DIR' not in os.environ:
93            os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'] = \
94                os.path.abspath(os.path.dirname(__file__))
95
96        self.cfgParser = CaseSensitiveConfigParser()
97        cfgFilePath = jnPath(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'],
98                             'test_combinedservices.cfg')
99        self.cfgParser.read(cfgFilePath)
100       
101        self.cfg = {}
102        for section in self.cfgParser.sections():
103            self.cfg[section] = dict(self.cfgParser.items(section))
104
105        try:
106            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
107                         self.cfg['setUp']['sslCACertFilePathList'].split()]
108        except KeyError:
109            sslCACertList = []
110       
111        # Set logging
112        try:
113            logLevel = getattr(logging, self.cfg['setUp']['logLevel'])
114        except AttributeError:
115            raise AttributeError("logLevel=%s not recognised, try one of: "
116                                 "CRITICAL, ERROR, WARNING, INFO, DEBUG or "
117                                 "NOTSET" % self.cfg['setUp']['logLevel'])
118           
119        logging.basicConfig(level=logLevel)
120       
121        # Instantiate WS proxy
122        self.clnt = SessionManagerClient(uri=self.cfg['setUp']['uri'],
123                        sslPeerCertCN=self.cfg['setUp'].get('sslPeerCertCN'),
124                        sslCACertList=sslCACertList,
125                        cfgFileSection='wsse',
126                        cfg=self.cfgParser) 
127               
128        self.sessID = None
129        self.userX509Cert = None
130        self.userPriKey = None
131        self.issuingCert = None
132       
133
134    def test01Connect(self):
135        """test01Connect: Connect as if acting as a browser client -
136        a session ID is returned"""
137       
138        username = self.cfg['test01Connect']['username']
139       
140        if CombinedServicesTestCase.test01Passphrase is None:
141            CombinedServicesTestCase.test01Passphrase = \
142                                    self.cfg['test01Connect'].get('passphrase')
143       
144        if not CombinedServicesTestCase.test01Passphrase:
145            CombinedServicesTestCase.test01Passphrase = getpass.getpass(\
146                prompt="\ntest01Connect pass-phrase for user %s: " % username)
147
148        self.userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \
149            self.clnt.connect(self.cfg['test01Connect']['username'], 
150                    passphrase=CombinedServicesTestCase.test01Passphrase)
151
152        print("User '%s' connected to Session Manager:\n%s" % (username, 
153                                                               self.sessID))
154           
155           
156    def test02GetSessionStatus(self):
157        """test02GetSessionStatus: check a session is alive"""
158        print "\n\t" + self.test02GetSessionStatus.__doc__
159       
160        self.test01Connect()
161        assert self.clnt.getSessionStatus(sessID=self.sessID),"Session is dead"
162               
163        print("User connected to Session Manager with sessID=%s" % self.sessID)
164
165        assert not self.clnt.getSessionStatus(sessID='abc'), \
166                                                "sessID=abc shouldn't exist!"
167           
168        print "CORRECT: sessID=abc doesn't exist"
169
170
171    def test03ConnectNoCreateServerSess(self):
172        """test03ConnectNoCreateServerSess: Connect without creating a session -
173        sessID should be None.  This only indicates that the username/password
174        are correct.  To be of practical use the AuthNService plugin at
175        the Session Manager needs to return X.509 credentials e.g.
176        with MyProxy plugin."""
177
178        username = self.cfg['test03ConnectNoCreateServerSess']['username']
179       
180        if CombinedServicesTestCase.test03Passphrase is None:
181            CombinedServicesTestCase.test03Passphrase = \
182                self.cfg['test03ConnectNoCreateServerSess'].get('passphrase')
183               
184        if not CombinedServicesTestCase.test03Passphrase:
185            prompt="\ntest03ConnectNoCreateServerSess pass-phrase for user %s: "
186            CombinedServicesTestCase.test03Passphrase = getpass.getpass(\
187                                                    prompt=prompt % username)
188           
189        userX509Cert, userPriKey,issuingCert, sessID = \
190            self.clnt.connect(username, 
191                      passphrase=CombinedServicesTestCase.test03Passphrase,
192                      createServerSess=False)
193       
194        # Expect null session ID
195        assert(not sessID)
196         
197        print("Successfully authenticated")
198           
199
200    def test04DisconnectWithSessID(self):
201        """test04DisconnectWithSessID: disconnect as if acting as a browser
202        client
203        """
204       
205        print "\n\t" + self.test04DisconnectWithSessID.__doc__
206        self.test01Connect()
207       
208        self.clnt.disconnect(sessID=self.sessID)
209       
210        print("User disconnected from Session Manager:\n%s" % self.sessID)
211           
212
213    def test05DisconnectWithUserX509Cert(self):
214        """test05DisconnectWithUserX509Cert: Disconnect as a command line client
215        """
216       
217        print "\n\t" + self.test05DisconnectWithUserX509Cert.__doc__
218        self.test01Connect()
219       
220        # Use user cert / private key just obtained from connect call for
221        # signature generation
222        if self.issuingCert:
223            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
224            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
225            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
226                                                           self.userX509Cert)
227            self.clnt.signatureHandler.signingCert = None
228        else:
229            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
230            self.clnt.signatureHandler.signingPriKeyPwd = \
231                CombinedServicesTestCase.test01Passphrase
232            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
233            self.clnt.signatureHandler.signingCertChain = ()
234            self.clnt.signatureHandler.signingCert = self.userX509Cert
235           
236        # user X.509 cert in signature determines ID of session to delete
237        self.clnt.disconnect()
238        print("User disconnected from Session Manager:\n%s"%self.userX509Cert)
239
240
241    def test06GetAttCertWithSessID(self):
242        """test06GetAttCertWithSessID: make an attribute request using
243        a session ID as authentication credential"""
244
245        print "\n\t" + self.test06GetAttCertWithSessID.__doc__
246        thisSection = self.cfg['test06GetAttCertWithSessID']     
247        self.test01Connect()
248       
249        attCert = self.clnt.getAttCert(sessID=self.sessID, 
250                                       attributeAuthorityURI=thisSection['aaURI'])
251       
252        print "Attribute Certificate:\n%s" % attCert
253        attCert.filePath = xpdVars(thisSection['acOutFilePath']) 
254        attCert.write() 
255
256
257    def test07GetAttCertWithUserX509Cert(self):
258        """test07GetAttCertWithUserX509Cert: make an attribute request using
259        a user cert as authentication credential"""
260        print "\n\t" + self.test07GetAttCertWithUserX509Cert.__doc__
261        self.test01Connect()
262
263        if self.issuingCert:
264            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
265            self.clnt.signatureHandler.signingPriKeyPwd = \
266                                CombinedServicesTestCase.test01Passphrase
267            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
268            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
269                                                           self.userX509Cert)
270            self.clnt.signatureHandler.signingCert = None
271        else:
272            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
273            self.clnt.signatureHandler.signingPriKeyPwd = \
274                                CombinedServicesTestCase.test01Passphrase
275            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
276            self.clnt.signatureHandler.signingCertChain = ()
277            self.clnt.signatureHandler.signingCert = self.userX509Cert
278       
279        # Request an attribute certificate from an Attribute Authority
280        # using the userX509Cert returned from connect()
281       
282        aaURI = self.cfg['test07GetAttCertWithUserX509Cert']['aaURI']
283        attCert = self.clnt.getAttCert(attributeAuthorityURI=aaURI)
284         
285        print("Attribute Certificate:\n%s" % attCert) 
286
287
288    def test08GetAttCertFromLocalAttributeAuthority(self):
289        """test08GetAttCertFromLocalAttributeAuthority: query the Attribute
290        Authority running in the same server instance as the Session Manager"""
291
292        print "\n\t" + self.test08GetAttCertFromLocalAttributeAuthority.__doc__
293        self.test01Connect()
294       
295        attCert = self.clnt.getAttCert(sessID=self.sessID)
296       
297        print "Attribute Certificate:\n%s" % attCert
298
299
300    def test09WSGILocalSessionManagerInstanceConnect(self):
301        """test09WSGILocalSessionManagerInstanceConnect: test a WSGI app
302        calling a Session Manager WSGI instance local to the server"""
303       
304        # Make a client connection to the WSGI app - authenticate with WSGI
305        # basic auth.  The WSGI app calls a Session Manager WSGI running in
306        # the same code stack
307        thisSection = self.cfg['test09WSGILocalSessionManagerInstanceConnect']
308        url = thisSection['url']
309        username = thisSection['username']
310        password = thisSection['passphrase']
311        print("WSGI app connecting to local Session Manager instance: %s" %
312              self._httpBasicAuthReq(url, username, password))
313
314
315    def test10WSGILocalSessionManagerInstanceGetSessionStatus(self):
316        """test10WSGILocalSessionManagerInstanceGetSessionStatus: test a WSGI
317        app calling a Session Manager WSGI instance local to the server"""
318       
319        # Make a client connection to the WSGI app - authenticate with WSGI
320        # basic auth
321        thisSection = self.cfg[
322                    'test10WSGILocalSessionManagerInstanceGetSessionStatus']
323        url = thisSection['url']
324        username = thisSection['username']
325        password = thisSection['passphrase']
326        print("WSGI app connecting to local Session Manager instance: %s" %
327              self._httpBasicAuthReq(url, username, password))
328
329
330    def test11WSGILocalSessionManagerInstanceDisconnect(self):
331        """test11WSGILocalSessionManagerInstanceDisconnect: test a WSGI app
332        calling a Session Manager WSGI instance local to the server"""
333       
334        # Make a client connection to the WSGI app - authenticate with WSGI
335        # basic auth
336        thisSection=self.cfg['test11WSGILocalSessionManagerInstanceDisconnect']
337        url = thisSection['url']
338        username = thisSection['username']
339        password = thisSection['passphrase']
340        print("WSGI app connecting to local Session Manager instance: %s" %
341              self._httpBasicAuthReq(url, username, password))     
342
343
344    def test12WSGILocalSessionManagerInstanceGetAttCert(self):
345        """test12WSGILocalSessionManagerInstanceGetAttCert: test a WSGI app
346        calling a Session Manager WSGI instance local to the server"""
347       
348        # Make a client connection to the WSGI app - authenticate with WSGI
349        # basic auth
350        thisSection=self.cfg['test12WSGILocalSessionManagerInstanceGetAttCert']
351        args = (thisSection['url'], thisSection['username'],
352                thisSection['passphrase'])
353       
354        print("WSGI app connecting to local Session Manager instance: %s" %
355              self._httpBasicAuthReq(*args))       
356       
357
358    def test13WSGILocalAttributeAuthorityInstanceGetHostInfo(self):
359        """test13WSGILocalAttributeAuthorityInstanceGetHostInfo: test a WSGI
360        app calling a Attribute Authority WSGI instance local to the server"""
361       
362        # Make a client connection to the WSGI app - authenticate with WSGI
363        # basic auth
364        thisSection = self.cfg[
365                        'test13WSGILocalAttributeAuthorityInstanceGetHostInfo']
366       
367        print("WSGI app connecting to local Attribute Authority instance: %s" %
368              self._httpBasicAuthReq(thisSection['url']))       
369       
370
371    def test14WSGILocalAttributeAuthorityInstanceGetTrustedHostInfo(self):
372        """test14WSGILocalAttributeAuthorityInstanceGetTrustedHostInfo: test a
373        WSGI app calling a Attribute Authority WSGI instance local to the
374        server"""
375       
376        # Make a client connection to the WSGI app - authenticate with WSGI
377        # basic auth
378        thisSection = self.cfg[
379                'test14WSGILocalAttributeAuthorityInstanceGetTrustedHostInfo']
380       
381        print("WSGI app connecting to local Attribute Authority instance: %s" %
382            self._httpBasicAuthReq(thisSection['url']+'?'+thisSection['role']))       
383       
384
385    def test15WSGILocalAttributeAuthorityInstanceGetAllHostsInfo(self):
386        """test15WSGILocalAttributeAuthorityInstanceGetAllHostsInfo: test a
387        WSGI app calling a Attribute Authority WSGI instance local to the
388        server"""
389       
390        # Make a client connection to the WSGI app - authenticate with WSGI
391        # basic auth
392        thisSection = self.cfg[
393                    'test15WSGILocalAttributeAuthorityInstanceGetAllHostsInfo']
394       
395        print("WSGI app connecting to local Attribute Authority instance: %s" %
396              self._httpBasicAuthReq(thisSection['url']))       
397
398
399    def test16WSGILocalAttributeAuthorityInstanceGetAttCert(self):
400        """test16WSGILocalAttributeAuthorityInstanceGetAttCert: test a WSGI app
401        calling a Attribute Authority WSGI instance local to the server"""
402       
403        # Make a client connection to the WSGI app - authenticate with WSGI
404        # basic auth
405        thisSection = self.cfg[
406                        'test16WSGILocalAttributeAuthorityInstanceGetAttCert']
407        args = (thisSection['url'], thisSection['username'],
408                thisSection['passphrase'])
409       
410        print("WSGI app connecting to local Attribute Authority instance: %s" %
411              self._httpBasicAuthReq(*args))       
412
413
414if __name__ == "__main__":
415    unittest.main()       
Note: See TracBrowser for help on using the repository browser.