source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/SessionMgr/SessionMgrClientTest.py @ 2530

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/SessionMgr/SessionMgrClientTest.py@2530
Revision 2530, 12.1 KB checked in by pjkersha, 13 years ago (diff)

Working Session Manager unit tests for connect and disconnect calls and
getAttCert calls. Correct use of proxy certs with WS-Security signature
interface is also configured.

ndg.security.server/ndg/security/server/AttAuthority/server-config.tac:
removed blank line

ndg.security.server/ndg/security/server/conf/sessionMgrProperties.xml:
added setting for signature handler flag and CA cert

ndg.security.server/ndg/security/server/SessionMgr/server-config.tac:

  • fix to soap_disconnect - call SessionMgr.deleteUserSession
  • fix to soap_getX509Cert - base64 encode DER format cert output
  • added 'useSignatureHandler' flag to enable WS-Security signature handling
    to be omitted if required.

ndg.security.server/ndg/security/server/SessionMgr/__init__.py:

  • ref to CredWalletInvalidUserX509Cert
  • give explicit keyword names in connect2UserSession method signature
  • raise CredWalletInvalidUserX509Cert if Credential Wallet cert is invalid
  • SessionMgr.deleteUserSession method - added userSess keyword; fixed userDN
    setting to ensure it's a string

ndg.security.test/ndg/security/test/AttAuthority/AttAuthorityClientTest.py,
ndg.security.test/ndg/security/test/AttAuthority/attAuthorityClientTest.cfg:
cosmetic changes

ndg.security.test/ndg/security/test/SessionMgr/SessionMgrClientTest.py:

  • added _getCertChainFromProxyCertFile method to enable correct proxy cert
    loading
  • added caCertFilePathList, reqBinSecTokValType, setSignatureHandler and
    signingCertChain keyword settings to SessionMgrClient initialisation
  • removed duplicated test6bCookieGetMappedAttCert method

ndg.security.test/ndg/security/test/SessionMgr/sessionMgrProperties.xml:

  • dropped serverCNprefix element setting - not needed for test certs used.

ndg.security.test/ndg/security/test/SessionMgr/sessionMgrClientTest.cfg:

  • added new params caCertFilePathList, reqBinSecTokValType,
    setSignatureHandler and proxycertfilepath

ndg.security.common/ndg/security/common/SessionMgr/__init__.py:
allow SignatureHandler to be switched on/off

ndg.security.common/ndg/security/common/AttAuthority/__init__.py: fix to
pydoc for AttAuthorityClient.__init__

ndg.security.common/ndg/security/common/CredWallet.py: major fixes for
SessionMgr - AA calls -

  • CredWalletInvalidUserX509Cert new exception type raised if user cert is
    invalid
  • separate setAAuri into a new method createAAClnt
  • getAttCert method can take an aaClnt keyword. This allows the client
    object for the AA that is to be called to be passed in. Default is the
    target AA, self.aaClnt.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2
3"""Test harness for NDG Session Manager client - makes requests for
4authentication and authorisation.  An Attribute Authority and Simple CA
5services must be running for the reqAuthorisation and addUser tests
6
7NERC Data Grid Project
8
9@author P J Kershaw
10
1123/02/06
12
13Renamed from SessionClientTest.py 27/04/06
14Moved and renamed SessionMgrClientTest.py 23/11/06
15
16@copyright (C) 2007 CCLRC & NERC
17
18@license This software may be distributed under the terms of the Q Public
19License, version 1.0 or later.
20"""
21__revision__ = "$Id:$"
22
23import unittest
24import os, sys, getpass, re
25from ConfigParser import SafeConfigParser
26
27from ndg.security.common.SessionMgr import SessionMgrClient, \
28    AttributeRequestDenied
29   
30from ndg.security.common.SessionCookie import SessionCookie
31from ndg.security.common.X509 import X509CertParse
32
33
34class SessionMgrClientTestCase(unittest.TestCase):
35    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
36
37    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
38        '''Read proxy cert and user cert from a single PEM file and put in
39        a list ready for input into SignatureHandler'''               
40        proxyCertFileTxt = open(proxyCertFilePath).read()
41       
42        pemPatRE = re.compile(self.__class__.pemPat, re.S)
43        x509CertList = pemPatRE.findall(proxyCertFileTxt)
44       
45        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
46                            x509CertList]
47   
48        # Expecting proxy cert first - move this to the end.  This will
49        # be the cert used to verify the message signature
50        signingCertChain.reverse()
51       
52        return signingCertChain
53
54
55    def setUp(self):
56       
57        configParser = SafeConfigParser()
58        configParser.read("./sessionMgrClientTest.cfg")
59       
60        self.cfg = {}
61        for section in configParser.sections():
62            self.cfg[section] = dict(configParser.items(section))
63
64        tracefile = sys.stderr
65
66        try:
67            if self.cfg['setUp'].get('clntprikeypwd') is None:
68                clntPriKeyPwd = getpass.getpass(\
69                            prompt="\nsetUp - client private key password: ")
70            else:
71                clntPriKeyPwd = self.cfg['setUp'].get('clntprikeypwd')
72        except KeyboardInterrupt:
73            sys.exit(0)
74
75        # List of CA certificates for use in validation of certs used in
76        # signature for server reponse
77        try:
78            caCertFilePathList=self.cfg['setUp']['cacertfilepathlist'].split()
79        except:
80            caCertFilePathList = []
81         
82         
83        reqBinSecTokValType = self.cfg['setUp'].get('reqbinsectokvaltype')
84
85        # Check certificate types proxy or standard
86        proxyCertFilePath = self.cfg['setUp'].get('proxycertfilepath')
87        if proxyCertFilePath:
88            signingCertChain = \
89                        self._getCertChainFromProxyCertFile(proxyCertFilePath)
90           
91        setSignatureHandler = eval(self.cfg['setUp']['setsignaturehandler'])
92           
93        # Initialise the Session Manager client connection
94        # Omit traceFile keyword to leave out SOAP debug info
95        self.clnt = SessionMgrClient(uri=self.cfg['setUp']['smuri'],
96                setSignatureHandler=setSignatureHandler,
97                reqBinSecTokValType=reqBinSecTokValType,
98                signingCertFilePath=self.cfg['setUp'].get('clntcertfilepath'),
99                signingCertChain=signingCertChain,
100                signingPriKeyFilePath=self.cfg['setUp']['clntprikeyfilepath'],
101                signingPriKeyPwd=clntPriKeyPwd,
102                caCertFilePathList=caCertFilePathList,
103                tracefile=tracefile) 
104       
105        self.sessCookie = None
106        self.proxyCert = None
107        self.proxyPriKey = None
108        self.userCert = None
109
110    def test1AddUser(self):
111        """Add a new user ID to the MyProxy repository"""
112       
113        passphrase = self.cfg['test1AddUser'].get('passphrase') or \
114            getpass.getpass(prompt="\ntest1AddUser pass-phrase for new user: ")
115           
116        # Note the pass-phrase is read from the file tmp.  To pass
117        # explicitly as a string use the 'passphrase' keyword instead
118        self.clnt.addUser(self.cfg['test1AddUser']['username'], 
119                          passphrase=passphrase)
120        print "Added user '%s'" % self.cfg['test1AddUser']['username']
121       
122
123    def test2CookieConnect(self):
124        """test2CookieConnect: Connect as if acting as a browser client -
125        a cookie is returned"""
126
127        passphrase = self.cfg['test2CookieConnect'].get('passphrase')
128        if passphrase is None:
129            passphrase = getpass.getpass(\
130                     prompt="\ntest2CookieConnect pass-phrase for user: ")
131
132        self.proxyCert, self.proxyPriKey, self.userCert, cookie = \
133            self.clnt.connect(self.cfg['test2CookieConnect']['username'], 
134                              passphrase=passphrase,
135                              getCookie=True)
136
137        self.sessCookie = SessionCookie(cookie)
138        print "User '%s' connected to Session Manager:\n%s" % \
139            (self.cfg['test2CookieConnect']['username'], self.sessCookie)
140           
141
142    def test3ProxyCertConnect(self):
143        """test3ProxyCertConnect: Connect as a command line client -
144        a proxyCert is returned"""
145
146        passphrase = self.cfg['test3ProxyCertConnect'].get('passphrase') or \
147            getpass.getpass(\
148                    prompt="\ntest3ProxyCertConnect pass-phrase for user: ")
149
150        self.proxyCert, self.proxyPriKey, self.userCert, null = \
151            self.clnt.connect(self.cfg['test3ProxyCertConnect']['username'], 
152                              passphrase=passphrase,
153                              getCookie=False)
154        print "User '%s' connected to Session Manager:\n%s" % \
155            (self.cfg['test3ProxyCertConnect']['username'], self.proxyCert)
156           
157
158    def test4CookieDisconnect(self):
159        """test4CookieDisconnect: disconnect as if acting as a browser client
160        """
161       
162        print "\n\t" + self.test4CookieDisconnect.__doc__
163        self.test2CookieConnect()
164       
165        # Use proxy cert / private key just obtained from connect call for
166        # signature generation
167        self.clnt.signatureHandler.signingCert = self.proxyCert
168        self.clnt.signatureHandler.signingCertPriKey = self.proxyPriKey
169        import pdb;pdb.set_trace()
170        self.clnt.disconnect(sessCookie=str(self.sessCookie))
171       
172        print "User disconnected from Session Manager:\n%s" % self.sessCookie
173           
174
175    def test5ProxyCertDisconnect(self):
176        """test5ProxyCertDisconnect: Disconnect as a command line client
177        """
178       
179        print "\n\t" + self.test5ProxyCertDisconnect.__doc__
180        self.test3ProxyCertConnect()
181       
182        # Use proxy to sign outbound SOAP message
183        self.clnt.signingCert = self.proxyCert
184        self.clnt.signingKey = self.proxyPriKey
185        self.clnt.signingPriKeyPwd = None
186       
187        # Proxy cert in signature determines ID of session to
188        # delete
189        self.clnt.disconnect()
190        print "User disconnected from Session Manager:\n%s" % self.proxyCert
191
192
193    def test6CookieGetAttCert(self):
194        """test6CookieGetAttCert: make an attribute request using
195        a cookie as authentication credential"""
196
197        print "\n\t" + self.test6CookieGetAttCert.__doc__       
198        self.test2CookieConnect()
199       
200        attCert, extAttCertList = self.clnt.getAttCert(\
201            sessID=self.sessCookie.sessionID, 
202            encrSessionMgrURI=self.sessCookie.encrSessionMgrURI,
203            attAuthorityURI=self.cfg['test6CookieGetAttCert']['aauri'])
204       
205        print "Attribute Certificate:\n%s" % attCert 
206        print "External Attribute Certificate List:\n%s" % extAttCertList
207
208
209    def test6aCookieGetAttCertRefused(self):
210        """test6aCookieGetAttCertRefused: make an attribute request using
211        a cookie as authentication credential requesting an AC from an
212        Attribute Authority where the user is NOT registered"""
213
214        print "\n\t" + self.test6aCookieGetAttCertRefused.__doc__       
215        self.test2CookieConnect()
216       
217        aaURI = self.cfg['test6aCookieGetAttCertRefused']['aauri']
218       
219        try:
220            attCert, extAttCertList = self.clnt.getAttCert(\
221                        sessID=self.sessCookie.sessionID, 
222                        encrSessionMgrURI=self.sessCookie.encrSessionMgrURI,
223                        attAuthorityURI=aaURI,
224                        mapFromTrustedHosts=False)
225        except AttributeRequestDenied, e:
226            print "SUCCESS - obtained expected result: %s" % e
227            return
228       
229        self.fail("Request allowed from AA where user is NOT registered!")
230
231
232    def test6bCookieGetMappedAttCert(self):
233        """test6bCookieGetMappedAttCert: make an attribute request using
234        a cookie as authentication credential"""
235
236        print "\n\t" + self.test6bCookieGetMappedAttCert.__doc__       
237        self.test2CookieConnect()
238       
239        attCert, extAttCertList = self.clnt.getAttCert(\
240            sessID=self.sessCookie.sessionID, 
241            encrSessionMgrURI=self.sessCookie.encrSessionMgrURI,
242            attAuthorityURI=self.cfg['test6bCookieGetMappedAttCert']['aauri'])
243       
244        print "Attribute Certificate:\n%s" % attCert 
245        print "External Attribute Certificate List:\n%s" % extAttCertList
246
247
248    def test6cCookieGetAttCertWithExtAttCertList(self):
249        """test6CookieGetAttCert: make an attribute request using
250        a cookie as authentication credential"""
251       
252        print "\n\t" + self.test6cCookieGetAttCertWithExtAttCertList.__doc__       
253        self.test2CookieConnect()
254       
255        aaURI = \
256            self.cfg['test6cCookieGetAttCertWithExtAttCertList']['aauri']
257           
258        attCert, extAttCertList = self.clnt.getAttCert(\
259                        sessID=self.sessCookie.sessionID, 
260                        encrSessionMgrURI=self.sessCookie.encrSessionMgrURI,
261                        attAuthorityURI=aaURI,
262                        extAttCertList=['AC1', 'AC2', 'AC3'])
263         
264        print "Attribute Certificate:\n%s" % attCert 
265        print "External Attribute Certificate List:\n%s" % extAttCertList
266
267
268    def test7ProxyCertGetAttCert(self):
269        """test7ProxyCertGetAttCert: make an attribute request using
270        a proxy cert as authentication credential"""
271        print "\n\t" + self.test7ProxyCertGetAttCert.__doc__
272        self.test3ProxyCertConnect()
273       
274        # Request an attribute certificate from an Attribute Authority
275        # using the proxyCert returned from connect()
276       
277        aaURI = self.cfg['test7ProxyCertGetAttCert']['aauri']
278        attCert, extAttCertList = self.clnt.getAttCert(\
279                                                 proxyCert=self.proxyCert,
280                                                 attAuthorityURI=aaURI)
281         
282        print "Attribute Certificate:\n%s" % attCert 
283        print "External Attribute Certificate List:\n%s" % extAttCertList
284
285
286    def test8GetX509Cert(self):
287        "test8GetX509Cert: return the Session Manager's X.509 Cert."
288        cert = self.clnt.getX509Cert()
289                                             
290        print "Session Manager X.509 Certificate:\n" + cert
291           
292           
293#_____________________________________________________________________________       
class SessionMgrClientTestSuite(unittest.TestSuite):
    """Suite running a fixed selection of SessionMgrClientTestCase tests"""

    def __init__(self):
        """Create one SessionMgrClientTestCase per listed test method and
        add them to the suite in the order given"""
        # NOTE(review): test6aCookieGetAttCertRefused is not listed here -
        # confirm whether that omission is intentional
        testNames = (
            "test1AddUser",
            "test2CookieConnect",
            "test3ProxyCertConnect",
            "test4CookieDisconnect",
            "test5ProxyCertDisconnect",
            "test6CookieGetAttCert",
            "test6bCookieGetMappedAttCert",
            "test6cCookieGetAttCertWithExtAttCertList",
            "test7ProxyCertGetAttCert",
            "test8GetX509Cert",
        )

        # Use a distinct local name: assigning to 'map' made it a local
        # variable, so the call to the builtin raised UnboundLocalError
        tests = [SessionMgrClientTestCase(testName) for testName in testNames]
        unittest.TestSuite.__init__(self, tests)
311           
312                                                   
# Run the tests in this module via unittest's command line runner when the
# file is executed as a script
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.