source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/test_combinedservices.py @ 4480

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/test_combinedservices.py@4480
Revision 4480, 11.5 KB checked in by pjkersha, 12 years ago (diff)

Combined Services tests:

  • added capability for Session Manager to call a local Attribute Authority in the WSGI stack of the same Paste instance
  • SOAP client can specify that the Session Manager call a local Attribute Authority by setting AttAuthorityURI to nill in the web service call.
  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager SOAP client interface - makes requests
3for authentication and attribute retrieval.  Test Session Manager and Attribute
4Authority services must be running for *AttCert* tests.  See README in this
5directory
6
7NERC Data Grid Project
8"""
9__author__ = "P J Kershaw"
10__date__ = "23/02/06"
11__copyright__ = "(C) 2007 STFC & NERC"
12__license__ = \
13"""This software may be distributed under the terms of the Q Public
14License, version 1.0 or later."""
15__contact__ = "Philip.Kershaw@stfc.ac.uk"
16__revision__ = '$Id: test_sessionmanagerclient.py 4437 2008-11-18 12:34:25Z pjkersha $'
17
18import unittest
19import os
20import sys
21import getpass
22import re
23
24from os.path import expandvars as xpdVars
25from os.path import join as jnPath
26mkPath = lambda file: jnPath(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'], file)
27
28from ndg.security.common.sessionmanager import SessionManagerClient, \
29    AttributeRequestDenied
30   
31from ndg.security.common.X509 import X509CertParse, X509CertRead
32from ndg.security.common.wssecurity.dom import SignatureHandler as SigHdlr
33from ndg.security.common.utils.ConfigFileParsers import \
34    CaseSensitiveConfigParser
35
36
37class CombinedServicesTestCase(unittest.TestCase):
38    '''Unit tests for ndg.security.common.sessionmanager.SessionManagerClient
39    - SOAP Session Manager client interface
40    '''
41    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
42       
43    test01Passphrase = None
44    test03Passphrase = None
45
46    def _getCertChainFromProxyCertFile(self, certChainFilePath):
47        '''Read user cert and user cert from a single PEM file and put in
48        a list ready for input into SignatureHandler'''               
49        certChainFileTxt = open(certChainFilePath).read()
50       
51        pemPatRE = re.compile(CombinedServicesTestCase.pemPat, re.S)
52        x509CertList = pemPatRE.findall(certChainFileTxt)
53       
54        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
55                            x509CertList]
56   
57        # Expecting user cert first - move this to the end.  This will
58        # be the cert used to verify the message signature
59        signingCertChain.reverse()
60       
61        return signingCertChain
62
63
64       
65    def setUp(self):
66
67        if 'NDGSEC_INT_DEBUG' in os.environ:
68            import pdb
69            pdb.set_trace()
70       
71        if 'NDGSEC_COMBINED_SRVS_UNITTEST_DIR' not in os.environ:
72            os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'] = \
73                os.path.abspath(os.path.dirname(__file__))
74
75        self.cfgParser = CaseSensitiveConfigParser()
76        cfgFilePath = jnPath(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'],
77                             'test_combinedservices.cfg')
78        self.cfgParser.read(cfgFilePath)
79       
80        self.cfg = {}
81        for section in self.cfgParser.sections():
82            self.cfg[section] = dict(self.cfgParser.items(section))
83
84        try:
85            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
86                         self.cfg['setUp']['sslCACertFilePathList'].split()]
87        except KeyError:
88            sslCACertList = []
89           
90        # Instantiate WS proxy
91        self.clnt = SessionManagerClient(uri=self.cfg['setUp']['uri'],
92                        sslPeerCertCN=self.cfg['setUp'].get('sslPeerCertCN'),
93                        sslCACertList=sslCACertList,
94                        cfgFileSection='wsse',
95                        cfg=self.cfgParser) 
96               
97        self.sessID = None
98        self.userX509Cert = None
99        self.userPriKey = None
100        self.issuingCert = None
101       
102
103    def test01Connect(self):
104        """test01Connect: Connect as if acting as a browser client -
105        a session ID is returned"""
106       
107        username = self.cfg['test01Connect']['username']
108       
109        if CombinedServicesTestCase.test01Passphrase is None:
110            CombinedServicesTestCase.test01Passphrase = \
111                                    self.cfg['test01Connect'].get('passphrase')
112       
113        if not CombinedServicesTestCase.test01Passphrase:
114            CombinedServicesTestCase.test01Passphrase = getpass.getpass(\
115                prompt="\ntest01Connect pass-phrase for user %s: " % username)
116
117        self.userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \
118            self.clnt.connect(self.cfg['test01Connect']['username'], 
119                    passphrase=CombinedServicesTestCase.test01Passphrase)
120
121        print("User '%s' connected to Session Manager:\n%s" % (username, 
122                                                               self.sessID))
123           
124           
125    def test02GetSessionStatus(self):
126        """test02GetSessionStatus: check a session is alive"""
127        print "\n\t" + self.test02GetSessionStatus.__doc__
128       
129        self.test01Connect()
130        assert self.clnt.getSessionStatus(sessID=self.sessID),"Session is dead"
131               
132        print("User connected to Session Manager with sessID=%s" % self.sessID)
133
134        assert not self.clnt.getSessionStatus(sessID='abc'), \
135                                                "sessID=abc shouldn't exist!"
136           
137        print "CORRECT: sessID=abc doesn't exist"
138
139
140    def test03ConnectNoCreateServerSess(self):
141        """test03ConnectNoCreateServerSess: Connect without creating a session -
142        sessID should be None.  This only indicates that the username/password
143        are correct.  To be of practical use the AuthNService plugin at
144        the Session Manager needs to return X.509 credentials e.g.
145        with MyProxy plugin."""
146
147        username = self.cfg['test03ConnectNoCreateServerSess']['username']
148       
149        if CombinedServicesTestCase.test03Passphrase is None:
150            CombinedServicesTestCase.test03Passphrase = \
151                self.cfg['test03ConnectNoCreateServerSess'].get('passphrase')
152               
153        if not CombinedServicesTestCase.test03Passphrase:
154            prompt="\ntest03ConnectNoCreateServerSess pass-phrase for user %s: "
155            CombinedServicesTestCase.test03Passphrase = getpass.getpass(\
156                                                    prompt=prompt % username)
157           
158        userX509Cert, userPriKey,issuingCert, sessID = \
159            self.clnt.connect(username, 
160                      passphrase=CombinedServicesTestCase.test03Passphrase,
161                      createServerSess=False)
162       
163        # Expect null session ID
164        assert(not sessID)
165         
166        print("Successfully authenticated")
167           
168
169    def test04DisconnectWithSessID(self):
170        """test04DisconnectWithSessID: disconnect as if acting as a browser
171        client
172        """
173       
174        print "\n\t" + self.test04DisconnectWithSessID.__doc__
175        self.test01Connect()
176       
177        self.clnt.disconnect(sessID=self.sessID)
178       
179        print("User disconnected from Session Manager:\n%s" % self.sessID)
180           
181
182    def test05DisconnectWithUserX509Cert(self):
183        """test05DisconnectWithUserX509Cert: Disconnect as a command line client
184        """
185       
186        print "\n\t" + self.test05DisconnectWithUserX509Cert.__doc__
187        self.test01Connect()
188       
189        # Use user cert / private key just obtained from connect call for
190        # signature generation
191        if self.issuingCert:
192            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
193            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
194            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
195                                                           self.userX509Cert)
196            self.clnt.signatureHandler.signingCert = None
197        else:
198            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
199            self.clnt.signatureHandler.signingPriKeyPwd = \
200                CombinedServicesTestCase.test01Passphrase
201            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
202            self.clnt.signatureHandler.signingCertChain = ()
203            self.clnt.signatureHandler.signingCert = self.userX509Cert
204           
205        # user X.509 cert in signature determines ID of session to delete
206        self.clnt.disconnect()
207        print("User disconnected from Session Manager:\n%s"%self.userX509Cert)
208
209
210    def test06GetAttCertWithSessID(self):
211        """test06GetAttCertWithSessID: make an attribute request using
212        a session ID as authentication credential"""
213
214        print "\n\t" + self.test06GetAttCertWithSessID.__doc__
215        thisSection = self.cfg['test06GetAttCertWithSessID']     
216        self.test01Connect()
217       
218        attCert = self.clnt.getAttCert(sessID=self.sessID, 
219                                       attAuthorityURI=thisSection['aaURI'])
220       
221        print "Attribute Certificate:\n%s" % attCert
222        attCert.filePath = xpdVars(thisSection['acOutFilePath']) 
223        attCert.write() 
224
225
226    def test07GetAttCertWithUserX509Cert(self):
227        """test07GetAttCertWithUserX509Cert: make an attribute request using
228        a user cert as authentication credential"""
229        print "\n\t" + self.test07GetAttCertWithUserX509Cert.__doc__
230        self.test01Connect()
231
232        if self.issuingCert:
233            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
234            self.clnt.signatureHandler.signingPriKeyPwd = \
235                                CombinedServicesTestCase.test01Passphrase
236            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
237            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
238                                                           self.userX509Cert)
239            self.clnt.signatureHandler.signingCert = None
240        else:
241            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
242            self.clnt.signatureHandler.signingPriKeyPwd = \
243                                CombinedServicesTestCase.test01Passphrase
244            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
245            self.clnt.signatureHandler.signingCertChain = ()
246            self.clnt.signatureHandler.signingCert = self.userX509Cert
247       
248        # Request an attribute certificate from an Attribute Authority
249        # using the userCert returned from connect()
250       
251        aaURI = self.cfg['test07GetAttCertWithUserX509Cert']['aaURI']
252        attCert = self.clnt.getAttCert(attAuthorityURI=aaURI)
253         
254        print("Attribute Certificate:\n%s" % attCert) 
255
256
257    def test08GetAttCertFromLocalAttributeAuthority(self):
258        """test08GetAttCertFromLocalAttributeAuthority: query the Attribute
259        Authority running in the same server instance as the Session Manager"""
260
261        print "\n\t" + self.test08GetAttCertFromLocalAttributeAuthority.__doc__
262        self.test01Connect()
263       
264        attCert = self.clnt.getAttCert(sessID=self.sessID)
265       
266        print "Attribute Certificate:\n%s" % attCert
267
268           
269class CombinedServicesTestSuite(unittest.TestSuite):
270   
271    def __init__(self):
272        map = map(CombinedServicesTestCase,
273                  (
274                    "test01Connect",
275                    "test02GetSessionStatus",
276                    "test03ConnectNoCreateServerSess",
277                    "test04DisconnectWithSessID",
278                    "test05DisconnectWithUserX509Cert",
279                    "test06GetAttCertWithSessID",
280                    "test07GetAttCertWithUserX509Cert",
281                  ))
282        unittest.TestSuite.__init__(self, map)
283           
284                                                   
285if __name__ == "__main__":
286    unittest.main()       
Note: See TracBrowser for help on using the repository browser.