source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py @ 4120

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py@4120
Revision 4120, 15.5 KB checked in by pjkersha, 11 years ago (diff)

Fix for ndg.security.test.sessionMgr.test.SessionMgrTestCase.test1Connect: use the asPEM method to convert self.userCert to a string for output to file.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager client - makes requests for
3authentication and authorisation.  An Attribute Authority and Simple CA
4services must be running for the reqAuthorisation and addUser tests
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "23/02/06"
10__copyright__ = "(C) 2007 STFC & NERC"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "P.J.Kershaw@rl.ac.uk"
15__revision__ = '$Id$'
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20
21from ndg.security.common.SessionMgr import SessionMgrClient, \
22    AttributeRequestDenied
23   
24from ndg.security.common.X509 import X509CertParse, X509CertRead
25from ndg.security.common.wsSecurity import SignatureHandler as SigHdlr
26
27from os.path import expandvars as xpdVars
28from os.path import join as jnPath
def mkPath(file):
    """Return the path of ``file`` inside the unit test directory.

    The directory is read from the NDGSEC_SMCLNT_UNITTEST_DIR environment
    variable each time the function is called.
    """
    testDir = os.environ['NDGSEC_SMCLNT_UNITTEST_DIR']
    return jnPath(testDir, file)
30
31
class SessionMgrClientTestCase(unittest.TestCase):
    """Unit tests making calls to a Session Manager through SessionMgrClient.

    The settings used by the tests are read from sessionMgrClientTest.cfg
    by setUp().
    """

    # Matches a single PEM encoded certificate so that a file holding
    # several concatenated certificates can be split up.  Written as a raw
    # string so that "\-" reaches the regular expression engine unchanged
    # instead of relying on Python passing an unknown string escape through.
    pemPat = r"-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"

    # Pass-phrases are cached on the class so that a value obtained once is
    # reused by every test instance in the run.  test2Passphrase is the one
    # used by test1Connect; test3Passphrase is used by
    # test3ConnectNoCreateServerSess.
    test2Passphrase = None
    test3Passphrase = None
37
38    def _getCertChainFromProxyCertFile(self, certChainFilePath):
39        '''Read user cert and user cert from a single PEM file and put in
40        a list ready for input into SignatureHandler'''               
41        certChainFileTxt = open(certChainFilePath).read()
42       
43        pemPatRE = re.compile(self.__class__.pemPat, re.S)
44        x509CertList = pemPatRE.findall(certChainFileTxt)
45       
46        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
47                            x509CertList]
48   
49        # Expecting user cert first - move this to the end.  This will
50        # be the cert used to verify the message signature
51        signingCertChain.reverse()
52       
53        return signingCertChain
54
55
56    def setUp(self):
57       
58        if 'NDGSEC_INT_DEBUG' in os.environ:
59            import pdb
60            pdb.set_trace()
61       
62        if 'NDGSEC_SMCLNT_UNITTEST_DIR' not in os.environ:
63            os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'] = \
64                os.path.abspath(os.path.dirname(__file__))
65       
66        configParser = SafeConfigParser()
67        configFilePath = jnPath(os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'],
68                                "sessionMgrClientTest.cfg")
69        configParser.read(configFilePath)
70       
71        self.cfg = {}
72        for section in configParser.sections():
73            self.cfg[section] = dict(configParser.items(section))
74
75        try:
76            if self.cfg['setUp'].get('clntprikeypwd') is None:
77                clntPriKeyPwd = getpass.getpass(\
78                            prompt="\nsetUp - client private key password: ")
79            else:
80                clntPriKeyPwd = self.cfg['setUp'].get('clntprikeypwd')
81        except KeyboardInterrupt:
82            sys.exit(0)
83
84        # List of CA certificates for use in validation of certs used in
85        # signature for server reponse
86        try:
87            caCertFilePathList = [xpdVars(file) for file in \
88                            self.cfg['setUp']['cacertfilepathlist'].split()]
89        except:
90            caCertFilePathList = []
91         
92        try:
93            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
94                         self.cfg['setUp']['sslcacertfilepathlist'].split()]
95        except KeyError:
96            sslCACertList = []
97         
98        clntCertFilePath = xpdVars(self.cfg['setUp']['clntcertfilepath'])
99        clntPriKeyFilePath = xpdVars(self.cfg['setUp']['clntprikeyfilepath'])
100       
101        reqBinSecTokValType = self.cfg['setUp'].get('reqbinsectokvaltype')
102
103        # Set format for certificate(s) to be included in client SOAP messages
104        # to enable the Session Manager server to verify messages.
105        if reqBinSecTokValType == SigHdlr.binSecTokValType["X509PKIPathv1"]:
106            signingCertChain = \
107                        self._getCertChainFromProxyCertFile(clntCertFilePath)
108            signingCertFilePath = None
109        else:
110            signingCertChain = None
111            signingCertFilePath = clntCertFilePath
112
113        # Inclusive namespace prefixes for Exclusive C14N
114        try:
115            refC14nKw = {'unsuppressedPrefixes':
116                         self.cfg['setUp']['wssrefinclns'].split()}           
117        except KeyError:
118            refC14nKw = {'unsuppressedPrefixes':[]}
119
120        try:
121            signedInfoC14nKw = {'unsuppressedPrefixes':
122                            self.cfg['setUp']['wsssignedinfoinclns'].split()}         
123        except KeyError:
124            signedInfoC14nKw = {'unsuppressedPrefixes':[]}
125               
126        setSignatureHandler = eval(self.cfg['setUp']['setsignaturehandler'])
127           
128        # Initialise the Session Manager client connection
129        # Omit traceFile keyword to leave out SOAP debug info
130        self.clnt = SessionMgrClient(uri=self.cfg['setUp']['smuri'],
131                        sslCACertList=sslCACertList,
132                        sslPeerCertCN=self.cfg['setUp'].get('sslpeercertcn'),
133                        setSignatureHandler=setSignatureHandler,
134                        reqBinSecTokValType=reqBinSecTokValType,
135                        signingCertFilePath=clntCertFilePath,
136                        signingCertChain=signingCertChain,
137                        signingPriKeyFilePath=clntPriKeyFilePath,
138                        signingPriKeyPwd=clntPriKeyPwd,
139                        caCertFilePathList=caCertFilePathList,
140                        refC14nKw=refC14nKw,
141                        signedInfoC14nKw=signedInfoC14nKw,
142                        tracefile=sys.stderr) 
143       
144        self.sessID = None
145        self.userCert = None
146        self.userPriKey = None
147        self.issuingCert = None
148       
149
150    def test1Connect(self):
151        """test1Connect: Connect as if acting as a browser client -
152        a session ID is returned"""
153       
154        username = self.cfg['test1Connect']['username']
155       
156        if self.__class__.test2Passphrase is None:
157            self.__class__.test2Passphrase = \
158                                    self.cfg['test1Connect'].get('passphrase')
159       
160        if not self.__class__.test2Passphrase:
161            self.__class__.test2Passphrase = getpass.getpass(\
162                prompt="\ntest1Connect pass-phrase for user %s: " % username)
163
164        self.userCert, self.userPriKey, self.issuingCert, self.sessID = \
165            self.clnt.connect(self.cfg['test1Connect']['username'], 
166                              passphrase=self.__class__.test2Passphrase)
167
168        print "User '%s' connected to Session Manager:\n%s" % \
169                                                        (username, self.sessID)
170           
171        creds='\n'.join((self.issuingCert or '',
172                         self.userCert,
173                         self.userPriKey))
174        open(mkPath("user.creds"), "w").write(creds)
175           
176           
177    def test2GetSessionStatus(self):
178        """test2GetSessionStatus: check a session is alive"""
179        print "\n\t" + self.test2GetSessionStatus.__doc__
180       
181        self.test1Connect()
182        assert self.clnt.getSessionStatus(sessID=self.sessID), \
183                "Session is dead"
184               
185        print "User connected to Session Manager with sessID=%s" % self.sessID
186
187        assert not self.clnt.getSessionStatus(sessID='abc'), \
188            "sessID=abc shouldn't exist!"
189           
190        print "CORRECT: sessID=abc doesn't exist"
191
192
193    def test3ConnectNoCreateServerSess(self):
194        """test3ConnectNoCreateServerSess: Connect as a non browser client -
195        sessID should be None"""
196
197        username = self.cfg['test3ConnectNoCreateServerSess']['username']
198       
199        if self.__class__.test3Passphrase is None:
200            self.__class__.test3Passphrase = \
201                self.cfg['test3ConnectNoCreateServerSess'].get('passphrase')
202               
203        if not self.__class__.test3Passphrase:
204            prompt="\ntest3ConnectNoCreateServerSess pass-phrase for user %s: "
205            self.__class__.test3Passphrase = getpass.getpass(\
206                                                    prompt=prompt % username)
207           
208        self.userCert, self.userPriKey, self.issuingCert, sessID = \
209            self.clnt.connect(username, 
210                              passphrase=self.__class__.test3Passphrase,
211                              createServerSess=False)
212       
213        # Expect null session ID
214        assert(not sessID)
215         
216        print "User '%s' retrieved creds. from Session Manager:\n%s" % \
217                                                    (username, self.userCert)
218           
219
220    def test4DisconnectWithSessID(self):
221        """test4DisconnectWithSessID: disconnect as if acting as a browser client
222        """
223       
224        print "\n\t" + self.test4DisconnectWithSessID.__doc__
225        self.test1Connect()
226       
227        self.clnt.disconnect(sessID=self.sessID)
228       
229        print "User disconnected from Session Manager:\n%s" % self.sessID
230           
231
232    def test5DisconnectWithUserCert(self):
233        """test5DisconnectWithUserCert: Disconnect as a command line client
234        """
235       
236        print "\n\t" + self.test5DisconnectWithUserCert.__doc__
237        self.test1Connect()
238       
239        # Use user cert / private key just obtained from connect call for
240        # signature generation
241        if self.issuingCert:
242            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
243            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
244            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
245                                                           self.userCert)
246            self.clnt.signatureHandler.signingCert = None
247        else:
248            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
249            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
250            self.clnt.signatureHandler.signingCertChain = ()
251            self.clnt.signatureHandler.signingCert = self.userCert
252           
253        # Proxy cert in signature determines ID of session to
254        # delete
255        self.clnt.disconnect()
256        print "User disconnected from Session Manager:\n%s" % self.userCert
257
258
259    def test6GetAttCertWithSessID(self):
260        """test6GetAttCertWithSessID: make an attribute request using
261        a session ID as authentication credential"""
262
263        print "\n\t" + self.test6GetAttCertWithSessID.__doc__       
264        self.test1Connect()
265       
266        attCert = self.clnt.getAttCert(\
267            sessID=self.sessID, 
268            attAuthorityURI=self.cfg['test6GetAttCertWithSessID']['aauri'])
269       
270        print "Attribute Certificate:\n%s" % attCert
271        attCert.filePath = \
272            xpdVars(self.cfg['test6GetAttCertWithSessID']['acoutfilepath']) 
273        attCert.write()
274
275
276    def test6aGetAttCertRefusedWithSessID(self):
277        """test6aGetAttCertRefusedWithSessID: make an attribute request using
278        a sessID as authentication credential requesting an AC from an
279        Attribute Authority where the user is NOT registered"""
280
281        print "\n\t" + self.test6aGetAttCertRefusedWithSessID.__doc__       
282        self.test1Connect()
283       
284        aaURI = self.cfg['test6aGetAttCertRefusedWithSessID']['aauri']
285       
286        try:
287            attCert = self.clnt.getAttCert(sessID=self.sessID, 
288                                           attAuthorityURI=aaURI,
289                                           mapFromTrustedHosts=False)
290        except AttributeRequestDenied, e:
291            print "SUCCESS - obtained expected result: %s" % e
292            return
293       
294        self.fail("Request allowed from AA where user is NOT registered!")
295
296
297    def test6bGetMappedAttCertWithSessID(self):
298        """test6bGetMappedAttCertWithSessID: make an attribute request using
299        a session ID as authentication credential"""
300
301        print "\n\t" + self.test6bGetMappedAttCertWithSessID.__doc__       
302        self.test1Connect()
303       
304        aaURI = self.cfg['test6bGetMappedAttCertWithSessID']['aauri']
305       
306        attCert=self.clnt.getAttCert(sessID=self.sessID,attAuthorityURI=aaURI)
307       
308        print "Attribute Certificate:\n%s" % attCert 
309
310
311    def test6cGetAttCertWithExtAttCertListWithSessID(self):
312        """test6cGetAttCertWithSessID: make an attribute request using
313        a session ID as authentication credential"""
314       
315        print "\n\t" + \
316            self.test6cGetAttCertWithExtAttCertListWithSessID.__doc__       
317        self.test1Connect()
318       
319        aaURI = \
320            self.cfg['test6cGetAttCertWithExtAttCertListWithSessID']['aauri']
321       
322        # Use output from test6GetAttCertWithSessID!
323        extACFilePath = xpdVars(\
324    self.cfg['test6cGetAttCertWithExtAttCertListWithSessID']['extacfilepath'])
325        extAttCert = open(extACFilePath).read()
326       
327        attCert = self.clnt.getAttCert(sessID=self.sessID, 
328                                       attAuthorityURI=aaURI,
329                                       extAttCertList=[extAttCert])
330         
331        print "Attribute Certificate:\n%s" % attCert 
332
333
334    def test7GetAttCertWithUserCert(self):
335        """test7GetAttCertWithUserCert: make an attribute request using
336        a user cert as authentication credential"""
337        print "\n\t" + self.test7GetAttCertWithUserCert.__doc__
338        self.test1Connect()
339
340        if self.issuingCert:
341            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
342            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
343            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
344                                                           self.userCert)
345            self.clnt.signatureHandler.signingCert = None
346        else:
347            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
348            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
349            self.clnt.signatureHandler.signingCertChain = ()
350            self.clnt.signatureHandler.signingCert = self.userCert
351       
352        # Request an attribute certificate from an Attribute Authority
353        # using the userCert returned from connect()
354       
355        aaURI = self.cfg['test7GetAttCertWithUserCert']['aauri']
356        attCert = self.clnt.getAttCert(attAuthorityURI=aaURI)
357         
358        print "Attribute Certificate:\n%s" % attCert 
359
360
361    def test8GetX509Cert(self):
362        "test8GetX509Cert: return the Session Manager's X.509 Cert."
363        cert = self.clnt.getX509Cert()
364                                             
365        print "Session Manager X.509 Certificate:\n" + cert
366           
367           
368#_____________________________________________________________________________       
class SessionMgrClientTestSuite(unittest.TestSuite):
    """Test suite made up of the SessionMgrClientTestCase tests named in
    the constructor."""

    def __init__(self):
        """Create one SessionMgrClientTestCase per listed test method name
        and add them all to the suite."""
        # NOTE(review): test6aGetAttCertRefusedWithSessID is not listed
        # here - confirm whether it was left out on purpose.
        testNames = (
            "test1Connect",
            "test2GetSessionStatus",
            "test3ConnectNoCreateServerSess",
            "test4DisconnectWithSessID",
            "test5DisconnectWithUserCert",
            "test6GetAttCertWithSessID",
            "test6bGetMappedAttCertWithSessID",
            "test6cGetAttCertWithExtAttCertListWithSessID",
            "test7GetAttCertWithUserCert",
            "test8GetX509Cert",
        )

        # The list must not be bound to the name ``map``: assigning to
        # ``map`` anywhere in this method makes it a local name, so a call
        # to the built-in on the same line fails with UnboundLocalError.
        tests = [SessionMgrClientTestCase(testName)
                 for testName in testNames]
        unittest.TestSuite.__init__(self, tests)
386           
387                                                   
# Hand over to the unittest command line runner when run as a script
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.