source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionmanagerclient/test_sessionmanagerclient.py @ 4407

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionmanagerclient/test_sessionmanagerclient.py@4407
Revision 4407, 13.3 KB checked in by pjkersha, 12 years ago (diff)

More fixes to session manager client

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager SOAP client interface - makes requests
3for authentication and attribute retrieval.  Test Session Manager and Attribute
4Authority services must be running for *AttCert* tests.  See README in this
5directory
6
7NERC Data Grid Project
8"""
9__author__ = "P J Kershaw"
10__date__ = "23/02/06"
11__copyright__ = "(C) 2007 STFC & NERC"
12__license__ = \
13"""This software may be distributed under the terms of the Q Public
14License, version 1.0 or later."""
15__contact__ = "Philip.Kershaw@stfc.ac.uk"
16__revision__ = '$Id$'
17
18import unittest
19import os
20import sys
21import getpass
22import re
23
24from os.path import expandvars as xpdVars
25from os.path import join as jnPath
def mkPath(file):
    """Return the path of ``file`` inside the unit test directory.

    The directory is read from the ``NDGSEC_SMCLNT_UNITTEST_DIR`` environment
    variable each time the function is called.

    @type file: basestring
    @param file: file name (or relative path) to join onto the directory.
    The parameter name is kept as ``file`` so that existing keyword callers
    continue to work.
    @rtype: basestring
    @return: the joined path
    @raise KeyError: ``NDGSEC_SMCLNT_UNITTEST_DIR`` is not set in the
    environment
    """
    return jnPath(os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'], file)
27
28from ndg.security.common.sessionmanager import SessionManagerClient, \
29    AttributeRequestDenied
30   
31from ndg.security.common.X509 import X509CertParse, X509CertRead
32from ndg.security.common.wssecurity.dom import SignatureHandler as SigHdlr
33from ndg.security.common.utils.ConfigFileParsers import \
34    CaseSensitiveConfigParser
35
36
37class SessionManagerClientTestCase(unittest.TestCase):
38    '''Unit tests for ndg.security.common.sessionmanager.SessionManagerClient
39    - SOAP Session Manager client interface
40    '''
41    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
42       
43    test2Passphrase = None
44    test3Passphrase = None
45
46    def _getCertChainFromProxyCertFile(self, certChainFilePath):
47        '''Read user cert and user cert from a single PEM file and put in
48        a list ready for input into SignatureHandler'''               
49        certChainFileTxt = open(certChainFilePath).read()
50       
51        pemPatRE = re.compile(SessionManagerClientTestCase.pemPat, re.S)
52        x509CertList = pemPatRE.findall(certChainFileTxt)
53       
54        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
55                            x509CertList]
56   
57        # Expecting user cert first - move this to the end.  This will
58        # be the cert used to verify the message signature
59        signingCertChain.reverse()
60       
61        return signingCertChain
62
63
64       
65    def setUp(self):
66
67        if 'NDGSEC_INT_DEBUG' in os.environ:
68            import pdb
69            pdb.set_trace()
70       
71        if 'NDGSEC_SMCLNT_UNITTEST_DIR' not in os.environ:
72            os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'] = \
73                os.path.abspath(os.path.dirname(__file__))
74
75        self.cfgParser = CaseSensitiveConfigParser()
76        cfgFilePath = jnPath(os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'],
77                                'sessionMgrClientTest.cfg')
78        self.cfgParser.read(cfgFilePath)
79       
80        self.cfg = {}
81        for section in self.cfgParser.sections():
82            self.cfg[section] = dict(self.cfgParser.items(section))
83
84        try:
85            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
86                         self.cfg['setUp']['sslCACertFilePathList'].split()]
87        except KeyError:
88            sslCACertList = []
89           
90        # Instantiate WS proxy
91        self.clnt = SessionManagerClient(uri=self.cfg['setUp']['uri'],
92                        sslPeerCertCN=self.cfg['setUp'].get('sslPeerCertCN'),
93                        sslCACertList=sslCACertList,
94                        cfgFileSection='wsse',
95                        cfg=self.cfgParser) 
96               
97        self.sessID = None
98        self.userX509Cert = None
99        self.userPriKey = None
100        self.issuingCert = None
101       
102
103    def test1Connect(self):
104        """test1Connect: Connect as if acting as a browser client -
105        a session ID is returned"""
106       
107        username = self.cfg['test1Connect']['username']
108       
109        if SessionManagerClientTestCase.test2Passphrase is None:
110            SessionManagerClientTestCase.test2Passphrase = \
111                                    self.cfg['test1Connect'].get('passphrase')
112       
113        if not SessionManagerClientTestCase.test2Passphrase:
114            SessionManagerClientTestCase.test2Passphrase = getpass.getpass(\
115                prompt="\ntest1Connect pass-phrase for user %s: " % username)
116
117        self.userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \
118            self.clnt.connect(self.cfg['test1Connect']['username'], 
119                    passphrase=SessionManagerClientTestCase.test2Passphrase)
120
121        print("User '%s' connected to Session Manager:\n%s" % (username, 
122                                                               self.sessID))
123           
124           
125    def test2GetSessionStatus(self):
126        """test2GetSessionStatus: check a session is alive"""
127        print "\n\t" + self.test2GetSessionStatus.__doc__
128       
129        self.test1Connect()
130        assert self.clnt.getSessionStatus(sessID=self.sessID),"Session is dead"
131               
132        print("User connected to Session Manager with sessID=%s" % self.sessID)
133
134        assert not self.clnt.getSessionStatus(sessID='abc'), \
135                                                "sessID=abc shouldn't exist!"
136           
137        print "CORRECT: sessID=abc doesn't exist"
138
139
140    def test3ConnectNoCreateServerSess(self):
141        """test3ConnectNoCreateServerSess: Connect without creating a session -
142        sessID should be None.  This only indicates that the username/password
143        are correct.  To be of practical use the AuthNService plugin at
144        the Session Manager needs to return X.509 credentials e.g.
145        with MyProxy plugin."""
146
147        username = self.cfg['test3ConnectNoCreateServerSess']['username']
148       
149        if SessionManagerClientTestCase.test3Passphrase is None:
150            SessionManagerClientTestCase.test3Passphrase = \
151                self.cfg['test3ConnectNoCreateServerSess'].get('passphrase')
152               
153        if not SessionManagerClientTestCase.test3Passphrase:
154            prompt="\ntest3ConnectNoCreateServerSess pass-phrase for user %s: "
155            SessionManagerClientTestCase.test3Passphrase = getpass.getpass(\
156                                                    prompt=prompt % username)
157           
158        userX509Cert, userPriKey,issuingCert, sessID = \
159            self.clnt.connect(username, 
160                      passphrase=SessionManagerClientTestCase.test3Passphrase,
161                      createServerSess=False)
162       
163        # Expect null session ID
164        assert(not sessID)
165         
166        print("Successfully authenticated")
167           
168
169    def test4DisconnectWithSessID(self):
170        """test4DisconnectWithSessID: disconnect as if acting as a browser
171        client
172        """
173       
174        print "\n\t" + self.test4DisconnectWithSessID.__doc__
175        self.test1Connect()
176       
177        self.clnt.disconnect(sessID=self.sessID)
178       
179        print("User disconnected from Session Manager:\n%s" % self.sessID)
180           
181
182    def test5DisconnectWithUserX509Cert(self):
183        """test5DisconnectWithUserX509Cert: Disconnect as a command line client
184        """
185       
186        print "\n\t" + self.test5DisconnectWithUserX509Cert.__doc__
187        self.test1Connect()
188       
189        # Use user cert / private key just obtained from connect call for
190        # signature generation
191        if self.issuingCert:
192            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
193            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
194            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
195                                                           self.userX509Cert)
196            self.clnt.signatureHandler.signingCert = None
197        else:
198            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
199            self.clnt.signatureHandler.signingPriKeyPwd = \
200                SessionManagerClientTestCase.test2Passphrase
201            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
202            self.clnt.signatureHandler.signingCertChain = ()
203            self.clnt.signatureHandler.signingCert = self.userX509Cert
204           
205        # user X.509 cert in signature determines ID of session to delete
206        self.clnt.disconnect()
207        print("User disconnected from Session Manager:\n%s"%self.userX509Cert)
208
209
210    def test6GetAttCertWithSessID(self):
211        """test6GetAttCertWithSessID: make an attribute request using
212        a session ID as authentication credential"""
213
214        print "\n\t" + self.test6GetAttCertWithSessID.__doc__
215        thisSection = self.cfg['test6GetAttCertWithSessID']     
216        self.test1Connect()
217       
218        attCert = self.clnt.getAttCert(sessID=self.sessID, 
219                                       attAuthorityURI=thisSection['aaURI'])
220       
221        print "Attribute Certificate:\n%s" % attCert
222        attCert.filePath = xpdVars(thisSection['acOutFilePath']) 
223        attCert.write()
224
225
226    def test7GetAttCertRefusedWithSessID(self):
227        """test7GetAttCertRefusedWithSessID: make an attribute request using
228        a sessID as authentication credential requesting an AC from an
229        Attribute Authority where the user is NOT registered"""
230
231        print "\n\t" + self.test7GetAttCertRefusedWithSessID.__doc__       
232        self.test1Connect()
233       
234        aaURI = self.cfg['test7GetAttCertRefusedWithSessID']['aaURI']
235       
236        try:
237            attCert = self.clnt.getAttCert(sessID=self.sessID, 
238                                           attAuthorityURI=aaURI,
239                                           mapFromTrustedHosts=False)
240        except AttributeRequestDenied, e:
241            print "SUCCESS - obtained expected result: %s" % e
242            return
243       
244        self.fail("Request allowed from AA where user is NOT registered!")
245
246
247    def test8GetMappedAttCertWithSessID(self):
248        """test8GetMappedAttCertWithSessID: make an attribute request using
249        a session ID as authentication credential"""
250
251        print "\n\t" + self.test8GetMappedAttCertWithSessID.__doc__       
252        self.test1Connect()
253       
254        aaURI = self.cfg['test8GetMappedAttCertWithSessID']['aaURI']
255       
256        attCert=self.clnt.getAttCert(sessID=self.sessID, attAuthorityURI=aaURI)
257       
258        print "Attribute Certificate:\n%s" % attCert 
259
260
261    def test9GetAttCertWithExtAttCertListWithSessID(self):
262        """test9GetAttCertWithExtAttCertListWithSessID: make an attribute
263        request usinga session ID as authentication credential"""
264       
265        print "\n\t" + self.test9GetAttCertWithExtAttCertListWithSessID.__doc__       
266        self.test1Connect()
267        thisSection = self.cfg['test9GetAttCertWithExtAttCertListWithSessID']
268       
269        aaURI = thisSection['aaURI']
270       
271        # Use output from test6GetAttCertWithSessID!
272        extACFilePath = xpdVars(thisSection['extACFilePath'])
273        extAttCert = open(extACFilePath).read()
274       
275        attCert = self.clnt.getAttCert(sessID=self.sessID, 
276                                       attAuthorityURI=aaURI,
277                                       extAttCertList=[extAttCert])
278         
279        print("Attribute Certificate:\n%s" % attCert) 
280
281
282    def test10GetAttCertWithUserX509Cert(self):
283        """test10GetAttCertWithUserX509Cert: make an attribute request using
284        a user cert as authentication credential"""
285        print "\n\t" + self.test10GetAttCertWithUserX509Cert.__doc__
286        self.test1Connect()
287
288        if self.issuingCert:
289            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
290            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
291            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
292                                                           self.userX509Cert)
293            self.clnt.signatureHandler.signingCert = None
294        else:
295            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
296            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
297            self.clnt.signatureHandler.signingCertChain = ()
298            self.clnt.signatureHandler.signingCert = self.userX509Cert
299       
300        # Request an attribute certificate from an Attribute Authority
301        # using the userCert returned from connect()
302       
303        aaURI = self.cfg['test10GetAttCertWithUserX509Cert']['aaURI']
304        attCert = self.clnt.getAttCert(attAuthorityURI=aaURI)
305         
306        print("Attribute Certificate:\n%s" % attCert) 
307
308
309    def test11GetX509Cert(self):
310        "test11GetX509Cert: return the Session Manager's X.509 Cert."
311        cert = self.clnt.getX509Cert()
312                                             
313        print("Session Manager X.509 Certificate:\n" + cert)
314           
315           
class SessionManagerClientTestSuite(unittest.TestSuite):
    '''Suite running a fixed selection of the SessionManagerClientTestCase
    tests, in the order listed in __init__'''

    def __init__(self):
        '''Create one SessionManagerClientTestCase instance per test name and
        initialise the suite with them'''
        # NOTE(review): test7GetAttCertRefusedWithSessID is not in this list
        # - confirm whether the omission is deliberate
        testNames = (
            "test1Connect",
            "test2GetSessionStatus",
            "test3ConnectNoCreateServerSess",
            "test4DisconnectWithSessID",
            "test5DisconnectWithUserX509Cert",
            "test6GetAttCertWithSessID",
            "test8GetMappedAttCertWithSessID",
            "test9GetAttCertWithExtAttCertListWithSessID",
            "test10GetAttCertWithUserX509Cert",
            "test11GetX509Cert",
        )

        # The previous code assigned the result to a local called "map".
        # That made "map" local to this method, so the call to the builtin
        # on the same line raised UnboundLocalError.  A list with a distinct
        # name avoids the problem.
        tests = [SessionManagerClientTestCase(testName)
                 for testName in testNames]
        unittest.TestSuite.__init__(self, tests)
333           
334                                                   
if __name__ == '__main__':
    # Run as a script: hand control to the unittest command line runner
    unittest.main()
Note: See TracBrowser for help on using the repository browser.