source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_samlattributeauthorityclient.py @ 6575

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_samlattributeauthorityclient.py@6575
Revision 6575, 14.1 KB checked in by pjkersha, 11 years ago (diff)

Changes for addition of AuthzDecisionQuery WSGI interface (Authorisation service)

Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority SAML SOAP Binding client unit tests
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/02/10 (moved from test_attributeauthorityclient)"
8__copyright__ = "(C) 2010 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id: $'
import logging
logging.basicConfig(level=logging.DEBUG)
import os
import unittest
from datetime import datetime
from uuid import uuid4
from xml.etree import ElementTree

from saml.common import SAMLVersion
from saml.common.xml import SAMLConstants
from saml.xml.etree import (AttributeQueryElementTree,
                            AuthzDecisionQueryElementTree,
                            ResponseElementTree)
from saml.saml2.core import (Subject, Issuer, Attribute, NameID, AttributeQuery,
                             AuthzDecisionQuery, StatusCode,
                             XSStringAttributeValue, )

from ndg.security.common.saml_utils.binding.soap import SOAPBinding
from ndg.security.common.saml_utils.binding.soap.attributequery import (
                                        AttributeQuerySOAPBinding,
                                        AttributeQuerySslSOAPBinding)
from ndg.security.common.saml_utils.esg import (EsgSamlNamespaces,
                                                EsgDefaultQueryAttributes)
from ndg.security.test.unit.attributeauthorityclient import \
                                        AttributeAuthorityClientBaseTestCase
from ndg.security.common.utils.etree import prettyPrint
34
35   
class AttributeAuthoritySAMLInterfaceTestCase(
                                        AttributeAuthorityClientBaseTestCase):
    """NDG Attribute Authority SAML SOAP Binding client unit tests

    Each test sends a SAML query over the SOAP binding to the URI given in
    its own section of the configuration file and checks the status code of
    the response.
    """
    CONFIG_FILENAME = 'test_samlattributeauthorityclient.cfg'
    CONFIG_FILEPATH = os.path.join(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'],
                                   CONFIG_FILENAME)

    # Issuer DN used by the queries that are expected to be accepted
    _SITEA_ISSUER_DN = "/CN=Authorisation Service/O=Site A"

    def __init__(self, *arg, **kw):
        """Initialise the base test case and start the services under test"""
        super(AttributeAuthoritySAMLInterfaceTestCase, self).__init__(*arg,
                                                                      **kw)

        # Run same config but on two different ports - one HTTP and one HTTPS
        self.startSiteAAttributeAuthority()
        self.startSiteAAttributeAuthority(withSSL=True, port=5443)

    @staticmethod
    def _initQuery(query, issuerValue, subjectValue):
        """Populate the settings common to all the queries in these tests

        @param query: newly created SAML query object to fill in
        @param issuerValue: X.509 subject name to set as the query issuer
        @param subjectValue: name identifier value for the query subject
        @return: the query passed in
        """
        query.version = SAMLVersion(SAMLVersion.VERSION_20)
        query.id = str(uuid4())
        query.issueInstant = datetime.utcnow()

        query.issuer = Issuer()
        query.issuer.format = Issuer.X509_SUBJECT
        query.issuer.value = issuerValue

        query.subject = Subject()
        query.subject.nameID = NameID()
        query.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
        query.subject.nameID.value = subjectValue
        return query

    @classmethod
    def _createAttributeQuery(cls, issuerValue, subjectValue, attributes):
        """Build an attribute query requesting the given attributes

        @param issuerValue: X.509 subject name to set as the query issuer
        @param subjectValue: name identifier value for the query subject
        @param attributes: sequence of (name, friendlyName) pairs, one per
        attribute to request.  friendlyName is left unset when None
        @return: the new attribute query
        """
        attributeQuery = cls._initQuery(AttributeQuery(), issuerValue,
                                        subjectValue)
        xsStringNs = SAMLConstants.XSD_NS + "#" + \
                                        XSStringAttributeValue.TYPE_LOCAL_NAME

        for name, friendlyName in attributes:
            attribute = Attribute()
            attribute.name = name
            attribute.nameFormat = xsStringNs
            if friendlyName is not None:
                attribute.friendlyName = friendlyName

            attributeQuery.attributes.append(attribute)

        return attributeQuery

    @staticmethod
    def _send(query, uri, serialise):
        """Send a query with a plain SOAP binding and return the response

        @param query: SAML query object to send
        @param uri: endpoint to send the query to
        @param serialise: callable converting the query to XML
        @return: response as parsed by ResponseElementTree.fromXML
        """
        binding = SOAPBinding()
        binding.serialise = serialise
        binding.deserialise = ResponseElementTree.fromXML
        return binding.send(query, uri)

    @staticmethod
    def _printResponse(response):
        """Print a response both as raw and as pretty printed XML"""
        samlResponseElem = ResponseElementTree.toXML(response)

        print("SAML Response ...")
        print(ElementTree.tostring(samlResponseElem))
        print("Pretty print SAML Response ...")
        print(prettyPrint(samlResponseElem))

    def _assertStatus(self, response, expectedStatusUri):
        """Check the status code of a response

        assertEqual is used rather than the deprecated assert_ alias so that
        a failure reports both the expected and the actual status code
        """
        self.assertEqual(response.status.statusCode.value, expectedStatusUri)

    def test01AttributeQuery(self):
        """Query name, e-mail and site attributes and expect success"""
        _cfg = self.cfg['test01AttributeQuery']

        attributeQuery = self._createAttributeQuery(
            self._SITEA_ISSUER_DN,
            _cfg['subject'],
            ((EsgSamlNamespaces.FIRSTNAME_ATTRNAME, "FirstName"),
             (EsgSamlNamespaces.LASTNAME_ATTRNAME, "LastName"),
             (EsgSamlNamespaces.EMAILADDRESS_ATTRNAME, "emailAddress"),
             (_cfg['siteAttributeName'], None)))

        response = self._send(attributeQuery, _cfg['uri'],
                              AttributeQueryElementTree.toXML)

        self._assertStatus(response, StatusCode.SUCCESS_URI)

        # Check Query ID matches the query ID the service received
        self.assertEqual(response.inResponseTo, attributeQuery.id)

        # The response and its last assertion must have been issued before
        # now, and now must fall inside the assertion's validity window
        now = datetime.utcnow()
        lastAssertion = response.assertions[-1]
        self.assertTrue(response.issueInstant < now)
        self.assertTrue(lastAssertion.issueInstant < now)
        self.assertTrue(lastAssertion.conditions.notBefore < now)
        self.assertTrue(lastAssertion.conditions.notOnOrAfter > now)

        self._printResponse(response)

    def test02AttributeQueryInvalidIssuer(self):
        """Query with issuer "/O=Invalid Site/CN=PDP" and expect a request
        denied status"""
        _cfg = self.cfg['test02AttributeQueryInvalidIssuer']

        attributeQuery = self._createAttributeQuery(
            "/O=Invalid Site/CN=PDP",
            _cfg['subject'],
            ((_cfg['siteAttributeName'], None),))

        response = self._send(attributeQuery, _cfg['uri'],
                              AttributeQueryElementTree.toXML)
        self._printResponse(response)

        self._assertStatus(response, StatusCode.REQUEST_DENIED_URI)

    def test03AttributeQueryUnknownSubject(self):
        """Query for the subject configured for this test and expect an
        unknown principal status"""
        _cfg = self.cfg['test03AttributeQueryUnknownSubject']

        attributeQuery = self._createAttributeQuery(
            self._SITEA_ISSUER_DN,
            _cfg['subject'],
            ((_cfg['siteAttributeName'], None),))

        response = self._send(attributeQuery, _cfg['uri'],
                              AttributeQueryElementTree.toXML)
        self._printResponse(response)

        self._assertStatus(response, StatusCode.UNKNOWN_PRINCIPAL_URI)

    def test04AttributeQueryInvalidAttrName(self):
        """Query for attribute "myInvalidAttributeName" and expect an invalid
        attribute name or value status"""
        thisSection = 'test04AttributeQueryInvalidAttrName'
        _cfg = self.cfg[thisSection]

        attributeQuery = self._createAttributeQuery(
            self._SITEA_ISSUER_DN,
            _cfg['subject'],
            (("myInvalidAttributeName", None),))

        # Unlike the tests above, the binding settings are read from the
        # 'saml.' prefixed options in this test's configuration section
        binding = SOAPBinding.fromConfig(
                     AttributeAuthoritySAMLInterfaceTestCase.CONFIG_FILEPATH,
                     prefix='saml.',
                     section=thisSection)
        response = binding.send(attributeQuery, _cfg['uri'])
        self._printResponse(response)

        self._assertStatus(response, StatusCode.INVALID_ATTR_NAME_VALUE_URI)

    def test05AttributeQuerySOAPBindingInterface(self):
        """Send a query via AttributeQuerySOAPBinding with its settings
        assigned in code and expect success"""
        _cfg = self.cfg['test05AttributeQuerySOAPBindingInterface']

        binding = AttributeQuerySOAPBinding()

        binding.subjectID = AttributeAuthoritySAMLInterfaceTestCase.OPENID_URI
        binding.subjectIdFormat = EsgSamlNamespaces.NAMEID_FORMAT
        binding.issuerName = \
            str(AttributeAuthoritySAMLInterfaceTestCase.VALID_REQUESTOR_IDS[0])
        binding.issuerFormat = Issuer.X509_SUBJECT

        binding.queryAttributes = EsgDefaultQueryAttributes.ATTRIBUTES

        response = binding.send(uri=_cfg['uri'])
        self._printResponse(response)

        self._assertStatus(response, StatusCode.SUCCESS_URI)

    def test06AttributeQueryFromConfig(self):
        """Send a query via AttributeQuerySOAPBinding created from the
        'attributeQuery.' prefixed configuration options and expect success"""
        thisSection = 'test06AttributeQueryFromConfig'
        _cfg = self.cfg[thisSection]

        binding = AttributeQuerySOAPBinding.fromConfig(self.cfgFilePath,
                                                       section=thisSection,
                                                       prefix='attributeQuery.')
        binding.subjectID = _cfg['subject']
        response = binding.send(uri=_cfg['uri'])
        self._printResponse(response)

        self._assertStatus(response, StatusCode.SUCCESS_URI)

    def test07AttributeQuerySslSOAPBindingInterface(self):
        """Send a query via AttributeQuerySslSOAPBinding created from the
        'attributeQuery.' prefixed configuration options and expect success"""
        thisSection = 'test07AttributeQuerySslSOAPBindingInterface'
        _cfg = self.cfg[thisSection]

        binding = AttributeQuerySslSOAPBinding.fromConfig(self.cfgFilePath,
                                                       section=thisSection,
                                                       prefix='attributeQuery.')

        binding.subjectID = _cfg['subject']
        response = binding.send(uri=_cfg['uri'])
        self._printResponse(response)

        self._assertStatus(response, StatusCode.SUCCESS_URI)

    def test08AuthzDecisionQuery(self):
        """Send an authorisation decision query and expect a request denied
        status"""
        # NOTE(review): the section name below does not match this method's
        # name (test08...) - confirm it exists in the configuration file
        _cfg = self.cfg['test02AuthzDecisionQuery']

        query = self._initQuery(
            AuthzDecisionQuery(),
            str(AttributeAuthoritySAMLInterfaceTestCase.VALID_REQUESTOR_IDS[0]),
            _cfg['subject'])

        response = self._send(query, _cfg['uri'],
                              AuthzDecisionQueryElementTree.toXML)
        self._printResponse(response)

        self._assertStatus(response, StatusCode.REQUEST_DENIED_URI)
332
333       
# Run every test case in this module when it is executed as a script
# (relies on the module level "import unittest")
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.