source: TI12-security/trunk/python/Tests/SecurityClientTest.py @ 1303

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/Tests/SecurityClientTest.py@1303
Revision 1303, 9.7 KB checked in by pjkersha, 13 years ago (diff)

NDG/Session.py:

  • UserSession? createSessID -> addNewSessID public method to enable more than one session ID

to be tied to one session.

  • deleteUserSession - fix to input keyword args

NDG/CredWallet.py: fix to reqAuthorisation authzResp check.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2
3"""Test harness for NDG Security client - makes requests for authentication
4and authorisation
5
6NERC Data Grid Project
7
8P J Kershaw 23/02/06
9
10(Renamed from SessionClientTest.py 27/0/4/06)
11
12Copyright (C) 2006 CCLRC & NERC
13
14This software may be distributed under the terms of the Q Public License,
15version 1.0 or later.
16"""
17import unittest
18import os
19
20from Cookie import SimpleCookie
21
22from NDG.SecurityClient import *
23
24
25class SecurityClientTestCase(unittest.TestCase):
26   
27    def setUp(self):
28        try:
29            # Session Manager WSDL
30#            self.smWSDL = './sessionMgr.wsdl'
31#            self.smWSDL = 'http://glue.badc.rl.ac.uk/sessionMgr.wsdl'
32            self.smWSDL = 'http://gabriel.bnsc.rl.ac.uk/sessionMgr.wsdl'
33
34#            self.aaWSDL = '/home/pjkersha/Development/security/python/Tests/attAuthority.wsdl'
35#            self.aaWSDL = 'http://glue.badc.rl.ac.uk/attAuthority.wsdl'
36            self.aaWSDL = 'http://gabriel.bnsc.rl.ac.uk/attAuthority.wsdl'
37
38            aaPubKeyFilePath = None
39   
40            # Public key of session manager used to encrypt requests
41            # If no public key is set, it will be retrieved using the
42            # getPubKey WS method
43            smPubKeyFilePath = None
44           
45#            self.newUserName = 'lawrence'
46            self.newUserName = 'YosemiteSam'
47           
48            self.userName = 'gabriel'
49#           self.userName = 'lawrence'
50
51#            self.trustedHostRequiredRole = 'acsoe'
52#            self.trustedHostRequiredRole = 'coapec'
53            self.trustedHostRequiredRole = 'academic'
54
55            self.__clntPriKeyPwd = open("./tmp2").read().strip()
56
57            clntPubKeyFilePath = "./Junk-cert.pem"
58            clntPriKeyFilePath = "./Junk-key.pem"
59            traceFile = None#sys.stderr
60           
61            # Initialise the Session Manager client connection
62            # Omit traceFile keyword to leave out SOAP debug info
63            self.sessClnt = SessionClient(smWSDL=self.smWSDL,
64                                        smPubKeyFilePath=smPubKeyFilePath,
65                                        clntPubKeyFilePath=clntPubKeyFilePath,
66                                        clntPriKeyFilePath=clntPriKeyFilePath,
67                                        traceFile=traceFile) 
68
69            # Attribute Authority client tests           
70            self.aaClnt = AttAuthorityClient(aaWSDL=self.aaWSDL,
71                                        aaPubKeyFilePath=aaPubKeyFilePath,
72                                        clntPubKeyFilePath=clntPubKeyFilePath,
73                                        clntPriKeyFilePath=clntPriKeyFilePath,
74                                        traceFile=traceFile) 
75            self.sessCookie = None
76            self.proxyCert = None
77       
78        except Exception, e:
79            self.fail(str(e))
80           
81           
82    def tearDown(self):
83        pass
84
85
86    def testAddUser(self):
87       
88        try:
89            # Uncomment to add a new user ID to the MyProxy repository
90            # Note the pass-phrase is read from the file tmp.  To pass
91            # explicitly as a string use the 'pPhrase' keyword instead
92            self.sessClnt.addUser(self.newUserName, 
93                                  pPhraseFilePath="./tmp",
94                                  clntPriKeyPwd=self.__clntPriKeyPwd)
95            print "Added user '%s'" % self.newUserName
96           
97        except Exception, e:
98            self.fail(str(e))
99       
100
101    def testCookieConnect(self):
102       
103#        import pdb
104#        pdb.set_trace()
105        try:
106            # Connect as if acting as a browser client - a cookie is returned
107            sSessCookie = self.sessClnt.connect(self.userName, 
108                                        pPhraseFilePath="./tmp",
109                                        clntPriKeyPwd=self.__clntPriKeyPwd)
110            self.sessCookie = SimpleCookie(sSessCookie)
111            print "User '%s' connected to Session Manager:\n%s" % \
112                (self.userName, sSessCookie)
113
114        except Exception, e:
115            self.fail(str(e))
116           
117
118    def testProxyCertConnect(self):
119       
120        try:
121            # Connect as a command line client - a proxyCert is returned       
122            self.proxyCert = self.sessClnt.connect(self.userName, 
123                                          pPhraseFilePath="./tmp",
124                                          createServerSess=True,
125                                          getCookie=False,
126                                          clntPriKeyPwd=self.__clntPriKeyPwd)
127            print "User '%s' connected to Session Manager:\n%s" % \
128                (self.userName, self.proxyCert)
129
130        except Exception, e:
131            self.fail(str(e))
132
133
134    def testCookieReqAuthorisation(self):
135        try:
136            # Request an attribute certificate from an Attribute Authority
137            # using the cookie returned from connect()
138            self.testCookieConnect()
139            authResp = self.sessClnt.reqAuthorisation(\
140                        sessID=self.sessCookie['NDG-ID1'].value, 
141                        aaWSDL=self.aaWSDL,
142                        encrSessMgrWSDLuri=self.sessCookie['NDG-ID2'].value,
143                        clntPriKeyPwd=self.__clntPriKeyPwd)
144                                                                 
145            # The authorisation response is returned as an object which
146            # behaves like a python dictionary.  See
147            # NDG.SessionMgrIO.AuthorisationResp
148            if 'errMsg' in authResp:
149                print "Authorisation failed for user: %s" % authResp['errMsg']           
150            else:
151                print "User authorised"
152               
153            print authResp
154
155        except Exception, e:
156            self.fail(str(e))
157
158
159    def testProxyCertReqAuthorisation(self):
160        try:
161            self.testProxyCertConnect()
162           
163            # Request an attribute certificate from an Attribute Authority
164            # using the proxyCert returned from connect()
165            authResp = self.sessClnt.reqAuthorisation(\
166                                         proxyCert=self.proxyCert,
167                                         aaWSDL=self.aaWSDL,
168                                         clntPriKeyPwd=self.__clntPriKeyPwd)
169                                                 
170            # The authorisation response is returned as an object which
171            # behaves like a python dictionary.  See
172            # NDG.SessionMgrIO.AuthorisationResp
173            if 'errMsg' in authResp:
174                print "Authorisation failed for user %s" % authResp['errMsg']           
175            else:
176                print "User authorised"
177               
178            print authResp
179
180        except Exception, e:
181            self.fail(str(e))
182
183
184    def testGetPubKey(self):
185        try:
186            # Request an attribute certificate from an Attribute Authority
187            # using the proxyCert returned from connect()
188            pubKey = self.sessClnt.getPubKey()
189                                                 
190            print "Public Key:\n" + pubKey
191
192        except Exception, e:
193            self.fail(str(e))
194           
195
196    def testAAgetHostInfo(self):
197        """Call Attribute Authority getHostInfo"""
198       
199        try:
200            hostInfo = self.aaClnt.getHostInfo(
201                                           clntPriKeyPwd=self.__clntPriKeyPwd)
202            print hostInfo
203           
204        except Exception, e:
205            self.fail(str(e))
206           
207
208    def testAAgetTrustedHostInfo(self):
209        """Call Attribute Authority getTrustedHostInfo with a given role
210        to match against"""
211       
212#        import pdb
213#        pdb.set_trace()
214        try:
215            trustedHosts = self.aaClnt.getTrustedHostInfo(
216                                       role=self.trustedHostRequiredRole,
217                                       clntPriKeyPwd=self.__clntPriKeyPwd)
218            print trustedHosts
219           
220        except Exception, e:
221            self.fail(str(e))
222           
223
224    def testAAgetTrustedHostInfoWithNoRoleSet(self):
225        """Call Attribute Authority getTrustedHostInfo"""
226       
227        import pdb
228        pdb.set_trace()
229        try:
230            trustedHosts = self.aaClnt.getTrustedHostInfo(
231                                       clntPriKeyPwd=self.__clntPriKeyPwd)
232            print trustedHosts
233           
234        except Exception, e:
235            self.fail(str(e))
236           
237
238    def testAAReqAuthorisation(self):
239        """Call Attribute Authority authorisation request"""
240       
241        import pdb
242        pdb.set_trace()
243        try:
244            # Alternative means of getting proxy cert - from file
245            #self.proxyCert = open("./proxy.pem").read().strip()
246            self.testProxyCertConnect()
247            userAttCert = None
248           
249            ac = self.aaClnt.reqAuthorisation(
250                                       proxyCert=self.proxyCert,
251                                       userAttCert=userAttCert,
252                                       clntPriKeyPwd=self.__clntPriKeyPwd)
253            print ac
254           
255        except Exception, e:
256            self.fail(str(e))
257           
258           
259#_____________________________________________________________________________       
260class SecurityClientTestSuite(unittest.TestSuite):
261   
262    def __init__(self):
263        map = map(SecurityClientTestCase,
264                  (
265                    "testAddUser",
266                    "testConnect",
267                    "testReqAuthorisation",
268                    "testGetPubKey",
269                    "testAAgetTrustedHostInfo",
270                    "testAAReqAuthorisation",
271                  ))
272        unittest.TestSuite.__init__(self, map)
273           
274                                                   
275if __name__ == "__main__":
276    unittest.main()       
Note: See TracBrowser for help on using the repository browser.