source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py @ 3196

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py@3196
Revision 3196, 14.8 KB checked in by pjkersha, 12 years ago (diff)

security/architecture/uml/ndg2-dews-security-beta.eap: update from EA upgrade

security/python/ndg.security.client/setup.cfg,
security/python/ndg.security.common/setup.cfg,
security/python/ndg.security.server/setup.cfg,
security/python/ndg.security.test/setup.cfg,
security/python/setup.cfg: new release tag for OMII-UK 1st drop

security/python/ndg.security.server/ndg/security/server/MyProxy.py: improve error message for cert file not found - incl. CA cert.

security/python/ndg.security.test/ndg/security/test/sessionMgrClient/README: additional note about ensuring MYPROXY_SERVER env for server.py shell
security/python/ndg.security.test/ndg/security/test/sessionMgrClient/server.sh: deleted - server.py replaces it

security/python/ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py: working version with test certs included in SVN and unit test env var refs.

security/python/ndg.security.test/ndg/security/test/sessionMgrClient/sessionMgrProperties.xml: incl. default serverCNprefix elem setting

security/python/ndg.security.test/setup.py: important fixes to ensure test data and test certs are included in package data for egg.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager client - makes requests for
3authentication and authorisation.  An Attribute Authority and Simple CA
4services must be running for the reqAuthorisation and addUser tests
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "23/02/06"
10__copyright__ = "(C) 2007 STFC & NERC"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "P.J.Kershaw@rl.ac.uk"
15__revision__ = '$Id$'
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20
21from ndg.security.common.SessionMgr import SessionMgrClient, \
22    AttributeRequestDenied
23   
24from ndg.security.common.X509 import X509CertParse, X509CertRead
25from ndg.security.common.wsSecurity import SignatureHandler as SigHdlr
26
27from os.path import expandvars as xpdVars
28from os.path import join as jnPath
def mkPath(file):
    """Return the path of `file` inside the unit test directory.

    The directory is read from the NDGSEC_SMCLNT_UNITTEST_DIR environment
    variable on every call; a KeyError is raised if it is not set."""
    unitTestDir = os.environ['NDGSEC_SMCLNT_UNITTEST_DIR']
    return jnPath(unitTestDir, file)
30
31
32class SessionMgrClientTestCase(unittest.TestCase):
33    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
34       
35    test2Passphrase = None
36    test3Passphrase = None
37
38    def _getCertChainFromProxyCertFile(self, certChainFilePath):
39        '''Read user cert and user cert from a single PEM file and put in
40        a list ready for input into SignatureHandler'''               
41        certChainFileTxt = open(certChainFilePath).read()
42       
43        pemPatRE = re.compile(self.__class__.pemPat, re.S)
44        x509CertList = pemPatRE.findall(certChainFileTxt)
45       
46        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
47                            x509CertList]
48   
49        # Expecting user cert first - move this to the end.  This will
50        # be the cert used to verify the message signature
51        signingCertChain.reverse()
52       
53        return signingCertChain
54
55
56    def setUp(self):
57       
58        if 'NDGSEC_INT_DEBUG' in os.environ:
59            import pdb
60            pdb.set_trace()
61       
62        if 'NDGSEC_SMCLNT_UNITTEST_DIR' not in os.environ:
63            os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'] = \
64                os.path.abspath(os.path.dirname(__file__))
65       
66        configParser = SafeConfigParser()
67        configFilePath = jnPath(os.environ['NDGSEC_SMCLNT_UNITTEST_DIR'],
68                                "sessionMgrClientTest.cfg")
69        configParser.read(configFilePath)
70       
71        self.cfg = {}
72        for section in configParser.sections():
73            self.cfg[section] = dict(configParser.items(section))
74
75        try:
76            if self.cfg['setUp'].get('clntprikeypwd') is None:
77                clntPriKeyPwd = getpass.getpass(\
78                            prompt="\nsetUp - client private key password: ")
79            else:
80                clntPriKeyPwd = self.cfg['setUp'].get('clntprikeypwd')
81        except KeyboardInterrupt:
82            sys.exit(0)
83
84        # List of CA certificates for use in validation of certs used in
85        # signature for server reponse
86        try:
87            caCertFilePathList = [xpdVars(file) for file in \
88                            self.cfg['setUp']['cacertfilepathlist'].split()]
89        except:
90            caCertFilePathList = []
91         
92        try:
93            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
94                         self.cfg['setUp']['sslcacertfilepathlist'].split()]
95        except KeyError:
96            sslCACertList = []
97         
98        clntCertFilePath = xpdVars(self.cfg['setUp']['clntcertfilepath'])
99        clntPriKeyFilePath = xpdVars(self.cfg['setUp']['clntprikeyfilepath'])
100       
101        reqBinSecTokValType = self.cfg['setUp'].get('reqbinsectokvaltype')
102
103        # Set format for certificate(s) to be included in client SOAP messages
104        # to enable the Session Manager server to verify messages.
105        if reqBinSecTokValType == SigHdlr.binSecTokValType["X509PKIPathv1"]:
106            signingCertChain = \
107                        self._getCertChainFromProxyCertFile(clntCertFilePath)
108            signingCertFilePath = None
109        else:
110            signingCertChain = None
111            signingCertFilePath = clntCertFilePath
112               
113        setSignatureHandler = eval(self.cfg['setUp']['setsignaturehandler'])
114           
115        # Initialise the Session Manager client connection
116        # Omit traceFile keyword to leave out SOAP debug info
117        self.clnt = SessionMgrClient(uri=self.cfg['setUp']['smuri'],
118                        sslCACertList=sslCACertList,
119                        sslPeerCertCN=self.cfg['setUp'].get('sslpeercertcn'),
120                        setSignatureHandler=setSignatureHandler,
121                        reqBinSecTokValType=reqBinSecTokValType,
122                        signingCertFilePath=clntCertFilePath,
123                        signingCertChain=signingCertChain,
124                        signingPriKeyFilePath=clntPriKeyFilePath,
125                        signingPriKeyPwd=clntPriKeyPwd,
126                        caCertFilePathList=caCertFilePathList,
127                        tracefile=sys.stderr) 
128       
129        self.sessID = None
130        self.userCert = None
131        self.userPriKey = None
132        self.issuingCert = None
133       
134
135    def test1Connect(self):
136        """test1Connect: Connect as if acting as a browser client -
137        a session ID is returned"""
138       
139        username = self.cfg['test1Connect']['username']
140       
141        if self.__class__.test2Passphrase is None:
142            self.__class__.test2Passphrase = \
143                                    self.cfg['test1Connect'].get('passphrase')
144       
145        if not self.__class__.test2Passphrase:
146            self.__class__.test2Passphrase = getpass.getpass(\
147                prompt="\ntest1Connect pass-phrase for user %s: " % username)
148
149        self.userCert, self.userPriKey, self.issuingCert, self.sessID = \
150            self.clnt.connect(self.cfg['test1Connect']['username'], 
151                              passphrase=self.__class__.test2Passphrase)
152
153        print "User '%s' connected to Session Manager:\n%s" % \
154                                                        (username, self.sessID)
155           
156        creds = self.issuingCert or '' + self.userCert + self.userPriKey
157        open(mkPath("user.creds"), "w").write(creds)
158           
159           
160    def test2GetSessionStatus(self):
161        """test2GetSessionStatus: check a session is alive"""
162        print "\n\t" + self.test2GetSessionStatus.__doc__
163       
164        self.test1Connect()
165        assert self.clnt.getSessionStatus(sessID=self.sessID), \
166                "Session is dead"
167               
168        print "User connected to Session Manager with sessID=%s" % self.sessID
169
170        assert not self.clnt.getSessionStatus(sessID='abc'), \
171            "sessID=abc shouldn't exist!"
172           
173        print "CORRECT: sessID=abc doesn't exist"
174
175
176    def test3ConnectNoCreateServerSess(self):
177        """test3ConnectNoCreateServerSess: Connect as a non browser client -
178        sessID should be None"""
179
180        username = self.cfg['test3ConnectNoCreateServerSess']['username']
181       
182        if self.__class__.test3Passphrase is None:
183            self.__class__.test3Passphrase = \
184                self.cfg['test3ConnectNoCreateServerSess'].get('passphrase')
185               
186        if not self.__class__.test3Passphrase:
187            prompt="\ntest3ConnectNoCreateServerSess pass-phrase for user %s: "
188            self.__class__.test3Passphrase = getpass.getpass(\
189                                                    prompt=prompt % username)
190           
191        self.userCert, self.userPriKey, self.issuingCert, sessID = \
192            self.clnt.connect(username, 
193                              passphrase=self.__class__.test3Passphrase,
194                              createServerSess=False)
195       
196        # Expect null session ID
197        assert(not sessID)
198         
199        print "User '%s' retrieved creds. from Session Manager:\n%s" % \
200                                                    (username, self.userCert)
201           
202
203    def test4DisconnectWithSessID(self):
204        """test4DisconnectWithSessID: disconnect as if acting as a browser client
205        """
206       
207        print "\n\t" + self.test4DisconnectWithSessID.__doc__
208        self.test1Connect()
209       
210        self.clnt.disconnect(sessID=self.sessID)
211       
212        print "User disconnected from Session Manager:\n%s" % self.sessID
213           
214
215    def test5DisconnectWithUserCert(self):
216        """test5DisconnectWithUserCert: Disconnect as a command line client
217        """
218       
219        print "\n\t" + self.test5DisconnectWithUserCert.__doc__
220        self.test1Connect()
221       
222        # Use user cert / private key just obtained from connect call for
223        # signature generation
224        if self.issuingCert:
225            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
226            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
227            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
228                                                           self.userCert)
229            self.clnt.signatureHandler.signingCert = None
230        else:
231            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
232            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
233            self.clnt.signatureHandler.signingCertChain = ()
234            self.clnt.signatureHandler.signingCert = self.userCert
235           
236        # Proxy cert in signature determines ID of session to
237        # delete
238        self.clnt.disconnect()
239        print "User disconnected from Session Manager:\n%s" % self.userCert
240
241
242    def test6GetAttCertWithSessID(self):
243        """test6GetAttCertWithSessID: make an attribute request using
244        a session ID as authentication credential"""
245
246        print "\n\t" + self.test6GetAttCertWithSessID.__doc__       
247        self.test1Connect()
248       
249        attCert = self.clnt.getAttCert(\
250            sessID=self.sessID, 
251            attAuthorityURI=self.cfg['test6GetAttCertWithSessID']['aauri'])
252       
253        print "Attribute Certificate:\n%s" % attCert
254        attCert.filePath = \
255            self.cfg['test6GetAttCertWithSessID']['acoutfilepath'] 
256        attCert.write()
257
258
259    def test6aGetAttCertRefusedWithSessID(self):
260        """test6aGetAttCertRefusedWithSessID: make an attribute request using
261        a sessID as authentication credential requesting an AC from an
262        Attribute Authority where the user is NOT registered"""
263
264        print "\n\t" + self.test6aGetAttCertRefusedWithSessID.__doc__       
265        self.test1Connect()
266       
267        aaURI = self.cfg['test6aGetAttCertRefusedWithSessID']['aauri']
268       
269        try:
270            attCert = self.clnt.getAttCert(sessID=self.sessID, 
271                                           attAuthorityURI=aaURI,
272                                           mapFromTrustedHosts=False)
273        except AttributeRequestDenied, e:
274            print "SUCCESS - obtained expected result: %s" % e
275            return
276       
277        self.fail("Request allowed from AA where user is NOT registered!")
278
279
280    def test6bGetMappedAttCertWithSessID(self):
281        """test6bGetMappedAttCertWithSessID: make an attribute request using
282        a session ID as authentication credential"""
283
284        print "\n\t" + self.test6bGetMappedAttCertWithSessID.__doc__       
285        self.test1Connect()
286       
287        aaURI = self.cfg['test6bGetMappedAttCertWithSessID']['aauri']
288       
289        attCert=self.clnt.getAttCert(sessID=self.sessID,attAuthorityURI=aaURI)
290       
291        print "Attribute Certificate:\n%s" % attCert 
292
293
294    def test6cGetAttCertWithExtAttCertListWithSessID(self):
295        """test6cGetAttCertWithSessID: make an attribute request using
296        a session ID as authentication credential"""
297       
298        print "\n\t" + \
299            self.test6cGetAttCertWithExtAttCertListWithSessID.__doc__       
300        self.test1Connect()
301       
302        aaURI = \
303            self.cfg['test6cGetAttCertWithExtAttCertListWithSessID']['aauri']
304       
305        # Use output from test6GetAttCertWithSessID!
306        extACFilePath = \
307    self.cfg['test6cGetAttCertWithExtAttCertListWithSessID']['extacfilepath']   
308        extAttCert = open(extACFilePath).read()
309       
310        attCert = self.clnt.getAttCert(sessID=self.sessID, 
311                                       attAuthorityURI=aaURI,
312                                       extAttCertList=[extAttCert])
313         
314        print "Attribute Certificate:\n%s" % attCert 
315
316
317    def test7GetAttCertWithUserCert(self):
318        """test7GetAttCertWithUserCert: make an attribute request using
319        a user cert as authentication credential"""
320        print "\n\t" + self.test7GetAttCertWithUserCert.__doc__
321        self.test1Connect()
322
323        if self.issuingCert:
324            self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
325            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
326            self.clnt.signatureHandler.signingCertChain = (self.issuingCert,
327                                                           self.userCert)
328            self.clnt.signatureHandler.signingCert = None
329        else:
330            self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'
331            self.clnt.signatureHandler.signingPriKey = self.userPriKey       
332            self.clnt.signatureHandler.signingCertChain = ()
333            self.clnt.signatureHandler.signingCert = self.userCert
334       
335        # Request an attribute certificate from an Attribute Authority
336        # using the userCert returned from connect()
337       
338        aaURI = self.cfg['test7GetAttCertWithUserCert']['aauri']
339        attCert = self.clnt.getAttCert(attAuthorityURI=aaURI)
340         
341        print "Attribute Certificate:\n%s" % attCert 
342
343
344    def test8GetX509Cert(self):
345        "test8GetX509Cert: return the Session Manager's X.509 Cert."
346        cert = self.clnt.getX509Cert()
347                                             
348        print "Session Manager X.509 Certificate:\n" + cert
349           
350           
351#_____________________________________________________________________________       
class SessionMgrClientTestSuite(unittest.TestSuite):
    """Test suite holding the SessionMgrClientTestCase tests, added in the
    order listed in the constructor."""

    def __init__(self):
        # NOTE(review): test6aGetAttCertRefusedWithSessID is not listed
        # here - confirm whether leaving it out is deliberate
        testNames = (
            "test1Connect",
            "test2GetSessionStatus",
            "test3ConnectNoCreateServerSess",
            "test4DisconnectWithSessID",
            "test5DisconnectWithUserCert",
            "test6GetAttCertWithSessID",
            "test6bGetMappedAttCertWithSessID",
            "test6cGetAttCertWithExtAttCertListWithSessID",
            "test7GetAttCertWithUserCert",
            "test8GetX509Cert",
        )

        # Don't bind the result to a local named 'map': that makes 'map' a
        # local for the whole method, so calling the builtin on the same
        # line raises UnboundLocalError
        tests = [SessionMgrClientTestCase(testName)
                 for testName in testNames]
        unittest.TestSuite.__init__(self, tests)
369           
370                                                   
371if __name__ == "__main__":
372    unittest.main()       
Note: See TracBrowser for help on using the repository browser.