1 | #!/usr/bin/env python |
---|
2 | """NDG Attribute Authority SOAP client unit tests |
---|
3 | |
---|
4 | NERC DataGrid Project |
---|
5 | """ |
---|
6 | __author__ = "P J Kershaw" |
---|
7 | __date__ = "05/05/05, major update 16/01/07" |
---|
8 | __copyright__ = "(C) 2009 Science and Technology Facilities Council" |
---|
9 | __license__ = "BSD - see LICENSE file in top-level directory" |
---|
10 | __contact__ = "Philip.Kershaw@stfc.ac.uk" |
---|
11 | __revision__ = '$Id:test_attributeauthorityclient.py 4372 2008-10-29 09:45:39Z pjkersha $' |
---|
12 | import logging |
---|
13 | logging.basicConfig(level=logging.DEBUG) |
---|
14 | |
---|
15 | import unittest |
---|
16 | import os, sys, getpass, re |
---|
17 | |
---|
18 | from os.path import expandvars as xpdVars |
---|
19 | from os.path import join as jnPath |
---|
20 | mkPath = lambda file: jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'], file) |
---|
21 | |
---|
22 | from datetime import datetime, timedelta |
---|
23 | from uuid import uuid4 |
---|
24 | from xml.etree import ElementTree |
---|
25 | |
---|
26 | from ndg.security.test.unit import BaseTestCase, mkDataDirPath |
---|
27 | |
---|
28 | from ndg.security.common.utils.etree import prettyPrint |
---|
29 | |
---|
30 | from ndg.security.common.attributeauthority import (AttributeAuthorityClient, |
---|
31 | NoMatchingRoleInTrustedHosts) |
---|
32 | from ndg.security.common.AttCert import AttCertRead |
---|
33 | from ndg.security.common.X509 import X509CertParse, X509CertRead |
---|
34 | from ndg.security.common.utils.configfileparsers import ( |
---|
35 | CaseSensitiveConfigParser) |
---|
36 | |
---|
37 | from saml.utils import SAMLDateTime |
---|
38 | from saml.common.xml import SAMLConstants |
---|
39 | from saml.saml2.core import (Attribute, SAMLVersion, Subject, NameID, Issuer, |
---|
40 | AttributeQuery, XSStringAttributeValue, StatusCode) |
---|
41 | from saml.xml.etree import ResponseElementTree |
---|
42 | |
---|
43 | from ndg.security.common.saml_utils.bindings import SOAPBinding as \ |
---|
44 | SamlSoapBinding |
---|
45 | from ndg.security.common.saml_utils.bindings import AttributeQuerySOAPBinding |
---|
46 | from ndg.security.common.saml_utils.bindings import AttributeQuerySslSOAPBinding |
---|
47 | from ndg.security.common.saml_utils.esg import (EsgSamlNamespaces, |
---|
48 | XSGroupRoleAttributeValue, |
---|
49 | EsgDefaultQueryAttributes) |
---|
50 | |
---|
51 | |
---|
52 | class AttributeAuthorityClientBaseTestCase(BaseTestCase): |
---|
53 | def __init__(self, *arg, **kw): |
---|
54 | super(AttributeAuthorityClientBaseTestCase, self).__init__(*arg, **kw) |
---|
55 | |
---|
56 | if 'NDGSEC_AACLNT_UNITTEST_DIR' not in os.environ: |
---|
57 | os.environ['NDGSEC_AACLNT_UNITTEST_DIR' |
---|
58 | ] = os.path.abspath(os.path.dirname(__file__)) |
---|
59 | |
---|
60 | self.cfgParser = CaseSensitiveConfigParser() |
---|
61 | self.cfgFilePath = jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'], |
---|
62 | 'attAuthorityClientTest.cfg') |
---|
63 | self.cfgParser.read(self.cfgFilePath) |
---|
64 | |
---|
65 | self.cfg = {} |
---|
66 | for section in self.cfgParser.sections(): |
---|
67 | self.cfg[section] = dict(self.cfgParser.items(section)) |
---|
68 | |
---|
69 | try: |
---|
70 | self.sslCACertList = [X509CertRead(xpdVars(caFile)) |
---|
71 | for caFile in self.cfg['setUp'][ |
---|
72 | 'sslcaCertFilePathList'].split()] |
---|
73 | except KeyError: |
---|
74 | self.sslCACertList = [] |
---|
75 | |
---|
76 | self.startAttributeAuthorities() |
---|
77 | |
---|
78 | |
---|
79 | class AttributeAuthorityClientTestCase(AttributeAuthorityClientBaseTestCase): |
---|
80 | clntPriKeyPwd = None |
---|
81 | pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----" |
---|
82 | |
---|
83 | def _getCertChainFromProxyCertFile(self, proxyCertFilePath): |
---|
84 | '''Read proxy cert and user cert from a single PEM file and put in |
---|
85 | a list ready for input into SignatureHandler''' |
---|
86 | proxyCertFileTxt = open(proxyCertFilePath).read() |
---|
87 | |
---|
88 | pemPatRE = re.compile(self.__class__.pemPat, re.S) |
---|
89 | x509CertList = pemPatRE.findall(proxyCertFileTxt) |
---|
90 | |
---|
91 | signingCertChain = [X509CertParse(x509Cert) |
---|
92 | for x509Cert in x509CertList] |
---|
93 | |
---|
94 | # Expecting proxy cert first - move this to the end. This will |
---|
95 | # be the cert used to verify the message signature |
---|
96 | signingCertChain.reverse() |
---|
97 | |
---|
98 | return signingCertChain |
---|
99 | |
---|
100 | def setUp(self): |
---|
101 | super(AttributeAuthorityClientTestCase, self).setUp() |
---|
102 | |
---|
103 | if 'NDGSEC_INT_DEBUG' in os.environ: |
---|
104 | import pdb |
---|
105 | pdb.set_trace() |
---|
106 | |
---|
107 | thisSection = self.cfg['setUp'] |
---|
108 | |
---|
109 | # Instantiate WS proxy |
---|
110 | self.siteAClnt = AttributeAuthorityClient(uri=thisSection['uri'], |
---|
111 | sslPeerCertCN=thisSection.get('sslPeerCertCN'), |
---|
112 | sslCACertList=self.sslCACertList, |
---|
113 | cfgFileSection='wsse', |
---|
114 | cfg=self.cfgParser) |
---|
115 | |
---|
116 | def test01GetHostInfo(self): |
---|
117 | """test01GetHostInfo: retrieve info for AA host""" |
---|
118 | hostInfo = self.siteAClnt.getHostInfo() |
---|
119 | print "Host Info:\n %s" % hostInfo |
---|
120 | |
---|
121 | def test02GetTrustedHostInfo(self): |
---|
122 | """test02GetTrustedHostInfo: retrieve trusted host info matching a |
---|
123 | given role""" |
---|
124 | trustedHostInfo = self.siteAClnt.getTrustedHostInfo( |
---|
125 | self.cfg['test02GetTrustedHostInfo']['role']) |
---|
126 | for hostname, hostInfo in trustedHostInfo.items(): |
---|
127 | self.assert_(hostname, "Hostname not set") |
---|
128 | for k, v in hostInfo.items(): |
---|
129 | self.assert_(k, "hostInfo value key unset") |
---|
130 | |
---|
131 | print "Trusted Host Info:\n %s" % trustedHostInfo |
---|
132 | |
---|
133 | def test03GetTrustedHostInfoWithNoMatchingRoleFound(self): |
---|
134 | """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case |
---|
135 | where the input role doesn't match any roles in the target AA's map |
---|
136 | config file""" |
---|
137 | _cfg = self.cfg['test03GetTrustedHostInfoWithNoMatchingRoleFound'] |
---|
138 | try: |
---|
139 | trustedHostInfo = self.siteAClnt.getTrustedHostInfo(_cfg['role']) |
---|
140 | self.fail("Expecting NoMatchingRoleInTrustedHosts exception") |
---|
141 | |
---|
142 | except NoMatchingRoleInTrustedHosts, e: |
---|
143 | print('As expected - no match for role "%s": %s' % |
---|
144 | (_cfg['role'], e)) |
---|
145 | |
---|
146 | |
---|
147 | def test04GetTrustedHostInfoWithNoRole(self): |
---|
148 | """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info |
---|
149 | irrespective of role""" |
---|
150 | trustedHostInfo = self.siteAClnt.getTrustedHostInfo() |
---|
151 | for hostname, hostInfo in trustedHostInfo.items(): |
---|
152 | self.assert_(hostname, "Hostname not set") |
---|
153 | for k, v in hostInfo.items(): |
---|
154 | self.assert_(k, "hostInfo value key unset") |
---|
155 | |
---|
156 | print "Trusted Host Info:\n %s" % trustedHostInfo |
---|
157 | |
---|
158 | |
---|
159 | def test05GetAllHostsInfo(self): |
---|
160 | """test05GetAllHostsInfo: retrieve info for all hosts""" |
---|
161 | allHostInfo = self.siteAClnt.getAllHostsInfo() |
---|
162 | for hostname, hostInfo in allHostInfo.items(): |
---|
163 | self.assert_(hostname, "Hostname not set") |
---|
164 | for k, v in hostInfo.items(): |
---|
165 | self.assert_(k, "hostInfo value key unset") |
---|
166 | |
---|
167 | print "All Hosts Info:\n %s" % allHostInfo |
---|
168 | |
---|
169 | |
---|
170 | def test06GetAttCert(self): |
---|
171 | """test06GetAttCert: Request attribute certificate from NDG Attribute |
---|
172 | Authority Web Service.""" |
---|
173 | _cfg = self.cfg['test06GetAttCert'] |
---|
174 | |
---|
175 | # Read user Certificate into a string ready for passing via WS |
---|
176 | try: |
---|
177 | userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath')) |
---|
178 | userX509CertTxt = open(userX509CertFilePath, 'r').read() |
---|
179 | |
---|
180 | except TypeError: |
---|
181 | # No issuing cert set |
---|
182 | userX509CertTxt = None |
---|
183 | |
---|
184 | except IOError, ioErr: |
---|
185 | raise Exception("Error reading certificate file \"%s\": %s" % |
---|
186 | (ioErr.filename, ioErr.strerror)) |
---|
187 | |
---|
188 | # Make attribute certificate request |
---|
189 | attCert = self.siteAClnt.getAttCert(userX509Cert=userX509CertTxt) |
---|
190 | |
---|
191 | print "Attribute Certificate: \n\n:" + str(attCert) |
---|
192 | |
---|
193 | attCert.filePath = xpdVars(_cfg['attCertFilePath']) |
---|
194 | attCert.write() |
---|
195 | |
---|
196 | |
---|
197 | def test07GetAttCertWithUserIdSet(self): |
---|
198 | """test07GetAttCertWithUserIdSet: Request attribute certificate from |
---|
199 | NDG Attribute Authority Web Service setting a specific user Id |
---|
200 | independent of the signer of the SOAP request.""" |
---|
201 | _cfg = self.cfg['test07GetAttCertWithUserIdSet'] |
---|
202 | |
---|
203 | # Read user Certificate into a string ready for passing via WS |
---|
204 | try: |
---|
205 | userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath')) |
---|
206 | userX509CertTxt = open(userX509CertFilePath, 'r').read() |
---|
207 | |
---|
208 | except TypeError: |
---|
209 | # No issuing cert set |
---|
210 | userX509CertTxt = None |
---|
211 | |
---|
212 | except IOError, ioErr: |
---|
213 | raise Exception("Error reading certificate file \"%s\": %s" % |
---|
214 | (ioErr.filename, ioErr.strerror)) |
---|
215 | |
---|
216 | # Make attribute certificate request |
---|
217 | userId = _cfg['userId'] |
---|
218 | attCert = self.siteAClnt.getAttCert(userId=userId, |
---|
219 | userX509Cert=userX509CertTxt) |
---|
220 | |
---|
221 | print "Attribute Certificate: \n\n:" + str(attCert) |
---|
222 | |
---|
223 | attCert.filePath = xpdVars(_cfg['attCertFilePath']) |
---|
224 | attCert.write() |
---|
225 | |
---|
226 | |
---|
227 | def test08GetMappedAttCert(self): |
---|
228 | """test08GetMappedAttCert: Request mapped attribute certificate from |
---|
229 | NDG Attribute Authority Web Service.""" |
---|
230 | _cfg = self.cfg['test08GetMappedAttCert'] |
---|
231 | |
---|
232 | # Read user Certificate into a string ready for passing via WS |
---|
233 | try: |
---|
234 | userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath')) |
---|
235 | userX509CertTxt = open(userX509CertFilePath, 'r').read() |
---|
236 | |
---|
237 | except TypeError: |
---|
238 | # No issuing cert set |
---|
239 | userX509CertTxt = None |
---|
240 | |
---|
241 | except IOError, ioErr: |
---|
242 | raise Exception("Error reading certificate file \"%s\": %s" % \ |
---|
243 | (ioErr.filename, ioErr.strerror)) |
---|
244 | |
---|
245 | # Simlarly for Attribute Certificate |
---|
246 | try: |
---|
247 | userAttCert = AttCertRead(xpdVars(_cfg['userAttCertFilePath'])) |
---|
248 | |
---|
249 | except IOError, ioErr: |
---|
250 | raise Exception("Error reading attribute certificate file \"%s\": " |
---|
251 | "%s" % (ioErr.filename, ioErr.strerror)) |
---|
252 | |
---|
253 | # Make client to site B Attribute Authority |
---|
254 | siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'], |
---|
255 | cfgFileSection='wsse', |
---|
256 | cfg=self.cfgParser) |
---|
257 | |
---|
258 | # Make attribute certificate request |
---|
259 | attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt, |
---|
260 | userAttCert=userAttCert) |
---|
261 | print "Attribute Certificate: \n\n:" + str(attCert) |
---|
262 | |
---|
263 | attCert.filePath = xpdVars(_cfg['mappedAttCertFilePath']) |
---|
264 | attCert.write() |
---|
265 | |
---|
266 | |
---|
267 | def test09GetMappedAttCertStressTest(self): |
---|
268 | """test09GetMappedAttCertStressTest: Request mapped attribute |
---|
269 | certificate from NDG Attribute Authority Web Service.""" |
---|
270 | _cfg = self.cfg['test09GetMappedAttCertStressTest'] |
---|
271 | |
---|
272 | # Read user Certificate into a string ready for passing via WS |
---|
273 | try: |
---|
274 | userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath')) |
---|
275 | userX509CertTxt = open(userX509CertFilePath, 'r').read() |
---|
276 | |
---|
277 | except TypeError: |
---|
278 | # No issuing cert set |
---|
279 | userX509CertTxt = None |
---|
280 | |
---|
281 | except IOError, ioErr: |
---|
282 | raise Exception("Error reading certificate file \"%s\": %s" % |
---|
283 | (ioErr.filename, ioErr.strerror)) |
---|
284 | |
---|
285 | # Make client to site B Attribute Authority |
---|
286 | siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'], |
---|
287 | cfgFileSection='wsse', |
---|
288 | cfg=self.cfgParser) |
---|
289 | |
---|
290 | acFilePathList = [xpdVars(file) for file in \ |
---|
291 | _cfg['userAttCertFilePathList'].split()] |
---|
292 | |
---|
293 | for acFilePath in acFilePathList: |
---|
294 | try: |
---|
295 | userAttCert = AttCertRead(acFilePath) |
---|
296 | |
---|
297 | except IOError, ioErr: |
---|
298 | raise Exception("Error reading attribute certificate file " |
---|
299 | '"%s": %s' % (ioErr.filename, ioErr.strerror)) |
---|
300 | |
---|
301 | # Make attribute certificate request |
---|
302 | try: |
---|
303 | attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt, |
---|
304 | userAttCert=userAttCert) |
---|
305 | except Exception, e: |
---|
306 | outFilePfx = 'test09GetMappedAttCertStressTest-%s' % \ |
---|
307 | os.path.basename(acFilePath) |
---|
308 | msgFile = open(outFilePfx+".msg", 'w') |
---|
309 | msgFile.write('Failed for "%s": %s\n' % (acFilePath, e)) |
---|
310 | |
---|
311 | |
---|
312 | class AttributeAuthoritySAMLInterfaceTestCase( |
---|
313 | AttributeAuthorityClientBaseTestCase): |
---|
314 | """Separate class for Attribute Authority SAML Attribute Query interface""" |
---|
315 | |
---|
316 | def __init__(self, *arg, **kw): |
---|
317 | super(AttributeAuthoritySAMLInterfaceTestCase, self).__init__(*arg, |
---|
318 | **kw) |
---|
319 | self.startSiteAAttributeAuthority(withSSL=True, port=5443) |
---|
320 | |
---|
321 | def test01SAMLAttributeQuery(self): |
---|
322 | _cfg = self.cfg['test01SAMLAttributeQuery'] |
---|
323 | |
---|
324 | attributeQuery = AttributeQuery() |
---|
325 | attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20) |
---|
326 | attributeQuery.id = str(uuid4()) |
---|
327 | attributeQuery.issueInstant = datetime.utcnow() |
---|
328 | |
---|
329 | attributeQuery.issuer = Issuer() |
---|
330 | attributeQuery.issuer.format = Issuer.X509_SUBJECT |
---|
331 | attributeQuery.issuer.value = "/O=Site A/CN=Authorisation Service" |
---|
332 | |
---|
333 | attributeQuery.subject = Subject() |
---|
334 | attributeQuery.subject.nameID = NameID() |
---|
335 | attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT |
---|
336 | attributeQuery.subject.nameID.value = _cfg['subject'] |
---|
337 | xsStringNs = SAMLConstants.XSD_NS+"#"+\ |
---|
338 | XSStringAttributeValue.TYPE_LOCAL_NAME |
---|
339 | fnAttribute = Attribute() |
---|
340 | fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME |
---|
341 | fnAttribute.nameFormat = xsStringNs |
---|
342 | fnAttribute.friendlyName = "FirstName" |
---|
343 | |
---|
344 | attributeQuery.attributes.append(fnAttribute) |
---|
345 | |
---|
346 | lnAttribute = Attribute() |
---|
347 | lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME |
---|
348 | lnAttribute.nameFormat = xsStringNs |
---|
349 | lnAttribute.friendlyName = "LastName" |
---|
350 | |
---|
351 | attributeQuery.attributes.append(lnAttribute) |
---|
352 | |
---|
353 | emailAddressAttribute = Attribute() |
---|
354 | emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME |
---|
355 | emailAddressAttribute.nameFormat = xsStringNs |
---|
356 | emailAddressAttribute.friendlyName = "emailAddress" |
---|
357 | |
---|
358 | attributeQuery.attributes.append(emailAddressAttribute) |
---|
359 | |
---|
360 | siteAAttribute = Attribute() |
---|
361 | siteAAttribute.name = _cfg['siteAttributeName'] |
---|
362 | siteAAttribute.nameFormat = xsStringNs |
---|
363 | |
---|
364 | attributeQuery.attributes.append(siteAAttribute) |
---|
365 | |
---|
366 | binding = SamlSoapBinding() |
---|
367 | response = binding.send(attributeQuery, _cfg['uri']) |
---|
368 | |
---|
369 | self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI) |
---|
370 | |
---|
371 | # Check Query ID matches the query ID the service received |
---|
372 | self.assert_(response.inResponseTo == attributeQuery.id) |
---|
373 | |
---|
374 | now = datetime.utcnow() |
---|
375 | self.assert_(response.issueInstant < now) |
---|
376 | self.assert_(response.assertions[-1].issueInstant < now) |
---|
377 | self.assert_(response.assertions[-1].conditions.notBefore < now) |
---|
378 | self.assert_(response.assertions[-1].conditions.notOnOrAfter > now) |
---|
379 | |
---|
380 | samlResponseElem = ResponseElementTree.toXML(response) |
---|
381 | |
---|
382 | print("SAML Response ...") |
---|
383 | print(ElementTree.tostring(samlResponseElem)) |
---|
384 | print("Pretty print SAML Response ...") |
---|
385 | print(prettyPrint(samlResponseElem)) |
---|
386 | |
---|
387 | def test02SAMLAttributeQueryInvalidIssuer(self): |
---|
388 | _cfg = self.cfg['test02SAMLAttributeQueryInvalidIssuer'] |
---|
389 | |
---|
390 | attributeQuery = AttributeQuery() |
---|
391 | attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20) |
---|
392 | attributeQuery.id = str(uuid4()) |
---|
393 | attributeQuery.issueInstant = datetime.utcnow() |
---|
394 | |
---|
395 | attributeQuery.issuer = Issuer() |
---|
396 | attributeQuery.issuer.format = Issuer.X509_SUBJECT |
---|
397 | attributeQuery.issuer.value = "Invalid Site" |
---|
398 | |
---|
399 | attributeQuery.subject = Subject() |
---|
400 | attributeQuery.subject.nameID = NameID() |
---|
401 | attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT |
---|
402 | attributeQuery.subject.nameID.value = _cfg['subject'] |
---|
403 | xsStringNs = SAMLConstants.XSD_NS+"#"+\ |
---|
404 | XSStringAttributeValue.TYPE_LOCAL_NAME |
---|
405 | |
---|
406 | siteAAttribute = Attribute() |
---|
407 | siteAAttribute.name = _cfg['siteAttributeName'] |
---|
408 | siteAAttribute.nameFormat = xsStringNs |
---|
409 | |
---|
410 | attributeQuery.attributes.append(siteAAttribute) |
---|
411 | |
---|
412 | binding = SamlSoapBinding() |
---|
413 | response = binding.send(attributeQuery, _cfg['uri']) |
---|
414 | |
---|
415 | samlResponseElem = ResponseElementTree.toXML(response) |
---|
416 | |
---|
417 | print("SAML Response ...") |
---|
418 | print(ElementTree.tostring(samlResponseElem)) |
---|
419 | print("Pretty print SAML Response ...") |
---|
420 | print(prettyPrint(samlResponseElem)) |
---|
421 | |
---|
422 | self.assert_( |
---|
423 | response.status.statusCode.value==StatusCode.REQUEST_DENIED_URI) |
---|
424 | |
---|
425 | def test03SAMLAttributeQueryUnknownSubject(self): |
---|
426 | _cfg = self.cfg['test03SAMLAttributeQueryUnknownSubject'] |
---|
427 | |
---|
428 | attributeQuery = AttributeQuery() |
---|
429 | attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20) |
---|
430 | attributeQuery.id = str(uuid4()) |
---|
431 | attributeQuery.issueInstant = datetime.utcnow() |
---|
432 | |
---|
433 | attributeQuery.issuer = Issuer() |
---|
434 | attributeQuery.issuer.format = Issuer.X509_SUBJECT |
---|
435 | attributeQuery.issuer.value = "/O=Site A/CN=Authorisation Service" |
---|
436 | |
---|
437 | attributeQuery.subject = Subject() |
---|
438 | attributeQuery.subject.nameID = NameID() |
---|
439 | attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT |
---|
440 | attributeQuery.subject.nameID.value = _cfg['subject'] |
---|
441 | xsStringNs = SAMLConstants.XSD_NS+"#"+\ |
---|
442 | XSStringAttributeValue.TYPE_LOCAL_NAME |
---|
443 | |
---|
444 | siteAAttribute = Attribute() |
---|
445 | siteAAttribute.name = _cfg['siteAttributeName'] |
---|
446 | siteAAttribute.nameFormat = xsStringNs |
---|
447 | |
---|
448 | attributeQuery.attributes.append(siteAAttribute) |
---|
449 | |
---|
450 | binding = SamlSoapBinding() |
---|
451 | response = binding.send(attributeQuery, _cfg['uri']) |
---|
452 | |
---|
453 | samlResponseElem = ResponseElementTree.toXML(response) |
---|
454 | print("SAML Response ...") |
---|
455 | print(ElementTree.tostring(samlResponseElem)) |
---|
456 | print("Pretty print SAML Response ...") |
---|
457 | print(prettyPrint(samlResponseElem)) |
---|
458 | |
---|
459 | self.assert_( |
---|
460 | response.status.statusCode.value==StatusCode.UNKNOWN_PRINCIPAL_URI) |
---|
461 | |
---|
462 | def test04SAMLAttributeQueryInvalidAttrName(self): |
---|
463 | _cfg = self.cfg['test04SAMLAttributeQueryInvalidAttrName'] |
---|
464 | |
---|
465 | attributeQuery = AttributeQuery() |
---|
466 | attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20) |
---|
467 | attributeQuery.id = str(uuid4()) |
---|
468 | attributeQuery.issueInstant = datetime.utcnow() |
---|
469 | |
---|
470 | attributeQuery.issuer = Issuer() |
---|
471 | attributeQuery.issuer.format = Issuer.X509_SUBJECT |
---|
472 | attributeQuery.issuer.value = "/O=Site A/CN=Authorisation Service" |
---|
473 | |
---|
474 | attributeQuery.subject = Subject() |
---|
475 | attributeQuery.subject.nameID = NameID() |
---|
476 | attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT |
---|
477 | attributeQuery.subject.nameID.value = _cfg['subject'] |
---|
478 | xsStringNs = SAMLConstants.XSD_NS+"#"+\ |
---|
479 | XSStringAttributeValue.TYPE_LOCAL_NAME |
---|
480 | |
---|
481 | invalidAttribute = Attribute() |
---|
482 | invalidAttribute.name = "myInvalidAttributeName" |
---|
483 | invalidAttribute.nameFormat = xsStringNs |
---|
484 | |
---|
485 | attributeQuery.attributes.append(invalidAttribute) |
---|
486 | |
---|
487 | binding = SamlSoapBinding() |
---|
488 | response = binding.send(attributeQuery, _cfg['uri']) |
---|
489 | |
---|
490 | samlResponseElem = ResponseElementTree.toXML(response) |
---|
491 | |
---|
492 | print("SAML Response ...") |
---|
493 | print(ElementTree.tostring(samlResponseElem)) |
---|
494 | print("Pretty print SAML Response ...") |
---|
495 | print(prettyPrint(samlResponseElem)) |
---|
496 | |
---|
497 | self.assert_(response.status.statusCode.value==\ |
---|
498 | StatusCode.INVALID_ATTR_NAME_VALUE_URI) |
---|
499 | |
---|
500 | def test05AttributeQuerySOAPBindingInterface(self): |
---|
501 | _cfg = self.cfg['test05AttributeQuerySOAPBindingInterface'] |
---|
502 | |
---|
503 | binding = AttributeQuerySOAPBinding() |
---|
504 | |
---|
505 | binding.subjectID = AttributeAuthoritySAMLInterfaceTestCase.OPENID_URI |
---|
506 | binding.issuerDN = \ |
---|
507 | AttributeAuthoritySAMLInterfaceTestCase.VALID_REQUESTOR_IDS[0] |
---|
508 | |
---|
509 | binding.queryAttributes = EsgDefaultQueryAttributes.ATTRIBUTES |
---|
510 | |
---|
511 | response = binding.send(uri=_cfg['uri']) |
---|
512 | samlResponseElem = ResponseElementTree.toXML(response) |
---|
513 | |
---|
514 | print("SAML Response ...") |
---|
515 | print(ElementTree.tostring(samlResponseElem)) |
---|
516 | print("Pretty print SAML Response ...") |
---|
517 | print(prettyPrint(samlResponseElem)) |
---|
518 | |
---|
519 | self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI) |
---|
520 | |
---|
521 | def test06AttributeQueryFromConfig(self): |
---|
522 | thisSection = 'test06AttributeQueryFromConfig' |
---|
523 | _cfg = self.cfg[thisSection] |
---|
524 | |
---|
525 | binding = AttributeQuerySOAPBinding.fromConfig(self.cfgFilePath, |
---|
526 | section=thisSection, |
---|
527 | prefix='attributeQuery.') |
---|
528 | binding.subjectID = _cfg['subject'] |
---|
529 | response = binding.send(uri=_cfg['uri']) |
---|
530 | samlResponseElem = ResponseElementTree.toXML(response) |
---|
531 | |
---|
532 | print("SAML Response ...") |
---|
533 | print(ElementTree.tostring(samlResponseElem)) |
---|
534 | print("Pretty print SAML Response ...") |
---|
535 | print(prettyPrint(samlResponseElem)) |
---|
536 | |
---|
537 | self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI) |
---|
538 | |
---|
539 | def test07AttributeQuerySslSOAPBindingInterface(self): |
---|
540 | thisSection = 'test07AttributeQuerySslSOAPBindingInterface' |
---|
541 | _cfg = self.cfg[thisSection] |
---|
542 | |
---|
543 | binding = AttributeQuerySslSOAPBinding.fromConfig(self.cfgFilePath, |
---|
544 | section=thisSection, |
---|
545 | prefix='attributeQuery.') |
---|
546 | |
---|
547 | binding.subjectID = _cfg['subject'] |
---|
548 | response = binding.send(uri=_cfg['uri']) |
---|
549 | samlResponseElem = ResponseElementTree.toXML(response) |
---|
550 | |
---|
551 | print("SAML Response ...") |
---|
552 | print(ElementTree.tostring(samlResponseElem)) |
---|
553 | print("Pretty print SAML Response ...") |
---|
554 | print(prettyPrint(samlResponseElem)) |
---|
555 | |
---|
556 | self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI) |
---|
557 | |
---|
558 | if __name__ == "__main__": |
---|
559 | unittest.main() |
---|