source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/sessionmanagerclient/test_sessionmanagerclient.py @ 5779

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/sessionmanagerclient/test_sessionmanagerclient.py@5779
Revision 5779, 13.8 KB checked in by pjkersha, 11 years ago (diff)

Integrated automated start-up and shutdown of Paste http servers for unit tests.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager SOAP client interface - makes requests
3for authentication and attribute retrieval.  Test Session Manager and Attribute
4Authority services must be running for *AttCert* tests.  See README in this
5directory
6
7NERC Data Grid Project
8"""
9__author__ = "P J Kershaw"
10__date__ = "23/02/06"
11__copyright__ = "(C) 2009 Science and Technology Facilities Council"
12__license__ = "BSD - see LICENSE file in top-level directory"
13__contact__ = "Philip.Kershaw@stfc.ac.uk"
14__revision__ = '$Id$'
15import logging
16logging.basicConfig(level=logging.DEBUG)
17log = logging.getLogger(__name__)
18
19import unittest
20import os
21import sys
22import getpass
23import re
24
25from os.path import expandvars as xpdVars
26from os.path import join as jnPath
27mkPath = lambda file: jnPath(os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'], file)
28
29from ndg.security.test.unit import BaseTestCase
30
31from ndg.security.common.sessionmanager import SessionManagerClient, \
32    AttributeRequestDenied
33   
34from ndg.security.common.X509 import X509CertParse, X509CertRead
35from ndg.security.common.wssecurity.signaturehandler.dom import SignatureHandler as SigHdlr
36from ndg.security.common.utils.configfileparsers import \
37    CaseSensitiveConfigParser
38
39
40class SessionManagerClientTestCase(BaseTestCase):
41    '''Unit tests for ndg.security.common.sessionmanager.SessionManagerClient
42    - SOAP Session Manager client interface
43    '''
44    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
45       
46    test01Passphrase = None
47    test03Passphrase = None
48
49    def __init__(self, *arg, **kw):
50        super(SessionManagerClientTestCase, self).__init__(*arg, **kw)
51        self.startSessionManager()
52        self.startAttributeAuthorities()
53
54    def _getCertChainFromProxyCertFile(self, certChainFilePath):
55        '''Read user cert and user cert from a single PEM file and put in
56        a list ready for input into SignatureHandler'''               
57        certChainFileTxt = open(certChainFilePath).read()
58       
59        pemPatRE = re.compile(SessionManagerClientTestCase.pemPat, re.S)
60        x509CertList = pemPatRE.findall(certChainFileTxt)
61       
62        signingCertChain = [X509CertParse(x509Cert) for x509Cert in 
63                            x509CertList]
64   
65        # Expecting user cert first - move this to the end.  This will
66        # be the cert used to verify the message signature
67        signingCertChain.reverse()
68       
69        return signingCertChain
70
71
72       
73    def setUp(self):
74        super(SessionManagerClientTestCase, self).setUp()
75
76        if 'NDGSEC_INT_DEBUG' in os.environ:
77            import pdb
78            pdb.set_trace()
79       
80        if 'NDGSEC_SMCLNT_UNITTEST_DIR' not in os.environ:
81            os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'] = \
82                os.path.abspath(os.path.dirname(__file__))
83
84        self.cfgParser = CaseSensitiveConfigParser()
85        cfgFilePath = jnPath(os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'],
86                                'sessionMgrClientTest.cfg')
87        self.cfgParser.read(cfgFilePath)
88       
89        self.cfg = {}
90        for section in self.cfgParser.sections():
91            self.cfg[section] = dict(self.cfgParser.items(section))
92
93        try:
94            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
95                         self.cfg['setUp']['sslCACertFilePathList'].split()]
96        except KeyError:
97            sslCACertList = []
98           
99        # Instantiate WS proxy
100        self.clnt = SessionManagerClient(uri=self.cfg['setUp']['uri'],
101                        sslPeerCertCN=self.cfg['setUp'].get('sslPeerCertCN'),
102                        sslCACertList=sslCACertList,
103                        cfgFileSection='wsse',
104                        cfg=self.cfgParser) 
105               
106        self.sessID = None
107        self.userX509Cert = None
108        self.userPriKey = None
109        self.issuingCert = None
110       
111
112    def test01Connect(self):
113        """test01Connect: Connect as if acting as a browser client -
114        a session ID is returned"""
115       
116        username = self.cfg['test01Connect']['username']
117       
118        if SessionManagerClientTestCase.test01Passphrase is None:
119            SessionManagerClientTestCase.test01Passphrase = \
120                                    self.cfg['test01Connect'].get('passphrase')
121       
122        if not SessionManagerClientTestCase.test01Passphrase:
123            SessionManagerClientTestCase.test01Passphrase = getpass.getpass(\
124                prompt="\ntest01Connect pass-phrase for user %s: " % username)
125
126        self.userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \
127            self.clnt.connect(self.cfg['test01Connect']['username'], 
128                    passphrase=SessionManagerClientTestCase.test01Passphrase)
129
130        print("User '%s' connected to Session Manager:\n%s" % (username, 
131                                                               self.sessID))
132           
133           
134    def test02GetSessionStatus(self):
135        """test02GetSessionStatus: check a session is alive"""
136        print "\n\t" + self.test02GetSessionStatus.__doc__
137       
138        self.test01Connect()
139        assert self.clnt.getSessionStatus(sessID=self.sessID),"Session is dead"
140               
141        print("User connected to Session Manager with sessID=%s" % self.sessID)
142
143        assert not self.clnt.getSessionStatus(sessID='abc'), \
144                                                "sessID=abc shouldn't exist!"
145           
146        print "CORRECT: sessID=abc doesn't exist"
147
148
149    def test03ConnectNoCreateServerSess(self):
150        """test03ConnectNoCreateServerSess: Connect without creating a session -
151        sessID should be None.  This only indicates that the username/password
152        are correct.  To be of practical use the AuthNService plugin at
153        the Session Manager needs to return X.509 credentials e.g.
154        with MyProxy plugin."""
155
156        username = self.cfg['test03ConnectNoCreateServerSess']['username']
157       
158        if SessionManagerClientTestCase.test03Passphrase is None:
159            SessionManagerClientTestCase.test03Passphrase = \
160                self.cfg['test03ConnectNoCreateServerSess'].get('passphrase')
161               
162        if not SessionManagerClientTestCase.test03Passphrase:
163            prompt="\ntest03ConnectNoCreateServerSess pass-phrase for user %s: "
164            SessionManagerClientTestCase.test03Passphrase = getpass.getpass(\
165                                                    prompt=prompt % username)
166           
167        userX509Cert, userPriKey,issuingCert, sessID = \
168            self.clnt.connect(username, 
169                      passphrase=SessionManagerClientTestCase.test03Passphrase,
170                      createServerSess=False)
171       
172        # Expect null session ID
173        assert(not sessID)
174         
175        print("Successfully authenticated")
176           
177
178    def test04DisconnectWithSessID(self):
179        """test04DisconnectWithSessID: disconnect as if acting as a browser
180        client
181        """
182       
183        print "\n\t" + self.test04DisconnectWithSessID.__doc__
184        self.test01Connect()
185       
186        self.clnt.disconnect(sessID=self.sessID)
187       
188        print("User disconnected from Session Manager:\n%s" % self.sessID)
189           
190
191    def test05DisconnectWithUserX509Cert(self):
192        """test05DisconnectWithUserX509Cert: Disconnect as a command line client
193        """
194       
195        print "\n\t" + self.test05DisconnectWithUserX509Cert.__doc__
196        self.test01Connect()
197       
198        # Use user cert / private key just obtained from connect call for
199        # signature generation
200        if self.issuingCert:
201            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
202            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
203            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
204                                                           self.userX509Cert)
205            self.clnt.signatureHandler.signingCert = None
206        else:
207            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
208            self.clnt.signatureHandler.signingPriKeyPwd = \
209                SessionManagerClientTestCase.test01Passphrase
210            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
211            self.clnt.signatureHandler.signingCertChain = ()
212            self.clnt.signatureHandler.signingCert = self.userX509Cert
213           
214        # user X.509 cert in signature determines ID of session to delete
215        self.clnt.disconnect()
216        print("User disconnected from Session Manager:\n%s"%self.userX509Cert)
217
218
219    def test06GetAttCertWithSessID(self):
220        """test06GetAttCertWithSessID: make an attribute request using
221        a session ID as authentication credential"""
222
223        print "\n\t" + self.test06GetAttCertWithSessID.__doc__
224        thisSection = self.cfg['test06GetAttCertWithSessID']     
225        self.test01Connect()
226       
227        attCert = self.clnt.getAttCert(sessID=self.sessID, 
228                                       attributeAuthorityURI=thisSection['aaURI'])
229       
230        print "Attribute Certificate:\n%s" % attCert
231        attCert.filePath = xpdVars(thisSection['acOutFilePath']) 
232        attCert.write()
233
234
235    def test07GetAttCertRefusedWithSessID(self):
236        """test07GetAttCertRefusedWithSessID: make an attribute request using
237        a sessID as authentication credential requesting an AC from an
238        Attribute Authority where the user is NOT registered"""
239
240        print "\n\t" + self.test07GetAttCertRefusedWithSessID.__doc__       
241        self.test01Connect()
242       
243        aaURI = self.cfg['test07GetAttCertRefusedWithSessID']['aaURI']
244       
245        try:
246            attCert = self.clnt.getAttCert(sessID=self.sessID, 
247                                           attributeAuthorityURI=aaURI,
248                                           mapFromTrustedHosts=False)
249        except AttributeRequestDenied, e:
250            print "SUCCESS - obtained expected result: %s" % e
251            return
252       
253        self.fail("Request allowed from AA where user is NOT registered!")
254
255
256    def test08GetMappedAttCertWithSessID(self):
257        """test08GetMappedAttCertWithSessID: make an attribute request using
258        a session ID as authentication credential"""
259
260        print "\n\t" + self.test08GetMappedAttCertWithSessID.__doc__       
261        self.test01Connect()
262       
263        aaURI = self.cfg['test08GetMappedAttCertWithSessID']['aaURI']
264       
265        attCert=self.clnt.getAttCert(sessID=self.sessID, attributeAuthorityURI=aaURI)
266       
267        print "Attribute Certificate:\n%s" % attCert 
268
269
270    def test09GetAttCertWithExtAttCertListWithSessID(self):
271        """test09GetAttCertWithExtAttCertListWithSessID: make an attribute
272        request usinga session ID as authentication credential"""
273       
274        print "\n\t"+self.test09GetAttCertWithExtAttCertListWithSessID.__doc__       
275        self.test01Connect()
276        thisSection = self.cfg['test09GetAttCertWithExtAttCertListWithSessID']
277       
278        aaURI = thisSection['aaURI']
279       
280        # Use output from test06GetAttCertWithSessID!
281        extACFilePath = xpdVars(thisSection['extACFilePath'])
282        extAttCert = open(extACFilePath).read()
283       
284        attCert = self.clnt.getAttCert(sessID=self.sessID, 
285                                       attributeAuthorityURI=aaURI,
286                                       extAttCertList=[extAttCert])
287         
288        print("Attribute Certificate:\n%s" % attCert) 
289
290
291    def test10GetAttCertWithUserX509Cert(self):
292        """test10GetAttCertWithUserX509Cert: make an attribute request using
293        a user cert as authentication credential"""
294        print "\n\t" + self.test10GetAttCertWithUserX509Cert.__doc__
295        self.test01Connect()
296
297        if self.issuingCert:
298            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
299            self.clnt.signatureHandler.signingPriKeyPwd = \
300                                SessionManagerClientTestCase.test01Passphrase
301            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
302            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
303                                                           self.userX509Cert)
304            self.clnt.signatureHandler.signingCert = None
305        else:
306            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
307            self.clnt.signatureHandler.signingPriKeyPwd = \
308                                SessionManagerClientTestCase.test01Passphrase
309            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
310            self.clnt.signatureHandler.signingCertChain = ()
311            self.clnt.signatureHandler.signingCert = self.userX509Cert
312       
313        # Request an attribute certificate from an Attribute Authority
314        # using the userX509Cert returned from connect()
315       
316        aaURI = self.cfg['test10GetAttCertWithUserX509Cert']['aaURI']
317        attCert = self.clnt.getAttCert(attributeAuthorityURI=aaURI)
318         
319        print("Attribute Certificate:\n%s" % attCert) 
320           
321           
322class SessionManagerClientTestSuite(unittest.TestSuite):
323   
324    def __init__(self):
325        map = map(SessionManagerClientTestCase,
326                  (
327                    "test01Connect",
328                    "test02GetSessionStatus",
329                    "test03ConnectNoCreateServerSess",
330                    "test04DisconnectWithSessID",
331                    "test05DisconnectWithUserX509Cert",
332                    "test06GetAttCertWithSessID",
333                    "test08GetMappedAttCertWithSessID",
334                    "test09GetAttCertWithExtAttCertListWithSessID",
335                    "test10GetAttCertWithUserX509Cert",
336                  ))
337        unittest.TestSuite.__init__(self, map)
338           
339                                                   
340if __name__ == "__main__":
341    unittest.main()       
Note: See TracBrowser for help on using the repository browser.