1 | #!/usr/bin/env python |
---|
2 | |
---|
"""Test harness for NDG Session Manager client - makes requests for
authentication and authorisation.  Attribute Authority and Simple CA
services must be running for the reqAuthorisation and addUser tests.

NERC Data Grid Project

@author P J Kershaw

23/02/06

Renamed from SessionClientTest.py 27/0/4/06
Moved and renamed SessionMgrClientTest.py 23/11/06

@copyright (C) 2007 CCLRC & NERC

@license This software may be distributed under the terms of the Q Public
License, version 1.0 or later.
"""
21 | import unittest |
---|
22 | import os, sys, getpass |
---|
23 | from ConfigParser import SafeConfigParser |
---|
24 | |
---|
25 | from ndg.security.common.SessionMgr import SessionMgrClient |
---|
26 | from ndg.security.common.SessionCookie import SessionCookie |
---|
27 | |
---|
28 | |
---|
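class SessionMgrClientTestCase(unittest.TestCase):
    """Exercise the NDG Session Manager client against live services.

    Each test prints what the service returned; none of them asserts on the
    result, so a run only fails when a client call raises.  Read the output
    rather than relying on the pass/fail summary alone.

    Settings come from ./sessionMgrClientTest.cfg (relative to the current
    working directory): a 'setUp' section plus one section per test.  The
    client private key password and the user pass-phrases may be placed in
    that file for unattended runs; when absent they are prompted for.  A
    copy of the file that holds them is a credential store - keep it out of
    version control and readable only by the account running the tests.
    """

    def setUp(self):
        """Load the test configuration and create the Session Manager client.

        Raises IOError if the configuration file cannot be read.
        """
        cfgFilePath = "./sessionMgrClientTest.cfg"
        configParser = SafeConfigParser()

        # read() silently skips files it cannot open and returns the list of
        # files it did parse.  Without this check a wrong working directory
        # only shows up later as KeyError: 'setUp'.
        if not configParser.read(cfgFilePath):
            raise IOError("Unable to read test configuration file '%s'" %
                          cfgFilePath)

        self.cfg = {}
        for section in configParser.sections():
            self.cfg[section] = dict(configParser.items(section))

        # SOAP debug output goes to stderr.  addUser() and connect() are
        # given the user's pass-phrase, so the trace presumably contains it
        # (TODO confirm) - do not retain this output in shared logs.
        tracefile = sys.stderr

        try:
            # ConfigParser lower-cases option names, hence 'clntprikeypwd'.
            clntPriKeyPwd = self.cfg['setUp'].get('clntprikeypwd')
            if clntPriKeyPwd is None:
                # Not in the config file: prompt so that the key password
                # need not be stored on disk.
                clntPriKeyPwd = getpass.getpass(
                        prompt="\nsetUp - client private key password: ")
        except KeyboardInterrupt:
            sys.exit(0)

        # Initialise the Session Manager client connection
        # Omit the tracefile keyword to leave out SOAP debug info
        setUpCfg = self.cfg['setUp']
        self.clnt = SessionMgrClient(
                        uri=setUpCfg['smuri'],
                        verifyingCertFilePath=setUpCfg['srvcertfilepath'],
                        signingCertFilePath=setUpCfg['clntcertfilepath'],
                        signingPriKeyFilePath=setUpCfg['clntprikeyfilepath'],
                        signingPriKeyPwd=clntPriKeyPwd,
                        tracefile=tracefile)

        # Credentials filled in by the connect tests
        self.sessCookie = None
        self.proxyCert = None
        self.proxyPriKey = None
        self.userCert = None

    def _printAttCertResponse(self, attCert, statusCode, msg,
                              extAttCertList):
        """Print the four values returned by a getAttCert call."""
        print("Attribute Certificate:\n%s" % attCert)
        print("Status: %s" % statusCode)
        print("Message: %s" % msg)
        print("External Attribute Certificate List:\n%s" % extAttCertList)

    def test1AddUser(self):
        """Add a new user ID to the MyProxy repository"""
        testCfg = self.cfg['test1AddUser']

        # Take the pass-phrase from the config file if one is set there,
        # otherwise prompt for it.  It is passed to addUser as a string via
        # the 'passphrase' keyword.
        passphrase = testCfg.get('passphrase') or getpass.getpass(
                        prompt="\ntest1AddUser pass-phrase for new user: ")

        self.clnt.addUser(testCfg['username'], passphrase=passphrase)
        print("Added user '%s'" % testCfg['username'])

    def test2CookieConnect(self):
        """test2CookieConnect: Connect as if acting as a browser client -
        a cookie is returned"""
        testCfg = self.cfg['test2CookieConnect']

        passphrase = testCfg.get('passphrase') or getpass.getpass(
                        prompt="\ntest2CookieConnect pass-phrase for user: ")

        self.proxyCert, self.proxyPriKey, self.userCert, cookie = \
            self.clnt.connect(testCfg['username'],
                              passphrase=passphrase,
                              getCookie=True)

        self.sessCookie = SessionCookie(cookie)

        # NOTE: this writes the session cookie to stdout.  The cookie tests
        # below use its sessionID as the authentication credential for
        # getAttCert, so treat captured test output as sensitive.
        print("User '%s' connected to Session Manager:\n%s" %
              (testCfg['username'], self.sessCookie))

    def test3ProxyCertConnect(self):
        """test3ProxyCertConnect: Connect as a command line client -
        a proxyCert is returned"""
        testCfg = self.cfg['test3ProxyCertConnect']

        passphrase = testCfg.get('passphrase') or getpass.getpass(
                    prompt="\ntest3ProxyCertConnect pass-phrase for user: ")

        # The fourth return value is the cookie slot, unused when
        # getCookie=False.
        self.proxyCert, self.proxyPriKey, self.userCert, _ = \
            self.clnt.connect(testCfg['username'],
                              passphrase=passphrase,
                              createServerSess=True,
                              getCookie=False)
        print("User '%s' connected to Session Manager:\n%s" %
              (testCfg['username'], self.proxyCert))

    def test4CookieDisconnect(self):
        """test4CookieDisconnect: connect as if acting as a browser client,
        then disconnect the session"""

        print("\n\t" + self.test4CookieDisconnect.__doc__)
        self.test2CookieConnect()

        # Use proxy cert / private key just obtained from connect call for
        # signature generation
        self.clnt.signatureHandler.signingCert = self.proxyCert
        self.clnt.signatureHandler.signingCertPriKey = self.proxyPriKey

        # The author left the explicit credential keywords (userCert,
        # sessCookie, sessID / encrSessionMgrURI) commented out, so the call
        # is made with no arguments; presumably the session is identified
        # from the proxy-signed message - TODO confirm.
        self.clnt.disconnect()

        print("User disconnected from Session Manager:\n%s" %
              self.sessCookie)

    def test5ProxyCertDisconnect(self):
        """test5ProxyCertDisconnect: connect as a command line client, then
        disconnect using the proxy certificate"""

        print("\n\t" + self.test5ProxyCertDisconnect.__doc__)
        self.test3ProxyCertConnect()

        # Use proxy to sign outbound SOAP message
        # NOTE(review): test4CookieDisconnect hands the proxy credentials to
        # self.clnt.signatureHandler, whereas this test sets attributes on
        # the client itself.  If SessionMgrClient does not read these
        # attributes the request is still signed with the client key loaded
        # in setUp - confirm against SessionMgrClient.
        self.clnt.clntCert = self.proxyCert
        self.clnt.clntKey = self.proxyPriKey
        self.clnt.clntPriKeyPwd = None

        self.clnt.disconnect(proxyCert=self.proxyCert)
        print("User disconnected from Session Manager:\n%s" % self.proxyCert)

    def test6CookieGetAttCert(self):
        """test6CookieGetAttCert: make an authorisation request using
        a cookie as authentication credential"""

        print("\n\t" + self.test6CookieGetAttCert.__doc__)
        self.test2CookieConnect()

        response = self.clnt.getAttCert(
            sessID=self.sessCookie.sessionID,
            encrSessionMgrURI=self.sessCookie.encrSessionMgrURI,
            attAuthorityURI=self.cfg['test6CookieGetAttCert']['aauri'])

        # Printed only: a request that was refused still passes this test.
        attCert, statusCode, msg, extAttCertList = response
        self._printAttCertResponse(attCert, statusCode, msg, extAttCertList)

    def test6aCookieGetAttCertWithExtAttCertList(self):
        """test6aCookieGetAttCertWithExtAttCertList: make an authorisation
        request using a cookie as authentication credential, passing a list
        of external attribute certificates"""

        print("\n\t" + self.test6aCookieGetAttCertWithExtAttCertList.__doc__)
        self.test2CookieConnect()

        aaURI = \
            self.cfg['test6aCookieGetAttCertWithExtAttCertList']['aauri']

        # 'AC1'..'AC3' are dummy strings, not real attribute certificates.
        response = self.clnt.getAttCert(
            sessID=self.sessCookie.sessionID,
            encrSessionMgrURI=self.sessCookie.encrSessionMgrURI,
            attAuthorityURI=aaURI,
            extAttCertList=['AC1', 'AC2', 'AC3'])

        # Printed only: a request that was refused still passes this test.
        attCert, statusCode, msg, extAttCertList = response
        self._printAttCertResponse(attCert, statusCode, msg, extAttCertList)

    def test7ProxyCertGetAttCert(self):
        """test7ProxyCertGetAttCert: make an authorisation request using
        a proxy cert as authentication credential"""
        print("\n\t" + self.test7ProxyCertGetAttCert.__doc__)
        self.test3ProxyCertConnect()

        # Request an attribute certificate from an Attribute Authority
        # using the proxyCert returned from connect()
        aaURI = self.cfg['test7ProxyCertGetAttCert']['aauri']
        response = self.clnt.getAttCert(proxyCert=self.proxyCert,
                                        attAuthorityURI=aaURI)

        # Printed only: a request that was refused still passes this test.
        attCert, statusCode, msg, extAttCertList = response
        self._printAttCertResponse(attCert, statusCode, msg, extAttCertList)

    def test8GetX509Cert(self):
        "test8GetX509Cert: return the Session Manager's X.509 Cert."
        cert = self.clnt.getX509Cert()

        print("Session Manager X.509 Certificate:\n" + cert)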
214 | |
---|
215 | |
---|
216 | #_____________________________________________________________________________ |
---|
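class SessionMgrClientTestSuite(unittest.TestSuite):
    """Suite holding one SessionMgrClientTestCase per test method, in the
    order listed below."""

    def __init__(self):
        testNames = (
            "test1AddUser",
            "test2CookieConnect",
            "test3ProxyCertConnect",
            "test4CookieDisconnect",
            "test5ProxyCertDisconnect",
            "test6CookieGetAttCert",
            "test6aCookieGetAttCertWithExtAttCertList",
            "test7ProxyCertGetAttCert",
            "test8GetX509Cert",
        )

        # Do not bind the result to a local called 'map': assigning to that
        # name makes it local to the whole function, so calling the builtin
        # on the right-hand side raises UnboundLocalError before any test
        # case is built.
        tests = [SessionMgrClientTestCase(name) for name in testNames]
        unittest.TestSuite.__init__(self, tests)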
233 | |
---|
234 | |
---|
# Script entry point: hand control to unittest's command-line runner, which
# collects the test* methods of the TestCase classes defined in this module.
if __name__ == "__main__":
    unittest.main()