source: TI12-security/trunk/python/Tests/SecurityClientTest.py @ 1304

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/Tests/SecurityClientTest.py@1304
Revision 1304, 10.3 KB checked in by pjkersha, 13 years ago (diff)

Tests/SecurityClientTest?.py: make it easier to switch between tests on different targets.

NDG/CredWallet.py: update date.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2
3"""Test harness for NDG Security client - makes requests for authentication
4and authorisation
5
6NERC Data Grid Project
7
8P J Kershaw 23/02/06
9
10(Renamed from SessionClientTest.py 27/0/4/06)
11
12Copyright (C) 2006 CCLRC & NERC
13
14This software may be distributed under the terms of the Q Public License,
15version 1.0 or later.
16"""
17import unittest
18import os
19
20from Cookie import SimpleCookie
21
22from NDG.SecurityClient import *
23
24
25class SecurityClientTestCase(unittest.TestCase):
26   
27    def setUp(self):
28        try:
29            self.config = {}
30
31            # Gabriel settings
32            gabrielConfig = {}
33            gabrielConfig['smWSDL'] = 'http://gabriel.bnsc.rl.ac.uk/sessionMgr.wsdl'
34            gabrielConfig['aaWSDL'] = 'http://gabriel.bnsc.rl.ac.uk/attAuthority.wsdl'
35
36           
37            gabrielConfig['newUserName'] = 'YosemiteSam'           
38            gabrielConfig['userName'] = 'gabriel'
39            gabrielConfig['trustedHostRequiredRole'] = 'academic'
40
41            gabrielConfig['aaPubKeyFilePath'] = None
42   
43            # Public key of session manager used to encrypt requests
44            # If no public key is set, it will be retrieved using the
45            # getPubKey WS method
46            gabrielConfig['smPubKeyFilePath'] = None
47
48           
49            # Glue settings
50            glueConfig = {}
51            glueConfig['smWSDL'] = 'http://glue.badc.rl.ac.uk/sessionMgr.wsdl'
52            glueConfig['aaWSDL'] = 'http://glue.badc.rl.ac.uk/attAuthority.wsdl'
53
54           
55            glueConfig['newUserName'] = 'YosemiteSam'           
56            glueConfig['userName'] = 'lawrence'
57            glueConfig['trustedHostRequiredRole'] = 'acsoe'
58            #glueConfig['trustedHostRequiredRole'] = 'coapec'
59
60            glueConfig['aaPubKeyFilePath'] = None
61   
62            # Public key of session manager used to encrypt requests
63            # If no public key is set, it will be retrieved using the
64            # getPubKey WS method
65            glueConfig['smPubKeyFilePath'] = None
66
67
68            # Uncomment for required test
69            self.config = gabrielConfig
70            #self.config = glueConfig
71
72           
73            self.__clntPriKeyPwd = open("./tmp2").read().strip()
74
75            clntPubKeyFilePath = "./Junk-cert.pem"
76            clntPriKeyFilePath = "./Junk-key.pem"
77            traceFile = None#sys.stderr
78           
79            # Initialise the Session Manager client connection
80            # Omit traceFile keyword to leave out SOAP debug info
81            self.sessClnt = SessionClient(smWSDL=self.config['smWSDL'],
82                            smPubKeyFilePath=self.config['smPubKeyFilePath'],
83                            clntPubKeyFilePath=clntPubKeyFilePath,
84                            clntPriKeyFilePath=clntPriKeyFilePath,
85                            traceFile=traceFile) 
86
87            # Attribute Authority client tests           
88            self.aaClnt = AttAuthorityClient(aaWSDL=self.config['aaWSDL'],
89                            aaPubKeyFilePath=self.config['aaPubKeyFilePath'],
90                            clntPubKeyFilePath=clntPubKeyFilePath,
91                            clntPriKeyFilePath=clntPriKeyFilePath,
92                            traceFile=traceFile) 
93            self.sessCookie = None
94            self.proxyCert = None
95       
96        except Exception, e:
97            self.fail(str(e))
98           
99           
100    def tearDown(self):
101        pass
102
103
104    def testAddUser(self):
105       
106        try:
107            # Uncomment to add a new user ID to the MyProxy repository
108            # Note the pass-phrase is read from the file tmp.  To pass
109            # explicitly as a string use the 'pPhrase' keyword instead
110            self.sessClnt.addUser(self.config['newUserName'], 
111                                  pPhraseFilePath="./tmp",
112                                  clntPriKeyPwd=self.__clntPriKeyPwd)
113            print "Added user '%s'" % self.config['newUserName']
114           
115        except Exception, e:
116            self.fail(str(e))
117       
118
119    def testCookieConnect(self):
120       
121#        import pdb
122#        pdb.set_trace()
123        try:
124            # Connect as if acting as a browser client - a cookie is returned
125            sSessCookie = self.sessClnt.connect(self.config['userName'], 
126                                        pPhraseFilePath="./tmp",
127                                        clntPriKeyPwd=self.__clntPriKeyPwd)
128            self.sessCookie = SimpleCookie(sSessCookie)
129            print "User '%s' connected to Session Manager:\n%s" % \
130                (self.config['userName'], sSessCookie)
131
132        except Exception, e:
133            self.fail(str(e))
134           
135
136    def testProxyCertConnect(self):
137       
138        try:
139            # Connect as a command line client - a proxyCert is returned       
140            self.proxyCert = self.sessClnt.connect(self.config['userName'], 
141                                          pPhraseFilePath="./tmp",
142                                          createServerSess=True,
143                                          getCookie=False,
144                                          clntPriKeyPwd=self.__clntPriKeyPwd)
145            print "User '%s' connected to Session Manager:\n%s" % \
146                (self.config['userName'], self.proxyCert)
147
148        except Exception, e:
149            self.fail(str(e))
150
151
152    def testCookieReqAuthorisation(self):
153        try:
154            # Request an attribute certificate from an Attribute Authority
155            # using the cookie returned from connect()
156            self.testCookieConnect()
157            authResp = self.sessClnt.reqAuthorisation(\
158                        sessID=self.sessCookie['NDG-ID1'].value, 
159                        aaWSDL=self.config['aaWSDL'],
160                        encrSessMgrWSDLuri=self.sessCookie['NDG-ID2'].value,
161                        clntPriKeyPwd=self.__clntPriKeyPwd)
162                                                                 
163            # The authorisation response is returned as an object which
164            # behaves like a python dictionary.  See
165            # NDG.SessionMgrIO.AuthorisationResp
166            if 'errMsg' in authResp:
167                print "Authorisation failed for user: %s" % authResp['errMsg']           
168            else:
169                print "User authorised"
170               
171            print authResp
172
173        except Exception, e:
174            self.fail(str(e))
175
176
177    def testProxyCertReqAuthorisation(self):
178        try:
179            self.testProxyCertConnect()
180           
181            # Request an attribute certificate from an Attribute Authority
182            # using the proxyCert returned from connect()
183            authResp = self.sessClnt.reqAuthorisation(\
184                                         proxyCert=self.proxyCert,
185                                         aaWSDL=self.config['aaWSDL'],
186                                         clntPriKeyPwd=self.__clntPriKeyPwd)
187                                                 
188            # The authorisation response is returned as an object which
189            # behaves like a python dictionary.  See
190            # NDG.SessionMgrIO.AuthorisationResp
191            if 'errMsg' in authResp:
192                print "Authorisation failed for user %s" % authResp['errMsg']           
193            else:
194                print "User authorised"
195               
196            print authResp
197
198        except Exception, e:
199            self.fail(str(e))
200
201
202    def testGetPubKey(self):
203        try:
204            # Request an attribute certificate from an Attribute Authority
205            # using the proxyCert returned from connect()
206            pubKey = self.sessClnt.getPubKey()
207                                                 
208            print "Public Key:\n" + pubKey
209
210        except Exception, e:
211            self.fail(str(e))
212           
213
214    def testAAgetHostInfo(self):
215        """Call Attribute Authority getHostInfo"""
216       
217        try:
218            hostInfo = self.aaClnt.getHostInfo(
219                                           clntPriKeyPwd=self.__clntPriKeyPwd)
220            print hostInfo
221           
222        except Exception, e:
223            self.fail(str(e))
224           
225
226    def testAAgetTrustedHostInfo(self):
227        """Call Attribute Authority getTrustedHostInfo with a given role
228        to match against"""
229       
230#        import pdb
231#        pdb.set_trace()
232        try:
233            trustedHosts = self.aaClnt.getTrustedHostInfo(
234                               role=self.config['trustedHostRequiredRole'],
235                               clntPriKeyPwd=self.__clntPriKeyPwd)
236            print trustedHosts
237           
238        except Exception, e:
239            self.fail(str(e))
240           
241
242    def testAAgetTrustedHostInfoWithNoRoleSet(self):
243        """Call Attribute Authority getTrustedHostInfo"""
244       
245        import pdb
246        pdb.set_trace()
247        try:
248            trustedHosts = self.aaClnt.getTrustedHostInfo(
249                                       clntPriKeyPwd=self.__clntPriKeyPwd)
250            print trustedHosts
251           
252        except Exception, e:
253            self.fail(str(e))
254           
255
256    def testAAReqAuthorisation(self):
257        """Call Attribute Authority authorisation request"""
258       
259        import pdb
260        pdb.set_trace()
261        try:
262            # Alternative means of getting proxy cert - from file
263            #self.proxyCert = open("./proxy.pem").read().strip()
264            self.testProxyCertConnect()
265            userAttCert = None
266           
267            ac = self.aaClnt.reqAuthorisation(
268                                       proxyCert=self.proxyCert,
269                                       userAttCert=userAttCert,
270                                       clntPriKeyPwd=self.__clntPriKeyPwd)
271            print ac
272           
273        except Exception, e:
274            self.fail(str(e))
275           
276           
277#_____________________________________________________________________________       
278class SecurityClientTestSuite(unittest.TestSuite):
279   
280    def __init__(self):
281        map = map(SecurityClientTestCase,
282                  (
283                    "testAddUser",
284                    "testConnect",
285                    "testReqAuthorisation",
286                    "testGetPubKey",
287                    "testAAgetTrustedHostInfo",
288                    "testAAReqAuthorisation",
289                  ))
290        unittest.TestSuite.__init__(self, map)
291           
292                                                   
293if __name__ == "__main__":
294    unittest.main()       
Note: See TracBrowser for help on using the repository browser.