source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_samlattributeauthorityclient.py @ 7829

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_samlattributeauthorityclient.py@7829
Revision 7829, 15.0 KB checked in by pjkersha, 10 years ago (diff)

Incomplete - task 16: NDG Security 2.x.x - incl. updated Paster templates

  • Completed Attribute Service template and tested standalone
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority SAML SOAP Binding client unit tests
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/02/10 (moved from test_attributeauthorityclient)"
8__copyright__ = "(C) 2010 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import logging
13logging.basicConfig(level=logging.DEBUG)
14import os
15from datetime import datetime
16from uuid import uuid4
17from xml.etree import ElementTree
18
19from ndg.saml.common import SAMLVersion
20from ndg.saml.common.xml import SAMLConstants
21from ndg.saml.xml.etree import AttributeQueryElementTree, ResponseElementTree
22from ndg.saml.saml2.core import (Subject, Issuer, Attribute, NameID, 
23                                 AttributeQuery, StatusCode, 
24                                 XSStringAttributeValue)
25
26from ndg.saml.saml2.binding.soap.client import SOAPBinding
27from ndg.saml.saml2.binding.soap.client.attributequery import (
28                                        AttributeQuerySOAPBinding, 
29                                        AttributeQuerySslSOAPBinding)
30from ndg.security.common.saml_utils.esgf import (ESGFSamlNamespaces,
31                                                 ESGFDefaultQueryAttributes,
32                                                 ESGFGroupRoleAttributeValue)
33from ndg.security.common.saml_utils.esgf.xml.etree import \
34                                        ESGFResponseElementTree
35from ndg.security.common.utils.etree import prettyPrint
36from ndg.security.test.unit.attributeauthorityclient import \
37                                        AttributeAuthorityClientBaseTestCase
38   
39   
40class AttributeAuthoritySAMLInterfaceTestCase(
41                                        AttributeAuthorityClientBaseTestCase):
42    """NDG Attribute Authority SAML SOAP Binding client unit tests"""
43    HERE_DIR = os.path.dirname(__file__)
44    CONFIG_FILENAME = 'test_samlattributeauthorityclient.cfg'
45    CONFIG_FILEPATH = os.path.join(HERE_DIR, CONFIG_FILENAME)
46   
47    def __init__(self, *arg, **kw):
48        super(AttributeAuthoritySAMLInterfaceTestCase, self).__init__(*arg, 
49                                                                      **kw)
50       
51        # Run same config but on two different ports - one HTTP and one HTTPS
52        self.startSiteAAttributeAuthority()
53        self.startSiteAAttributeAuthority(withSSL=True, port=5443)
54       
55    def test01AttributeQuery(self):
56        _cfg = self.cfg['test01AttributeQuery']
57       
58        attributeQuery = AttributeQuery()
59        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
60        attributeQuery.id = str(uuid4())
61        attributeQuery.issueInstant = datetime.utcnow()
62       
63        attributeQuery.issuer = Issuer()
64        attributeQuery.issuer.format = Issuer.X509_SUBJECT
65        attributeQuery.issuer.value = "/CN=Authorisation Service/O=Site A"   
66                       
67        attributeQuery.subject = Subject()
68        attributeQuery.subject.nameID = NameID()
69        attributeQuery.subject.nameID.format = ESGFSamlNamespaces.NAMEID_FORMAT
70        attributeQuery.subject.nameID.value = _cfg['subject']
71        xsStringNs = SAMLConstants.XSD_NS+"#"+\
72                                        XSStringAttributeValue.TYPE_LOCAL_NAME
73        fnAttribute = Attribute()
74        fnAttribute.name = ESGFSamlNamespaces.FIRSTNAME_ATTRNAME
75        fnAttribute.nameFormat = xsStringNs
76        fnAttribute.friendlyName = "FirstName"
77
78        attributeQuery.attributes.append(fnAttribute)
79   
80        lnAttribute = Attribute()
81        lnAttribute.name = ESGFSamlNamespaces.LASTNAME_ATTRNAME
82        lnAttribute.nameFormat = xsStringNs
83        lnAttribute.friendlyName = "LastName"
84
85        attributeQuery.attributes.append(lnAttribute)
86   
87        emailAddressAttribute = Attribute()
88        emailAddressAttribute.name = ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME
89        emailAddressAttribute.nameFormat = xsStringNs
90        emailAddressAttribute.friendlyName = "emailAddress"
91       
92        attributeQuery.attributes.append(emailAddressAttribute) 
93
94        siteAAttribute = Attribute()
95        siteAAttribute.name = _cfg['siteAttributeName']
96        siteAAttribute.nameFormat = xsStringNs
97       
98        attributeQuery.attributes.append(siteAAttribute) 
99
100        binding = SOAPBinding()
101        binding.serialise = AttributeQueryElementTree.toXML
102        binding.deserialise = ResponseElementTree.fromXML
103        response = binding.send(attributeQuery, _cfg['uri'])
104       
105        self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI)
106       
107        # Check Query ID matches the query ID the service received
108        self.assert_(response.inResponseTo == attributeQuery.id)
109       
110        now = datetime.utcnow()
111        self.assert_(response.issueInstant < now)
112        self.assert_(response.assertions[-1].issueInstant < now)       
113        self.assert_(response.assertions[-1].conditions.notBefore < now) 
114        self.assert_(response.assertions[-1].conditions.notOnOrAfter > now)
115         
116        samlResponseElem = ResponseElementTree.toXML(response)
117       
118        print("SAML Response ...")
119        print(ElementTree.tostring(samlResponseElem))
120        print("Pretty print SAML Response ...")
121        print(prettyPrint(samlResponseElem))
122             
123    def test02AttributeQueryInvalidIssuer(self):
124        _cfg = self.cfg['test02AttributeQueryInvalidIssuer']
125       
126        attributeQuery = AttributeQuery()
127        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
128        attributeQuery.id = str(uuid4())
129        attributeQuery.issueInstant = datetime.utcnow()
130       
131        attributeQuery.issuer = Issuer()
132        attributeQuery.issuer.format = Issuer.X509_SUBJECT
133        attributeQuery.issuer.value = "/O=Invalid Site/CN=PDP"   
134                       
135        attributeQuery.subject = Subject() 
136        attributeQuery.subject.nameID = NameID()
137        attributeQuery.subject.nameID.format = ESGFSamlNamespaces.NAMEID_FORMAT
138        attributeQuery.subject.nameID.value = _cfg['subject']
139        xsStringNs = SAMLConstants.XSD_NS+"#"+\
140                                        XSStringAttributeValue.TYPE_LOCAL_NAME
141
142        siteAAttribute = Attribute()
143        siteAAttribute.name = _cfg['siteAttributeName']
144        siteAAttribute.nameFormat = xsStringNs
145       
146        attributeQuery.attributes.append(siteAAttribute) 
147
148        binding = SOAPBinding()
149        binding.serialise = AttributeQueryElementTree.toXML
150        binding.deserialise = ResponseElementTree.fromXML
151        response = binding.send(attributeQuery, _cfg['uri'])
152
153        samlResponseElem = ResponseElementTree.toXML(response)
154       
155        print("SAML Response ...")
156        print(ElementTree.tostring(samlResponseElem))
157        print("Pretty print SAML Response ...")
158        print(prettyPrint(samlResponseElem))
159       
160        self.assert_(
161            response.status.statusCode.value==StatusCode.REQUEST_DENIED_URI)
162                   
163    def test03AttributeQueryUnknownSubject(self):
164        _cfg = self.cfg['test03AttributeQueryUnknownSubject']
165       
166        attributeQuery = AttributeQuery()
167        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
168        attributeQuery.id = str(uuid4())
169        attributeQuery.issueInstant = datetime.utcnow()
170       
171        attributeQuery.issuer = Issuer()
172        attributeQuery.issuer.format = Issuer.X509_SUBJECT
173        attributeQuery.issuer.value = "/CN=Authorisation Service/O=Site A"   
174                       
175        attributeQuery.subject = Subject() 
176        attributeQuery.subject.nameID = NameID()
177        attributeQuery.subject.nameID.format = ESGFSamlNamespaces.NAMEID_FORMAT
178        attributeQuery.subject.nameID.value = _cfg['subject']
179        xsStringNs = SAMLConstants.XSD_NS+"#"+\
180                                        XSStringAttributeValue.TYPE_LOCAL_NAME
181
182        siteAAttribute = Attribute()
183        siteAAttribute.name = _cfg['siteAttributeName']
184        siteAAttribute.nameFormat = xsStringNs
185       
186        attributeQuery.attributes.append(siteAAttribute) 
187
188        binding = SOAPBinding()
189        binding.serialise = AttributeQueryElementTree.toXML
190        binding.deserialise = ResponseElementTree.fromXML
191        response = binding.send(attributeQuery, _cfg['uri'])
192       
193        samlResponseElem = ResponseElementTree.toXML(response)
194        print("SAML Response ...")
195        print(ElementTree.tostring(samlResponseElem))
196        print("Pretty print SAML Response ...")
197        print(prettyPrint(samlResponseElem))
198       
199        self.assert_(
200            response.status.statusCode.value==StatusCode.UNKNOWN_PRINCIPAL_URI)
201             
202    def test04AttributeQueryInvalidAttrName(self):
203        thisSection = 'test04AttributeQueryInvalidAttrName'
204        _cfg = self.cfg[thisSection]
205       
206        attributeQuery = AttributeQuery()
207        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
208        attributeQuery.id = str(uuid4())
209        attributeQuery.issueInstant = datetime.utcnow()
210       
211        attributeQuery.issuer = Issuer()
212        attributeQuery.issuer.format = Issuer.X509_SUBJECT
213        attributeQuery.issuer.value = "/CN=Authorisation Service/O=Site A"   
214                       
215        attributeQuery.subject = Subject() 
216        attributeQuery.subject.nameID = NameID()
217        attributeQuery.subject.nameID.format = ESGFSamlNamespaces.NAMEID_FORMAT
218        attributeQuery.subject.nameID.value = _cfg['subject']
219        xsStringNs = SAMLConstants.XSD_NS+"#"+\
220                                        XSStringAttributeValue.TYPE_LOCAL_NAME
221
222        invalidAttribute = Attribute()
223        invalidAttribute.name = "myInvalidAttributeName"
224        invalidAttribute.nameFormat = xsStringNs
225       
226        attributeQuery.attributes.append(invalidAttribute) 
227
228        binding = SOAPBinding.fromConfig(
229                     AttributeAuthoritySAMLInterfaceTestCase.CONFIG_FILEPATH, 
230                     prefix='saml.', 
231                     section=thisSection)
232        response = binding.send(attributeQuery, _cfg['uri'])
233       
234        samlResponseElem = ResponseElementTree.toXML(response)
235       
236        print("SAML Response ...")
237        print(ElementTree.tostring(samlResponseElem))
238        print("Pretty print SAML Response ...")
239        print(prettyPrint(samlResponseElem))
240       
241        self.assert_(response.status.statusCode.value==\
242                     StatusCode.INVALID_ATTR_NAME_VALUE_URI)
243             
244    def test05AttributeQueryWithESGFAttributeType(self):
245        # Test interface with custom ESGF Group/Role attribute type
246        thisSection = 'test05AttributeQueryWithESGFAttributeType'
247        _cfg = self.cfg[thisSection]
248       
249        attributeQuery = AttributeQuery()
250        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
251        attributeQuery.id = str(uuid4())
252        attributeQuery.issueInstant = datetime.utcnow()
253       
254        attributeQuery.issuer = Issuer()
255        attributeQuery.issuer.format = Issuer.X509_SUBJECT
256        attributeQuery.issuer.value = "/CN=Authorisation Service/O=Site A"   
257                       
258        attributeQuery.subject = Subject() 
259        attributeQuery.subject.nameID = NameID()
260        attributeQuery.subject.nameID.format = ESGFSamlNamespaces.NAMEID_FORMAT
261        attributeQuery.subject.nameID.value = _cfg['subject']
262       
263        groupRoleAttribute = Attribute()
264        groupRoleAttribute.name = self.__class__.ATTRIBUTE_NAMES[-1]
265        groupRoleAttribute.nameFormat = \
266            ESGFGroupRoleAttributeValue.TYPE_LOCAL_NAME
267       
268        attributeQuery.attributes.append(groupRoleAttribute) 
269
270        binding = SOAPBinding.fromConfig(
271                     AttributeAuthoritySAMLInterfaceTestCase.CONFIG_FILEPATH, 
272                     prefix='saml.',
273                     section=thisSection)
274       
275        response = binding.send(attributeQuery, _cfg['uri'])
276       
277        samlResponseElem = ESGFResponseElementTree.toXML(response)
278       
279        print("SAML Response ...")
280        print(ElementTree.tostring(samlResponseElem))
281        print("Pretty print SAML Response ...")
282        print(prettyPrint(samlResponseElem))
283       
284        self.assert_(response.assertions[0].attributeStatements[0].attributes[0
285            ].attributeValues[0].value == ('siteagroup', 'default'))
286       
287        self.assert_(response.status.statusCode.value == StatusCode.SUCCESS_URI)
288       
289    def test06AttributeQuerySOAPBindingInterface(self):
290        _cfg = self.cfg['test06AttributeQuerySOAPBindingInterface']
291       
292        binding = AttributeQuerySOAPBinding()
293       
294        binding.subjectID = AttributeAuthoritySAMLInterfaceTestCase.OPENID_URI
295        binding.subjectIdFormat = ESGFSamlNamespaces.NAMEID_FORMAT
296        binding.issuerName = \
297            str(AttributeAuthoritySAMLInterfaceTestCase.VALID_REQUESTOR_IDS[0])
298        binding.issuerFormat = Issuer.X509_SUBJECT
299       
300        binding.queryAttributes = ESGFDefaultQueryAttributes.ATTRIBUTES
301       
302        response = binding.send(uri=_cfg['uri'])
303        samlResponseElem = ResponseElementTree.toXML(response)
304       
305        print("SAML Response ...")
306        print(ElementTree.tostring(samlResponseElem))
307        print("Pretty print SAML Response ...")
308        print(prettyPrint(samlResponseElem))
309       
310        self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI)
311
312    def test07AttributeQueryFromConfig(self):
313        thisSection = 'test07AttributeQueryFromConfig'
314        _cfg = self.cfg[thisSection]
315       
316        binding = AttributeQuerySOAPBinding.fromConfig(self.cfgFilePath, 
317                                                       section=thisSection,
318                                                       prefix='attributeQuery.')
319        binding.subjectID = _cfg['subject']
320        response = binding.send(uri=_cfg['uri'])
321        samlResponseElem = ResponseElementTree.toXML(response)
322       
323        print("SAML Response ...")
324        print(ElementTree.tostring(samlResponseElem))
325        print("Pretty print SAML Response ...")
326        print(prettyPrint(samlResponseElem))
327       
328        self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI)
329       
330    def test08AttributeQuerySslSOAPBindingInterface(self):
331        thisSection = 'test08AttributeQuerySslSOAPBindingInterface'
332        _cfg = self.cfg[thisSection]
333       
334        binding = AttributeQuerySslSOAPBinding.fromConfig(self.cfgFilePath, 
335                                                       section=thisSection,
336                                                       prefix='attributeQuery.')
337       
338        binding.subjectID = _cfg['subject']
339        response = binding.send(uri=_cfg['uri'])
340        samlResponseElem = ResponseElementTree.toXML(response)
341       
342        print("SAML Response ...")
343        print(ElementTree.tostring(samlResponseElem))
344        print("Pretty print SAML Response ...")
345        print(prettyPrint(samlResponseElem))
346       
347        self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI)
348
349       
350if __name__ == "__main__":
351    unittest.main()
Note: See TracBrowser for help on using the repository browser.