source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionmanagerclient/test_sessionmanagerclient.py @ 4437

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionmanagerclient/test_sessionmanagerclient.py@4437
Revision 4437, 13.4 KB checked in by pjkersha, 12 years ago (diff)

Working Session Manager client unit tests for WSGI based Session Manager

  • removed getX509Cert operation from WSDL - no longer needed
  • fix to the prefix keyword for ConfigFileParsers ini file parsing.
  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager SOAP client interface - makes requests
3for authentication and attribute retrieval.  Test Session Manager and Attribute
4Authority services must be running for *AttCert* tests.  See README in this
5directory
6
7NERC Data Grid Project
8"""
9__author__ = "P J Kershaw"
10__date__ = "23/02/06"
11__copyright__ = "(C) 2007 STFC & NERC"
12__license__ = \
13"""This software may be distributed under the terms of the Q Public
14License, version 1.0 or later."""
15__contact__ = "Philip.Kershaw@stfc.ac.uk"
16__revision__ = '$Id$'
17
18import unittest
19import os
20import sys
21import getpass
22import re
23
24from os.path import expandvars as xpdVars
25from os.path import join as jnPath
def mkPath(file):
    """Return *file* joined onto the unit test directory.

    The directory is read from the NDGSEC_SMCLNT_UNITTEST_DIR environment
    variable at call time; a KeyError is raised if it is not set.
    """
    unitTestDir = os.environ['NDGSEC_SMCLNT_UNITTEST_DIR']
    return jnPath(unitTestDir, file)
27
28from ndg.security.common.sessionmanager import SessionManagerClient, \
29    AttributeRequestDenied
30   
31from ndg.security.common.X509 import X509CertParse, X509CertRead
32from ndg.security.common.wssecurity.dom import SignatureHandler as SigHdlr
33from ndg.security.common.utils.ConfigFileParsers import \
34    CaseSensitiveConfigParser
35
36
37class SessionManagerClientTestCase(unittest.TestCase):
38    '''Unit tests for ndg.security.common.sessionmanager.SessionManagerClient
39    - SOAP Session Manager client interface
40    '''
41    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
42       
43    test01Passphrase = None
44    test03Passphrase = None
45
46    def _getCertChainFromProxyCertFile(self, certChainFilePath):
47        '''Read user cert and user cert from a single PEM file and put in
48        a list ready for input into SignatureHandler'''               
49        certChainFileTxt = open(certChainFilePath).read()
50       
51        pemPatRE = re.compile(SessionManagerClientTestCase.pemPat, re.S)
52        x509CertList = pemPatRE.findall(certChainFileTxt)
53       
54        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
55                            x509CertList]
56   
57        # Expecting user cert first - move this to the end.  This will
58        # be the cert used to verify the message signature
59        signingCertChain.reverse()
60       
61        return signingCertChain
62
63
64       
65    def setUp(self):
66
67        if 'NDGSEC_INT_DEBUG' in os.environ:
68            import pdb
69            pdb.set_trace()
70       
71        if 'NDGSEC_SMCLNT_UNITTEST_DIR' not in os.environ:
72            os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'] = \
73                os.path.abspath(os.path.dirname(__file__))
74
75        self.cfgParser = CaseSensitiveConfigParser()
76        cfgFilePath = jnPath(os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'],
77                                'sessionMgrClientTest.cfg')
78        self.cfgParser.read(cfgFilePath)
79       
80        self.cfg = {}
81        for section in self.cfgParser.sections():
82            self.cfg[section] = dict(self.cfgParser.items(section))
83
84        try:
85            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
86                         self.cfg['setUp']['sslCACertFilePathList'].split()]
87        except KeyError:
88            sslCACertList = []
89           
90        # Instantiate WS proxy
91        self.clnt = SessionManagerClient(uri=self.cfg['setUp']['uri'],
92                        sslPeerCertCN=self.cfg['setUp'].get('sslPeerCertCN'),
93                        sslCACertList=sslCACertList,
94                        cfgFileSection='wsse',
95                        cfg=self.cfgParser) 
96               
97        self.sessID = None
98        self.userX509Cert = None
99        self.userPriKey = None
100        self.issuingCert = None
101       
102
103    def test01Connect(self):
104        """test01Connect: Connect as if acting as a browser client -
105        a session ID is returned"""
106       
107        username = self.cfg['test01Connect']['username']
108       
109        if SessionManagerClientTestCase.test01Passphrase is None:
110            SessionManagerClientTestCase.test01Passphrase = \
111                                    self.cfg['test01Connect'].get('passphrase')
112       
113        if not SessionManagerClientTestCase.test01Passphrase:
114            SessionManagerClientTestCase.test01Passphrase = getpass.getpass(\
115                prompt="\ntest01Connect pass-phrase for user %s: " % username)
116
117        self.userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \
118            self.clnt.connect(self.cfg['test01Connect']['username'], 
119                    passphrase=SessionManagerClientTestCase.test01Passphrase)
120
121        print("User '%s' connected to Session Manager:\n%s" % (username, 
122                                                               self.sessID))
123           
124           
125    def test02GetSessionStatus(self):
126        """test02GetSessionStatus: check a session is alive"""
127        print "\n\t" + self.test02GetSessionStatus.__doc__
128       
129        self.test01Connect()
130        assert self.clnt.getSessionStatus(sessID=self.sessID),"Session is dead"
131               
132        print("User connected to Session Manager with sessID=%s" % self.sessID)
133
134        assert not self.clnt.getSessionStatus(sessID='abc'), \
135                                                "sessID=abc shouldn't exist!"
136           
137        print "CORRECT: sessID=abc doesn't exist"
138
139
140    def test03ConnectNoCreateServerSess(self):
141        """test03ConnectNoCreateServerSess: Connect without creating a session -
142        sessID should be None.  This only indicates that the username/password
143        are correct.  To be of practical use the AuthNService plugin at
144        the Session Manager needs to return X.509 credentials e.g.
145        with MyProxy plugin."""
146
147        username = self.cfg['test03ConnectNoCreateServerSess']['username']
148       
149        if SessionManagerClientTestCase.test03Passphrase is None:
150            SessionManagerClientTestCase.test03Passphrase = \
151                self.cfg['test03ConnectNoCreateServerSess'].get('passphrase')
152               
153        if not SessionManagerClientTestCase.test03Passphrase:
154            prompt="\ntest03ConnectNoCreateServerSess pass-phrase for user %s: "
155            SessionManagerClientTestCase.test03Passphrase = getpass.getpass(\
156                                                    prompt=prompt % username)
157           
158        userX509Cert, userPriKey,issuingCert, sessID = \
159            self.clnt.connect(username, 
160                      passphrase=SessionManagerClientTestCase.test03Passphrase,
161                      createServerSess=False)
162       
163        # Expect null session ID
164        assert(not sessID)
165         
166        print("Successfully authenticated")
167           
168
169    def test04DisconnectWithSessID(self):
170        """test04DisconnectWithSessID: disconnect as if acting as a browser
171        client
172        """
173       
174        print "\n\t" + self.test04DisconnectWithSessID.__doc__
175        self.test01Connect()
176       
177        self.clnt.disconnect(sessID=self.sessID)
178       
179        print("User disconnected from Session Manager:\n%s" % self.sessID)
180           
181
182    def test05DisconnectWithUserX509Cert(self):
183        """test05DisconnectWithUserX509Cert: Disconnect as a command line client
184        """
185       
186        print "\n\t" + self.test05DisconnectWithUserX509Cert.__doc__
187        self.test01Connect()
188       
189        # Use user cert / private key just obtained from connect call for
190        # signature generation
191        if self.issuingCert:
192            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
193            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
194            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
195                                                           self.userX509Cert)
196            self.clnt.signatureHandler.signingCert = None
197        else:
198            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
199            self.clnt.signatureHandler.signingPriKeyPwd = \
200                SessionManagerClientTestCase.test01Passphrase
201            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
202            self.clnt.signatureHandler.signingCertChain = ()
203            self.clnt.signatureHandler.signingCert = self.userX509Cert
204           
205        # user X.509 cert in signature determines ID of session to delete
206        self.clnt.disconnect()
207        print("User disconnected from Session Manager:\n%s"%self.userX509Cert)
208
209
210    def test06GetAttCertWithSessID(self):
211        """test06GetAttCertWithSessID: make an attribute request using
212        a session ID as authentication credential"""
213
214        print "\n\t" + self.test06GetAttCertWithSessID.__doc__
215        thisSection = self.cfg['test06GetAttCertWithSessID']     
216        self.test01Connect()
217       
218        attCert = self.clnt.getAttCert(sessID=self.sessID, 
219                                       attAuthorityURI=thisSection['aaURI'])
220       
221        print "Attribute Certificate:\n%s" % attCert
222        attCert.filePath = xpdVars(thisSection['acOutFilePath']) 
223        attCert.write()
224
225
226    def test07GetAttCertRefusedWithSessID(self):
227        """test07GetAttCertRefusedWithSessID: make an attribute request using
228        a sessID as authentication credential requesting an AC from an
229        Attribute Authority where the user is NOT registered"""
230
231        print "\n\t" + self.test07GetAttCertRefusedWithSessID.__doc__       
232        self.test01Connect()
233       
234        aaURI = self.cfg['test07GetAttCertRefusedWithSessID']['aaURI']
235       
236        try:
237            attCert = self.clnt.getAttCert(sessID=self.sessID, 
238                                           attAuthorityURI=aaURI,
239                                           mapFromTrustedHosts=False)
240        except AttributeRequestDenied, e:
241            print "SUCCESS - obtained expected result: %s" % e
242            return
243       
244        self.fail("Request allowed from AA where user is NOT registered!")
245
246
247    def test08GetMappedAttCertWithSessID(self):
248        """test08GetMappedAttCertWithSessID: make an attribute request using
249        a session ID as authentication credential"""
250
251        print "\n\t" + self.test08GetMappedAttCertWithSessID.__doc__       
252        self.test01Connect()
253       
254        aaURI = self.cfg['test08GetMappedAttCertWithSessID']['aaURI']
255       
256        attCert=self.clnt.getAttCert(sessID=self.sessID, attAuthorityURI=aaURI)
257       
258        print "Attribute Certificate:\n%s" % attCert 
259
260
261    def test09GetAttCertWithExtAttCertListWithSessID(self):
262        """test09GetAttCertWithExtAttCertListWithSessID: make an attribute
263        request usinga session ID as authentication credential"""
264       
265        print "\n\t"+self.test09GetAttCertWithExtAttCertListWithSessID.__doc__       
266        self.test01Connect()
267        thisSection = self.cfg['test09GetAttCertWithExtAttCertListWithSessID']
268       
269        aaURI = thisSection['aaURI']
270       
271        # Use output from test06GetAttCertWithSessID!
272        extACFilePath = xpdVars(thisSection['extACFilePath'])
273        extAttCert = open(extACFilePath).read()
274       
275        attCert = self.clnt.getAttCert(sessID=self.sessID, 
276                                       attAuthorityURI=aaURI,
277                                       extAttCertList=[extAttCert])
278         
279        print("Attribute Certificate:\n%s" % attCert) 
280
281
282    def test10GetAttCertWithUserX509Cert(self):
283        """test10GetAttCertWithUserX509Cert: make an attribute request using
284        a user cert as authentication credential"""
285        print "\n\t" + self.test10GetAttCertWithUserX509Cert.__doc__
286        self.test01Connect()
287
288        if self.issuingCert:
289            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
290            self.clnt.signatureHandler.signingPriKeyPwd = \
291                                SessionManagerClientTestCase.test01Passphrase
292            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
293            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
294                                                           self.userX509Cert)
295            self.clnt.signatureHandler.signingCert = None
296        else:
297            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
298            self.clnt.signatureHandler.signingPriKeyPwd = \
299                                SessionManagerClientTestCase.test01Passphrase
300            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
301            self.clnt.signatureHandler.signingCertChain = ()
302            self.clnt.signatureHandler.signingCert = self.userX509Cert
303       
304        # Request an attribute certificate from an Attribute Authority
305        # using the userCert returned from connect()
306       
307        aaURI = self.cfg['test10GetAttCertWithUserX509Cert']['aaURI']
308        attCert = self.clnt.getAttCert(attAuthorityURI=aaURI)
309         
310        print("Attribute Certificate:\n%s" % attCert) 
311           
312           
class SessionManagerClientTestSuite(unittest.TestSuite):
    """Suite holding a fixed selection of SessionManagerClientTestCase
    tests, in the order listed in __init__."""

    def __init__(self):
        # NOTE(review): test07GetAttCertRefusedWithSessID is not listed
        # here - confirm whether the omission is intentional.
        testNames = (
            "test01Connect",
            "test02GetSessionStatus",
            "test03ConnectNoCreateServerSess",
            "test04DisconnectWithSessID",
            "test05DisconnectWithUserX509Cert",
            "test06GetAttCertWithSessID",
            "test08GetMappedAttCertWithSessID",
            "test09GetAttCertWithExtAttCertListWithSessID",
            "test10GetAttCertWithUserX509Cert",
        )

        # Previously the result was bound to a local called 'map', which
        # made the builtin unreachable inside this method and raised
        # UnboundLocalError before any test case was created.
        tests = [SessionManagerClientTestCase(testName)
                 for testName in testNames]
        unittest.TestSuite.__init__(self, tests)
329           
330                                                   
# Script entry point: hand control to unittest.main() when this module is
# executed directly.  SessionManagerClientTestSuite is not referenced here.
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.