source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py @ 4129

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py@4129
Revision 4129, 15.4 KB checked in by cbyrom, 11 years ago (diff)

General refactoring and updating of code, including:

Removal of refC14nKw and signedInfoC14nKw keywords in wssecurity session manager config
(the refC14nInclNS and signedInfoC14nInclNS keywords are sufficient);
Creation of new DOM signature handler class, dom.py, based on the wsSecurity
class;
Abstraction of common code between dom.py and etree.py into new parent
class, BaseSignatureHandler.py.
Fixing and extending use of properties in the SignatureHandler code.
Fixing a few bugs with the original SignatureHandler code.
Updating of test cases to new code/code structure.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager client - makes requests for
3authentication and authorisation.  An Attribute Authority and Simple CA
4services must be running for the reqAuthorisation and addUser tests
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "23/02/06"
10__copyright__ = "(C) 2007 STFC & NERC"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "P.J.Kershaw@rl.ac.uk"
15__revision__ = '$Id$'
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20
21from ndg.security.common.SessionMgr import SessionMgrClient, \
22    AttributeRequestDenied
23   
24from ndg.security.common.X509 import X509CertParse, X509CertRead
25from ndg.security.common.wssecurity.dom import SignatureHandler as SigHdlr
26
27from os.path import expandvars as xpdVars
28from os.path import join as jnPath
def mkPath(file):
    """Return `file` joined onto the unit test directory.

    The directory is read from the NDGSEC_SMCLNT_UNITTEST_DIR environment
    variable at call time, so the variable must be set before this is used.
    """
    return jnPath(os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'], file)
30
31
32class SessionMgrClientTestCase(unittest.TestCase):
33    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
34       
35    test2Passphrase = None
36    test3Passphrase = None
37
38    def _getCertChainFromProxyCertFile(self, certChainFilePath):
39        '''Read user cert and user cert from a single PEM file and put in
40        a list ready for input into SignatureHandler'''               
41        certChainFileTxt = open(certChainFilePath).read()
42       
43        pemPatRE = re.compile(self.__class__.pemPat, re.S)
44        x509CertList = pemPatRE.findall(certChainFileTxt)
45       
46        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
47                            x509CertList]
48   
49        # Expecting user cert first - move this to the end.  This will
50        # be the cert used to verify the message signature
51        signingCertChain.reverse()
52       
53        return signingCertChain
54
55
56    def setUp(self):
57       
58        if 'NDGSEC_INT_DEBUG' in os.environ:
59            import pdb
60            pdb.set_trace()
61       
62        if 'NDGSEC_SMCLNT_UNITTEST_DIR' not in os.environ:
63            os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'] = \
64                os.path.abspath(os.path.dirname(__file__))
65       
66        configParser = SafeConfigParser()
67        configFilePath = jnPath(os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'],
68                                "sessionMgrClientTest.cfg")
69        configParser.read(configFilePath)
70       
71        self.cfg = {}
72        for section in configParser.sections():
73            self.cfg[section] = dict(configParser.items(section))
74
75        try:
76            if self.cfg['setUp'].get('clntprikeypwd') is None:
77                clntPriKeyPwd = getpass.getpass(\
78                            prompt="\nsetUp - client private key password: ")
79            else:
80                clntPriKeyPwd = self.cfg['setUp'].get('clntprikeypwd')
81        except KeyboardInterrupt:
82            sys.exit(0)
83
84        # List of CA certificates for use in validation of certs used in
85        # signature for server reponse
86        try:
87            caCertFilePathList = [xpdVars(file) for file in \
88                            self.cfg['setUp']['cacertfilepathlist'].split()]
89        except:
90            caCertFilePathList = []
91         
92        try:
93            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
94                         self.cfg['setUp']['sslcacertfilepathlist'].split()]
95        except KeyError:
96            sslCACertList = []
97         
98        clntCertFilePath = xpdVars(self.cfg['setUp']['clntcertfilepath'])
99        clntPriKeyFilePath = xpdVars(self.cfg['setUp']['clntprikeyfilepath'])
100       
101        reqBinSecTokValType = self.cfg['setUp'].get('reqbinsectokvaltype')
102
103        # Set format for certificate(s) to be included in client SOAP messages
104        # to enable the Session Manager server to verify messages.
105        if reqBinSecTokValType == SigHdlr.binSecTokValType["X509PKIPathv1"]:
106            signingCertChain = \
107                        self._getCertChainFromProxyCertFile(clntCertFilePath)
108            signingCertFilePath = None
109        else:
110            signingCertChain = None
111            signingCertFilePath = clntCertFilePath
112
113        # Inclusive namespace prefixes for Exclusive C14N
114        try:
115            refC14nInclNS = self.cfg['setUp']['wssrefinclns'].split()           
116        except KeyError:
117            refC14nInclNS = []
118
119        try:
120            signedInfoC14nInclNS = self.cfg['setUp']['wsssignedinfoinclns'].split()         
121        except KeyError:
122            signedInfoC14nInclNS = []
123               
124        setSignatureHandler = eval(self.cfg['setUp']['setsignaturehandler'])
125           
126        # Initialise the Session Manager client connection
127        # Omit traceFile keyword to leave out SOAP debug info
128        self.clnt = SessionMgrClient(uri=self.cfg['setUp']['smuri'],
129                        sslCACertList=sslCACertList,
130                        sslPeerCertCN=self.cfg['setUp'].get('sslpeercertcn'),
131                        setSignatureHandler=setSignatureHandler,
132                        reqBinSecTokValType=reqBinSecTokValType,
133                        signingCertFilePath=clntCertFilePath,
134                        signingCertChain=signingCertChain,
135                        signingPriKeyFilePath=clntPriKeyFilePath,
136                        signingPriKeyPwd=clntPriKeyPwd,
137                        caCertFilePathList=caCertFilePathList,
138                        refC14nInclNS=refC14nInclNS,
139                        signedInfoC14nInclNS=signedInfoC14nInclNS,
140                        tracefile=sys.stderr) 
141       
142        self.sessID = None
143        self.userCert = None
144        self.userPriKey = None
145        self.issuingCert = None
146       
147
148    def test1Connect(self):
149        """test1Connect: Connect as if acting as a browser client -
150        a session ID is returned"""
151       
152        username = self.cfg['test1Connect']['username']
153       
154        if self.__class__.test2Passphrase is None:
155            self.__class__.test2Passphrase = \
156                                    self.cfg['test1Connect'].get('passphrase')
157       
158        if not self.__class__.test2Passphrase:
159            self.__class__.test2Passphrase = getpass.getpass(\
160                prompt="\ntest1Connect pass-phrase for user %s: " % username)
161
162        self.userCert, self.userPriKey, self.issuingCert, self.sessID = \
163            self.clnt.connect(self.cfg['test1Connect']['username'], 
164                              passphrase=self.__class__.test2Passphrase)
165
166        print "User '%s' connected to Session Manager:\n%s" % \
167                                                        (username, self.sessID)
168           
169        creds='\n'.join((self.issuingCert or '',
170                         self.userCert,
171                         self.userPriKey))
172        open(mkPath("user.creds"), "w").write(creds)
173           
174           
175    def test2GetSessionStatus(self):
176        """test2GetSessionStatus: check a session is alive"""
177        print "\n\t" + self.test2GetSessionStatus.__doc__
178       
179        self.test1Connect()
180        assert self.clnt.getSessionStatus(sessID=self.sessID), \
181                "Session is dead"
182               
183        print "User connected to Session Manager with sessID=%s" % self.sessID
184
185        assert not self.clnt.getSessionStatus(sessID='abc'), \
186            "sessID=abc shouldn't exist!"
187           
188        print "CORRECT: sessID=abc doesn't exist"
189
190
191    def test3ConnectNoCreateServerSess(self):
192        """test3ConnectNoCreateServerSess: Connect as a non browser client -
193        sessID should be None"""
194
195        username = self.cfg['test3ConnectNoCreateServerSess']['username']
196       
197        if self.__class__.test3Passphrase is None:
198            self.__class__.test3Passphrase = \
199                self.cfg['test3ConnectNoCreateServerSess'].get('passphrase')
200               
201        if not self.__class__.test3Passphrase:
202            prompt="\ntest3ConnectNoCreateServerSess pass-phrase for user %s: "
203            self.__class__.test3Passphrase = getpass.getpass(\
204                                                    prompt=prompt % username)
205           
206        self.userCert, self.userPriKey, self.issuingCert, sessID = \
207            self.clnt.connect(username, 
208                              passphrase=self.__class__.test3Passphrase,
209                              createServerSess=False)
210       
211        # Expect null session ID
212        assert(not sessID)
213         
214        print "User '%s' retrieved creds. from Session Manager:\n%s" % \
215                                                    (username, self.userCert)
216           
217
218    def test4DisconnectWithSessID(self):
219        """test4DisconnectWithSessID: disconnect as if acting as a browser client
220        """
221       
222        print "\n\t" + self.test4DisconnectWithSessID.__doc__
223        self.test1Connect()
224       
225        self.clnt.disconnect(sessID=self.sessID)
226       
227        print "User disconnected from Session Manager:\n%s" % self.sessID
228           
229
230    def test5DisconnectWithUserCert(self):
231        """test5DisconnectWithUserCert: Disconnect as a command line client
232        """
233       
234        print "\n\t" + self.test5DisconnectWithUserCert.__doc__
235        self.test1Connect()
236       
237        # Use user cert / private key just obtained from connect call for
238        # signature generation
239        if self.issuingCert:
240            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
241            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
242            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
243                                                           self.userCert)
244            self.clnt.signatureHandler.signingCert = None
245        else:
246            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
247            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
248            self.clnt.signatureHandler.signingCertChain = ()
249            self.clnt.signatureHandler.signingCert = self.userCert
250           
251        # Proxy cert in signature determines ID of session to
252        # delete
253        self.clnt.disconnect()
254        print "User disconnected from Session Manager:\n%s" % self.userCert
255
256
257    def test6GetAttCertWithSessID(self):
258        """test6GetAttCertWithSessID: make an attribute request using
259        a session ID as authentication credential"""
260
261        print "\n\t" + self.test6GetAttCertWithSessID.__doc__       
262        self.test1Connect()
263       
264        attCert = self.clnt.getAttCert(\
265            sessID=self.sessID, 
266            attAuthorityURI=self.cfg['test6GetAttCertWithSessID']['aauri'])
267       
268        print "Attribute Certificate:\n%s" % attCert
269        attCert.filePath = \
270            xpdVars(self.cfg['test6GetAttCertWithSessID']['acoutfilepath']) 
271        attCert.write()
272
273
274    def test6aGetAttCertRefusedWithSessID(self):
275        """test6aGetAttCertRefusedWithSessID: make an attribute request using
276        a sessID as authentication credential requesting an AC from an
277        Attribute Authority where the user is NOT registered"""
278
279        print "\n\t" + self.test6aGetAttCertRefusedWithSessID.__doc__       
280        self.test1Connect()
281       
282        aaURI = self.cfg['test6aGetAttCertRefusedWithSessID']['aauri']
283       
284        try:
285            attCert = self.clnt.getAttCert(sessID=self.sessID, 
286                                           attAuthorityURI=aaURI,
287                                           mapFromTrustedHosts=False)
288        except AttributeRequestDenied, e:
289            print "SUCCESS - obtained expected result: %s" % e
290            return
291       
292        self.fail("Request allowed from AA where user is NOT registered!")
293
294
295    def test6bGetMappedAttCertWithSessID(self):
296        """test6bGetMappedAttCertWithSessID: make an attribute request using
297        a session ID as authentication credential"""
298
299        print "\n\t" + self.test6bGetMappedAttCertWithSessID.__doc__       
300        self.test1Connect()
301       
302        aaURI = self.cfg['test6bGetMappedAttCertWithSessID']['aauri']
303       
304        attCert=self.clnt.getAttCert(sessID=self.sessID,attAuthorityURI=aaURI)
305       
306        print "Attribute Certificate:\n%s" % attCert 
307
308
309    def test6cGetAttCertWithExtAttCertListWithSessID(self):
310        """test6cGetAttCertWithSessID: make an attribute request using
311        a session ID as authentication credential"""
312       
313        print "\n\t" + \
314            self.test6cGetAttCertWithExtAttCertListWithSessID.__doc__       
315        self.test1Connect()
316       
317        aaURI = \
318            self.cfg['test6cGetAttCertWithExtAttCertListWithSessID']['aauri']
319       
320        # Use output from test6GetAttCertWithSessID!
321        extACFilePath = xpdVars(\
322    self.cfg['test6cGetAttCertWithExtAttCertListWithSessID']['extacfilepath'])
323        extAttCert = open(extACFilePath).read()
324       
325        attCert = self.clnt.getAttCert(sessID=self.sessID, 
326                                       attAuthorityURI=aaURI,
327                                       extAttCertList=[extAttCert])
328         
329        print "Attribute Certificate:\n%s" % attCert 
330
331
332    def test7GetAttCertWithUserCert(self):
333        """test7GetAttCertWithUserCert: make an attribute request using
334        a user cert as authentication credential"""
335        print "\n\t" + self.test7GetAttCertWithUserCert.__doc__
336        self.test1Connect()
337
338        if self.issuingCert:
339            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
340            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
341            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
342                                                           self.userCert)
343            self.clnt.signatureHandler.signingCert = None
344        else:
345            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
346            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
347            self.clnt.signatureHandler.signingCertChain = ()
348            self.clnt.signatureHandler.signingCert = self.userCert
349       
350        # Request an attribute certificate from an Attribute Authority
351        # using the userCert returned from connect()
352       
353        aaURI = self.cfg['test7GetAttCertWithUserCert']['aauri']
354        attCert = self.clnt.getAttCert(attAuthorityURI=aaURI)
355         
356        print "Attribute Certificate:\n%s" % attCert 
357
358
359    def test8GetX509Cert(self):
360        "test8GetX509Cert: return the Session Manager's X.509 Cert."
361        cert = self.clnt.getX509Cert()
362                                             
363        print "Session Manager X.509 Certificate:\n" + cert
364           
365           
366#_____________________________________________________________________________       
class SessionMgrClientTestSuite(unittest.TestSuite):
    """Suite holding a fixed selection of the SessionMgrClientTestCase tests.

    NOTE(review): test6aGetAttCertRefusedWithSessID is not in the list below;
    confirm whether that omission is intentional.
    """

    def __init__(self):
        """Create one SessionMgrClientTestCase per listed test method name
        and initialise the suite with them."""
        testNames = (
            "test1Connect",
            "test2GetSessionStatus",
            "test3ConnectNoCreateServerSess",
            "test4DisconnectWithSessID",
            "test5DisconnectWithUserCert",
            "test6GetAttCertWithSessID",
            "test6bGetMappedAttCertWithSessID",
            "test6cGetAttCertWithExtAttCertListWithSessID",
            "test7GetAttCertWithUserCert",
            "test8GetX509Cert",
        )

        # Do not bind the result to the name 'map': that makes 'map' a local
        # variable and the call to the builtin raises UnboundLocalError.
        tests = [SessionMgrClientTestCase(testName) for testName in testNames]
        unittest.TestSuite.__init__(self, tests)
384           
385                                                   
# Executed as a script: let unittest discover and run the tests in this module
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.