source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py @ 3145

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py@3145
Revision 3145, 13.5 KB checked in by pjkersha, 12 years ago (diff)

python/www/html/sessionMgr.wsdl,
python/ndg.security.common/ndg/security/common/SessionMgr/SessionMgr_services.py,
python/ndg.security.common/ndg/security/common/SessionMgr/SessionMgr_services_types.py,
python/ndg.security.server/ndg/security/server/SessionMgr/SessionMgr_services_server.py:

  • remove refs to proxy certs - with MyProxy acting as the CA, proxy certs aren't generated.
  • make issuingCert nillable as it won't be set when calling MyProxy in Simple CA mode

python/ndg.security.server/ndg/security/server/conf/sessionMgr.tac: removes refs to proxy cert - replace with user cert

python/ndg.security.server/ndg/security/server/conf/sessionMgrProperties.xml: fix MyProxy cert lifetimes - these are specified in seconds, NOT hours

python/ndg.security.server/ndg/security/server/MyProxy.py: remove '\0's from get and info commands

python/ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py: fixed for tests with the new MyProxy config acting as Simple CA

python/ndg.security.test/ndg/security/test/sessionMgrClient/server.sh: get rid of --pidfile arg to twistd - not needed.

python/ndg.security.test/ndg/security/test/sessionMgrClient/sm-clnt.crt,
python/ndg.security.test/ndg/security/test/sessionMgrClient/sm-clnt.key,

python/ndg.security.test/ndg/security/test/sessionMgr/sessionMgrProperties.xml,
python/ndg.security.test/ndg/security/test/sessionMgr/sessionMgrTest.cfg,
python/ndg.security.test/ndg/security/test/sessionMgrClient/sessionMgrProperties.xml,
python/ndg.security.test/ndg/security/test/sessionMgrClient/sessionMgrClientTest.cfg: altered for tests with multiple CAs

python/ndg.security.common/ndg/security/common/SessionMgr/init.py:

  • removed addUser method - not needed
  • switched refs to proxy cert -> user cert
  • Property svn:executable set to *
  • Property svn:keywords set to Id
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager client - makes requests for
3authentication and authorisation.  An Attribute Authority and Simple CA
4services must be running for the reqAuthorisation and addUser tests
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "23/02/06"
10__copyright__ = "(C) 2007 STFC & NERC"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "P.J.Kershaw@rl.ac.uk"
15__revision__ = '$Id$'
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20
21from ndg.security.common.SessionMgr import SessionMgrClient, \
22    AttributeRequestDenied
23   
24from ndg.security.common.SessionCookie import SessionCookie
25from ndg.security.common.X509 import X509CertParse, X509CertRead
26
27
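class SessionMgrClientTestCase(unittest.TestCase):
    """Exercise SessionMgrClient against a running Session Manager.

    All settings come from ./sessionMgrClientTest.cfg, read in setUp.  The
    tests are live: the Session Manager, the Attribute Authorities named in
    the config and the CA behind the Session Manager must be up, so nothing
    here runs offline.  Several tests call test1Connect() themselves to get
    a fresh session before exercising their own operation.
    """

    # One PEM-encoded certificate.  The base64 body of a PEM cert contains
    # no '-', so "[^\-]*" stops at the END marker.  Raw string: "\-" is not
    # a valid string escape.
    pemPat = r"-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"

    # Cached at class level so that repeated connect() calls within one run
    # prompt for the user pass-phrase at most once.
    test2Passphrase = None
    test3Passphrase = None

    def _getCertChainFromProxyCertFile(self, certChainFilePath):
        '''Read every PEM certificate in certChainFilePath and return them
        as a list of X509CertParse objects ordered for SignatureHandler.

        The file is expected to hold the user cert first, followed by the
        cert(s) that issued it.  The list is reversed so that the user
        cert - the one the message signature is verified against - is last.
        '''
        # try/finally rather than a 'with' block to stay compatible with
        # the Python 2 the rest of this file targets.
        certChainFile = open(certChainFilePath)
        try:
            certChainFileTxt = certChainFile.read()
        finally:
            certChainFile.close()

        pemPatRE = re.compile(self.__class__.pemPat, re.S)
        signingCertChain = [X509CertParse(x509Cert)
                            for x509Cert in pemPatRE.findall(certChainFileTxt)]

        # Expecting user cert first - move this to the end.  This will
        # be the cert used to verify the message signature
        signingCertChain.reverse()

        return signingCertChain

    def setUp(self):
        """Build the SessionMgrClient from the [setUp] section of
        ./sessionMgrClientTest.cfg.

        Options read from [setUp]: smuri, sslcacertfilepathlist (optional),
        sslpeercertcn (optional), setsignaturehandler (boolean),
        reqbinsectokvaltype (optional), clntcertfilepath (optional),
        usercertfilepath (optional), clntprikeyfilepath, clntprikeypwd
        (prompted for if absent) and cacertfilepathlist (optional).
        """
        configParser = SafeConfigParser()
        configParser.read("./sessionMgrClientTest.cfg")

        self.cfg = {}
        for section in configParser.sections():
            self.cfg[section] = dict(configParser.items(section))

        # NOTE(review): SOAP debug trace to stderr.  connect() sends the
        # user pass-phrase in its request, so the trace presumably shows it
        # in clear - don't leave this on where stderr is captured or shared.
        tracefile = sys.stderr

        try:
            clntPriKeyPwd = self.cfg['setUp'].get('clntprikeypwd')
            if clntPriKeyPwd is None:
                clntPriKeyPwd = getpass.getpass(\
                            prompt="\nsetUp - client private key password: ")
        except KeyboardInterrupt:
            sys.exit(0)

        # List of CA certificates for use in validation of certs used in
        # signature for server response.  Optional - only a missing option
        # is tolerated; any other error should surface.
        try:
            caCertFilePathList = \
                            self.cfg['setUp']['cacertfilepathlist'].split()
        except KeyError:
            caCertFilePathList = []

        # CA certs used to verify the server's SSL certificate - optional.
        # NOTE(review): with an empty list the client presumably can't
        # verify the server cert; confirm what SessionMgrClient does in
        # that case before relying on it outside a local test setup.
        try:
            sslCACertFilePaths = \
                        self.cfg['setUp']['sslcacertfilepathlist'].split()
        except KeyError:
            sslCACertFilePaths = []

        sslCACertList = [X509CertRead(path) for path in sslCACertFilePaths]

        reqBinSecTokValType = self.cfg['setUp'].get('reqbinsectokvaltype')

        # Check certificate types user or standard
        userCertFilePath = self.cfg['setUp'].get('usercertfilepath')
        if userCertFilePath:
            signingCertChain = \
                        self._getCertChainFromProxyCertFile(userCertFilePath)
        else:
            signingCertChain = None

        # The setting is a flag; parse it as one instead of eval()ing the
        # raw config text (eval executes whatever the file contains).
        # Accepts the usual ConfigParser spellings: 1/yes/true/on and
        # 0/no/false/off.
        setSignatureHandler = configParser.getboolean('setUp',
                                                      'setsignaturehandler')

        # Initialise the Session Manager client connection
        # Omit traceFile keyword to leave out SOAP debug info
        self.clnt = SessionMgrClient(uri=self.cfg['setUp']['smuri'],
                sslCACertList=sslCACertList,
                sslPeerCertCN=self.cfg['setUp'].get('sslpeercertcn'),
                setSignatureHandler=setSignatureHandler,
                reqBinSecTokValType=reqBinSecTokValType,
                signingCertFilePath=self.cfg['setUp'].get('clntcertfilepath'),
                signingCertChain=signingCertChain,
                signingPriKeyFilePath=self.cfg['setUp']['clntprikeyfilepath'],
                signingPriKeyPwd=clntPriKeyPwd,
                caCertFilePathList=caCertFilePathList,
                tracefile=tracefile)

        # Credentials returned by connect(); populated by test1Connect /
        # test3ConnectNoCreateServerSess.
        self.sessID = None
        self.userCert = None
        self.userPriKey = None
        self.issuingCert = None

    def _signWithConnectCredentials(self):
        '''Re-point the client's signature handler at the user credentials
        obtained from the last connect() call.

        Per the commit log, a MyProxy CA in Simple CA mode returns no
        issuing cert; the user cert then goes alone as an X509v3 token.
        When an issuing cert IS returned, issuing and user cert are sent as
        an X509PKIPathv1 chain with the user cert last.  Attribute
        assignment order is kept as in the original code.
        '''
        handler = self.clnt.signatureHandler
        if self.issuingCert:
            handler.reqBinSecTokValType = 'X509PKIPathv1'
            handler.signingPriKey = self.userPriKey
            handler.signingCertChain = (self.issuingCert, self.userCert)
            handler.signingCert = None
        else:
            handler.reqBinSecTokValType = 'X509v3'
            handler.signingPriKey = self.userPriKey
            handler.signingCertChain = ()
            handler.signingCert = self.userCert

    def test1Connect(self):
        """test1Connect: Connect as if acting as a browser client -
        a session ID is returned"""

        # Pass-phrase from config if present, else prompt once per run
        if self.__class__.test2Passphrase is None:
            self.__class__.test2Passphrase = \
                                    self.cfg['test1Connect'].get('passphrase')

        if not self.__class__.test2Passphrase:
            self.__class__.test2Passphrase = getpass.getpass(\
                               prompt="\ntest1Connect pass-phrase for user: ")

        self.userCert, self.userPriKey, self.issuingCert, self.sessID = \
            self.clnt.connect(self.cfg['test1Connect']['username'],
                              passphrase=self.__class__.test2Passphrase)

        # A browser-style connect creates a server-side session
        self.assertTrue(self.sessID, "connect() returned no session ID")

        print "User '%s' connected to Session Manager:\n%s" % \
            (self.cfg['test1Connect']['username'], self.sessID)

    def test2GetSessionStatus(self):
        """test2GetSessionStatus: check a session is alive"""
        print "\n\t" + self.test2GetSessionStatus.__doc__

        self.test1Connect()
        self.assertTrue(self.clnt.getSessionStatus(sessID=self.sessID),
                        "Session is dead")

        print "User connected to Session Manager with sessID=%s" % self.sessID

        # Negative check: a made-up session ID must not be reported alive
        self.assertFalse(self.clnt.getSessionStatus(sessID='abc'),
                         "sessID=abc shouldn't exist!")

        print "CORRECT: sessID=abc doesn't exist"

    def test3ConnectNoCreateServerSess(self):
        """test3ConnectNoCreateServerSess: Connect as a non browser client -
        sessID should be None"""

        if self.__class__.test3Passphrase is None:
            self.__class__.test3Passphrase = \
                self.cfg['test3ConnectNoCreateServerSess'].get('passphrase')

        if not self.__class__.test3Passphrase:
            self.__class__.test3Passphrase = getpass.getpass(\
            prompt="\ntest3ConnectNoCreateServerSess pass-phrase for user: ")

        self.userCert, self.userPriKey, self.issuingCert, sessID = \
            self.clnt.connect(\
                      self.cfg['test3ConnectNoCreateServerSess']['username'],
                      passphrase=self.__class__.test3Passphrase,
                      createServerSess=False)

        # Expect null session ID - the caller keeps the credentials itself
        self.assertFalse(sessID, "Expected no session ID")

        print "User '%s' connected to Session Manager:\n%s" % \
                    (self.cfg['test3ConnectNoCreateServerSess']['username'],
                     self.userCert)

    def test4DisconnectWithSessID(self):
        """test4DisconnectWithSessID: disconnect as if acting as a browser client
        """

        print "\n\t" + self.test4DisconnectWithSessID.__doc__
        self.test1Connect()

        self.clnt.disconnect(sessID=self.sessID)

        # Logging out must actually invalidate the session on the server
        self.assertFalse(self.clnt.getSessionStatus(sessID=self.sessID),
                         "Session still alive after disconnect")

        print "User disconnected from Session Manager:\n%s" % self.sessID

    def test5DisconnectWithUserCert(self):
        """test5DisconnectWithUserCert: Disconnect as a command line client
        """

        print "\n\t" + self.test5DisconnectWithUserCert.__doc__
        self.test1Connect()

        # Use user cert / private key just obtained from connect call for
        # signature generation
        self._signWithConnectCredentials()

        # User cert in signature determines ID of session to delete
        self.clnt.disconnect()
        print "User disconnected from Session Manager:\n%s" % self.userCert

    def test6GetAttCertWithSessID(self):
        """test6GetAttCertWithSessID: make an attribute request using
        a session ID as authentication credential"""

        print "\n\t" + self.test6GetAttCertWithSessID.__doc__
        self.test1Connect()

        attCert = self.clnt.getAttCert(\
            sessID=self.sessID,
            attAuthorityURI=self.cfg['test6GetAttCertWithSessID']['aauri'])

        print "Attribute Certificate:\n%s" % attCert

        # Written out so test6c... can feed it back in as an external AC
        attCert.filePath = \
            self.cfg['test6GetAttCertWithSessID']['acoutfilepath']
        attCert.write()

    def test6aGetAttCertRefusedWithSessID(self):
        """test6aGetAttCertRefusedWithSessID: make an attribute request using
        a sessID as authentication credential requesting an AC from an
        Attribute Authority where the user is NOT registered"""

        print "\n\t" + self.test6aGetAttCertRefusedWithSessID.__doc__
        self.test1Connect()

        aaURI = self.cfg['test6aGetAttCertRefusedWithSessID']['aauri']

        # mapFromTrustedHosts=False so the Session Manager can't satisfy
        # the request by mapping from another AA - it must be refused
        try:
            attCert = self.clnt.getAttCert(sessID=self.sessID,
                                           attAuthorityURI=aaURI,
                                           mapFromTrustedHosts=False)
        except AttributeRequestDenied, e:
            print "SUCCESS - obtained expected result: %s" % e
            return

        self.fail("Request allowed from AA where user is NOT registered!")

    def test6bGetMappedAttCertWithSessID(self):
        """test6bGetMappedAttCertWithSessID: make an attribute request using
        a session ID as authentication credential"""

        print "\n\t" + self.test6bGetMappedAttCertWithSessID.__doc__
        self.test1Connect()

        aaURI = self.cfg['test6bGetMappedAttCertWithSessID']['aauri']

        attCert = self.clnt.getAttCert(sessID=self.sessID,
                                       attAuthorityURI=aaURI)

        print "Attribute Certificate:\n%s" % attCert

    def test6cGetAttCertWithExtAttCertListWithSessID(self):
        """test6cGetAttCertWithExtAttCertListWithSessID: make an attribute
        request using a session ID as authentication credential and an
        externally supplied Attribute Certificate"""

        print "\n\t" + \
            self.test6cGetAttCertWithExtAttCertListWithSessID.__doc__
        self.test1Connect()

        aaURI = \
            self.cfg['test6cGetAttCertWithExtAttCertListWithSessID']['aauri']

        # Use output from test6GetAttCertWithSessID!
        extACFilePath = \
    self.cfg['test6cGetAttCertWithExtAttCertListWithSessID']['extacfilepath']
        extACFile = open(extACFilePath)
        try:
            extAttCert = extACFile.read()
        finally:
            extACFile.close()

        attCert = self.clnt.getAttCert(sessID=self.sessID,
                                       attAuthorityURI=aaURI,
                                       extAttCertList=[extAttCert])

        print "Attribute Certificate:\n%s" % attCert

    def test7GetAttCertWithUserCert(self):
        """test7GetAttCertWithUserCert: make an attribute request using
        a user cert as authentication credential"""
        print "\n\t" + self.test7GetAttCertWithUserCert.__doc__
        self.test1Connect()

        # Sign with the user cert / key returned from connect() rather than
        # passing a session ID
        self._signWithConnectCredentials()

        # Request an attribute certificate from an Attribute Authority
        # using the userCert returned from connect()
        aaURI = self.cfg['test7GetAttCertWithUserCert']['aauri']
        attCert = self.clnt.getAttCert(attAuthorityURI=aaURI)

        print "Attribute Certificate:\n%s" % attCert

    def test8GetX509Cert(self):
        "test8GetX509Cert: return the Session Manager's X.509 Cert."
        cert = self.clnt.getX509Cert()
        self.assertTrue(cert, "No certificate returned")

        print "Session Manager X.509 Certificate:\n" + cert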
326           
327           
328#_____________________________________________________________________________       
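class SessionMgrClientTestSuite(unittest.TestSuite):
    """Run the SessionMgrClientTestCase tests in a fixed order.

    test6aGetAttCertRefusedWithSessID is not in the list; whether that is
    deliberate is not recorded here.
    """

    def __init__(self):
        # The original assigned the result to a local named `map`, which
        # makes `map` local to the whole function and so raised
        # UnboundLocalError on the map() call itself.  Use plain names.
        testNames = (
            "test1Connect",
            "test2GetSessionStatus",
            "test3ConnectNoCreateServerSess",
            "test4DisconnectWithSessID",
            "test5DisconnectWithUserCert",
            "test6GetAttCertWithSessID",
            "test6bGetMappedAttCertWithSessID",
            "test6cGetAttCertWithExtAttCertListWithSessID",
            "test7GetAttCertWithUserCert",
            "test8GetX509Cert",
        )
        tests = [SessionMgrClientTestCase(name) for name in testNames]
        unittest.TestSuite.__init__(self, tests)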
346           
347                                                   
# Script entry point: let unittest discover and run the test* methods.
if __name__ == '__main__':
    unittest.main()