source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py @ 3044

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py@3044
Revision 3044, 13.5 KB checked in by pjkersha, 12 years ago (diff)

SessionMgr? SessionMgrClient? unit tests complete with tests for getSessionStatus included

ndg.security.server/ndg/security/server/conf/sessionMgr.tac:

  • code for getSessionStatus incorporated

ndg.security.server/ndg/security/server/SessionMgr/SessionMgr_services_server.py,
ndg.security.common/ndg/security/common/SessionMgr/SessionMgr_services.py,
ndg.security.common/ndg/security/common/SessionMgr/SessionMgr_services_types.py,
www/html/sessionMgr.wsdl: fixed getSessionStatusResponse - isAlive element needs to be nested within a sequence elem.

ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py,

ndg.security.test/ndg/security/test/sessionMgrClient/sessionMgrProperties.xml: default to https for tests

ndg.security.test/ndg/security/test/sessionMgrClient/sessionMgrClientTest.cfg: get rid of test1AddUser and added test2GetSessionStatus

ndg.security.test/ndg/security/test/sessionMgr/test.py: SessionMgr? unit tests all working

ndg.security.common/ndg/security/common/SessionMgr/init.py: added getSessionStatus method

ndg.security.common/ndg/security/common/AttAuthority/init.py: fix to getHostInfo - return dict indexed by hostname

ndg.security.common/ndg/security/common/AttAuthority/AttAuthority_services.py: re-ran code generation from WSDL

Makefile: added targets for building ZSI code stubs from AA and SM WSDLs.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager client - makes requests for
3authentication and authorisation.  An Attribute Authority and Simple CA
4services must be running for the reqAuthorisation and addUser tests
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "23/02/06"
10__copyright__ = "(C) 2007 STFC & NERC"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "P.J.Kershaw@rl.ac.uk"
15__revision__ = '$Id$'
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20
21from ndg.security.common.SessionMgr import SessionMgrClient, \
22    AttributeRequestDenied
23   
24from ndg.security.common.SessionCookie import SessionCookie
25from ndg.security.common.X509 import X509CertParse, X509CertRead
26
27
28class SessionMgrClientTestCase(unittest.TestCase):
29    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
30       
31    test2Passphrase = None
32    test3Passphrase = None
33
34    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
35        '''Read proxy cert and user cert from a single PEM file and put in
36        a list ready for input into SignatureHandler'''               
37        proxyCertFileTxt = open(proxyCertFilePath).read()
38       
39        pemPatRE = re.compile(self.__class__.pemPat, re.S)
40        x509CertList = pemPatRE.findall(proxyCertFileTxt)
41       
42        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
43                            x509CertList]
44   
45        # Expecting proxy cert first - move this to the end.  This will
46        # be the cert used to verify the message signature
47        signingCertChain.reverse()
48       
49        return signingCertChain
50
51
52    def setUp(self):
53       
54        configParser = SafeConfigParser()
55        configParser.read("./sessionMgrClientTest.cfg")
56       
57        self.cfg = {}
58        for section in configParser.sections():
59            self.cfg[section] = dict(configParser.items(section))
60
61        tracefile = sys.stderr
62
63        try:
64            if self.cfg['setUp'].get('clntprikeypwd') is None:
65                clntPriKeyPwd = getpass.getpass(\
66                            prompt="\nsetUp - client private key password: ")
67            else:
68                clntPriKeyPwd = self.cfg['setUp'].get('clntprikeypwd')
69        except KeyboardInterrupt:
70            sys.exit(0)
71
72        # List of CA certificates for use in validation of certs used in
73        # signature for server reponse
74        try:
75            caCertFilePathList=self.cfg['setUp']['cacertfilepathlist'].split()
76        except:
77            caCertFilePathList = []
78         
79        try:
80            sslCACertList = [X509CertRead(file) for file in \
81                         self.cfg['setUp']['sslcacertfilepathlist'].split()]
82        except KeyError:
83            sslCACertList = []
84         
85         
86        reqBinSecTokValType = self.cfg['setUp'].get('reqbinsectokvaltype')
87
88        # Check certificate types proxy or standard
89        proxyCertFilePath = self.cfg['setUp'].get('proxycertfilepath')
90        if proxyCertFilePath:
91            signingCertChain = \
92                        self._getCertChainFromProxyCertFile(proxyCertFilePath)
93        else:
94            signingCertChain = None
95               
96        setSignatureHandler = eval(self.cfg['setUp']['setsignaturehandler'])
97           
98        # Initialise the Session Manager client connection
99        # Omit traceFile keyword to leave out SOAP debug info
100        self.clnt = SessionMgrClient(uri=self.cfg['setUp']['smuri'],
101                sslCACertList=sslCACertList,
102                sslPeerCertCN=self.cfg['setUp'].get('sslpeercertcn'),
103                setSignatureHandler=setSignatureHandler,
104                reqBinSecTokValType=reqBinSecTokValType,
105                signingCertFilePath=self.cfg['setUp'].get('clntcertfilepath'),
106                signingCertChain=signingCertChain,
107                signingPriKeyFilePath=self.cfg['setUp']['clntprikeyfilepath'],
108                signingPriKeyPwd=clntPriKeyPwd,
109                caCertFilePathList=caCertFilePathList,
110                tracefile=tracefile) 
111       
112        self.sessID = None
113        self.proxyCert = None
114        self.proxyPriKey = None
115        self.userCert = None
116
117# TODO: is addUser part of session manager?
118#    def test1AddUser(self):
119#        """Add a new user ID to the MyProxy repository"""
120#       
121#        passphrase = self.cfg['test1AddUser'].get('passphrase') or \
122#            getpass.getpass(prompt="\ntest1AddUser pass-phrase for new user: ")
123#           
124#        # Note the pass-phrase is read from the file tmp.  To pass
125#        # explicitly as a string use the 'passphrase' keyword instead
126#        self.clnt.addUser(self.cfg['test1AddUser']['username'],
127#                          passphrase=passphrase)
128#        print "Added user '%s'" % self.cfg['test1AddUser']['username']
129       
130
131    def test1Connect(self):
132        """test1Connect: Connect as if acting as a browser client -
133        a session ID is returned"""
134       
135        if self.__class__.test2Passphrase is None:
136            self.__class__.test2Passphrase = \
137                                    self.cfg['test1Connect'].get('passphrase')
138       
139        if not self.__class__.test2Passphrase:
140            self.__class__.test2Passphrase = getpass.getpass(\
141                               prompt="\ntest2Connect pass-phrase for user: ")
142
143        self.proxyCert, self.proxyPriKey, self.userCert, self.sessID = \
144            self.clnt.connect(self.cfg['test1Connect']['username'], 
145                              passphrase=self.__class__.test2Passphrase)
146
147        print "User '%s' connected to Session Manager:\n%s" % \
148            (self.cfg['test1Connect']['username'], self.sessID)
149           
150           
151    def test2GetSessionStatus(self):
152        """test2GetSessionStatus: check a session is alive"""
153        print "\n\t" + self.test2GetSessionStatus.__doc__
154       
155        self.test1Connect()
156        assert self.clnt.getSessionStatus(sessID=self.sessID), \
157                "Session is dead"
158               
159        print "User connected to Session Manager with sessID=%s" % self.sessID
160
161        assert not self.clnt.getSessionStatus(sessID='abc'), \
162            "sessID=abc shouldn't exist!"
163           
164        print "CORRECT: sessID=abc doesn't exist"
165
166
167    def test3ConnectNoCreateServerSess(self):
168        """test3ConnectNoCreateServerSess: Connect as a non browser client -
169        sessID should be None"""
170
171        if self.__class__.test3Passphrase is None:
172            self.__class__.test3Passphrase = \
173                self.cfg['test3ConnectNoCreateServerSess'].get('passphrase')
174               
175        if not self.__class__.test3Passphrase:
176            self.__class__.test3Passphrase = getpass.getpass(\
177            prompt="\ntest3ConnectNoCreateServerSess pass-phrase for user: ")
178
179        self.proxyCert, self.proxyPriKey, self.userCert, sessID = \
180            self.clnt.connect(\
181                      self.cfg['test3ConnectNoCreateServerSess']['username'], 
182                      passphrase=self.__class__.test3Passphrase,
183                      createServerSess=False)
184       
185        # Expect null session ID
186        assert(not sessID)
187         
188        print "User '%s' connected to Session Manager:\n%s" % \
189                    (self.cfg['test3ConnectNoCreateServerSess']['username'], 
190                     self.proxyCert)
191           
192
193    def test4DisconnectUsingSessID(self):
194        """test4DisconnectUsingSessID: disconnect as if acting as a browser client
195        """
196       
197        print "\n\t" + self.test4DisconnectUsingSessID.__doc__
198        self.test1Connect()
199       
200        self.clnt.disconnect(sessID=self.sessID)
201       
202        print "User disconnected from Session Manager:\n%s" % self.sessID
203           
204
205    def test5DisconnectUsingProxyCert(self):
206        """test5DisconnectUsingProxyCert: Disconnect as a command line client
207        """
208       
209        print "\n\t" + self.test5DisconnectUsingProxyCert.__doc__
210        self.test1Connect()
211       
212        # Use proxy cert / private key just obtained from connect call for
213        # signature generation         
214        self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
215        self.clnt.signatureHandler.signingPriKey = self.proxyPriKey       
216        self.clnt.signatureHandler.signingCertChain = (self.userCert,
217                                                       self.proxyCert)
218       
219        # Proxy cert in signature determines ID of session to
220        # delete
221        self.clnt.disconnect()
222        print "User disconnected from Session Manager:\n%s" % self.proxyCert
223
224
225    def test6GetAttCertUsingSessID(self):
226        """test6GetAttCertUsingSessID: make an attribute request using
227        a session ID as authentication credential"""
228
229        print "\n\t" + self.test6GetAttCertUsingSessID.__doc__       
230        self.test1Connect()
231       
232        attCert = self.clnt.getAttCert(\
233            sessID=self.sessID, 
234            attAuthorityURI=self.cfg['test6GetAttCertUsingSessID']['aauri'])
235       
236        print "Attribute Certificate:\n%s" % attCert
237        attCert.filePath = \
238            self.cfg['test6GetAttCertUsingSessID']['acoutfilepath'] 
239        attCert.write()
240
241
242    def test6aGetAttCertRefusedUsingSessID(self):
243        """test6aGetAttCertRefusedUsingSessID: make an attribute request using
244        a sessID as authentication credential requesting an AC from an
245        Attribute Authority where the user is NOT registered"""
246
247        print "\n\t" + self.test6aGetAttCertRefusedUsingSessID.__doc__       
248        self.test1Connect()
249       
250        aaURI = self.cfg['test6aGetAttCertRefusedUsingSessID']['aauri']
251       
252        try:
253            attCert = self.clnt.getAttCert(sessID=self.sessID, 
254                                           attAuthorityURI=aaURI,
255                                           mapFromTrustedHosts=False)
256        except AttributeRequestDenied, e:
257            print "SUCCESS - obtained expected result: %s" % e
258            return
259       
260        self.fail("Request allowed from AA where user is NOT registered!")
261
262
263    def test6bGetMappedAttCertUsingSessID(self):
264        """test6bGetMappedAttCertUsingSessID: make an attribute request using
265        a session ID as authentication credential"""
266
267        print "\n\t" + self.test6bGetMappedAttCertUsingSessID.__doc__       
268        self.test1Connect()
269       
270        aaURI = self.cfg['test6bGetMappedAttCertUsingSessID']['aauri']
271       
272        attCert=self.clnt.getAttCert(sessID=self.sessID,attAuthorityURI=aaURI)
273       
274        print "Attribute Certificate:\n%s" % attCert 
275
276
277    def test6cGetAttCertWithExtAttCertListUsingSessID(self):
278        """test6cGetAttCertUsingSessID: make an attribute request using
279        a session ID as authentication credential"""
280       
281        print "\n\t" + \
282            self.test6cGetAttCertWithExtAttCertListUsingSessID.__doc__       
283        self.test1Connect()
284       
285        aaURI = \
286            self.cfg['test6cGetAttCertWithExtAttCertListUsingSessID']['aauri']
287       
288        # Use output from test6GetAttCertUsingSessID!
289        extACFilePath = \
290    self.cfg['test6cGetAttCertWithExtAttCertListUsingSessID']['extacfilepath']   
291        extAttCert = open(extACFilePath).read()
292       
293        attCert = self.clnt.getAttCert(sessID=self.sessID, 
294                                       attAuthorityURI=aaURI,
295                                       extAttCertList=[extAttCert])
296         
297        print "Attribute Certificate:\n%s" % attCert 
298
299
300    def test7GetAttCertUsingProxyCert(self):
301        """test7GetAttCertUsingProxyCert: make an attribute request using
302        a proxy cert as authentication credential"""
303        print "\n\t" + self.test7GetAttCertUsingProxyCert.__doc__
304        self.test1Connect()
305
306        self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
307        self.clnt.signatureHandler.signingPriKey = self.proxyPriKey       
308        self.clnt.signatureHandler.signingCertChain = (self.userCert,
309                                                       self.proxyCert)
310       
311        # Request an attribute certificate from an Attribute Authority
312        # using the proxyCert returned from connect()
313       
314        aaURI = self.cfg['test7GetAttCertUsingProxyCert']['aauri']
315        attCert = self.clnt.getAttCert(attAuthorityURI=aaURI)
316         
317        print "Attribute Certificate:\n%s" % attCert 
318
319
320    def test8GetX509Cert(self):
321        "test8GetX509Cert: return the Session Manager's X.509 Cert."
322        cert = self.clnt.getX509Cert()
323                                             
324        print "Session Manager X.509 Certificate:\n" + cert
325           
326           
327#_____________________________________________________________________________       
328class SessionMgrClientTestSuite(unittest.TestSuite):
329   
330    def __init__(self):
331        map = map(SessionMgrClientTestCase,
332                  (
333                    "test1Connect",
334                    "test2GetSessionStatus",
335                    "test3ConnectNoCreateServerSess",
336                    "test4DisconnectUsingSessID",
337                    "test5DisconnectUsingProxyCert",
338                    "test6GetAttCertUsingSessID",
339                    "test6bGetMappedAttCertUsingSessID",
340                    "test6cGetAttCertWithExtAttCertListUsingSessID",
341                    "test7GetAttCertUsingProxyCert",
342                    "test8GetX509Cert",
343                  ))
344        unittest.TestSuite.__init__(self, map)
345           
346                                                   
347if __name__ == "__main__":
348    unittest.main()       
Note: See TracBrowser for help on using the repository browser.