source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_samlattributeauthorityclient.py @ 7827

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_samlattributeauthorityclient.py@7827
Revision 7827, 14.8 KB checked in by pjkersha, 10 years ago (diff)

Incomplete - task 16: NDG Security 2.x.x - incl. updated Paster templates

  • integrating SQLite test user db into 'Site A' test Attribute Authority
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority SAML SOAP Binding client unit tests
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/02/10 (moved from test_attributeauthorityclient)"
8__copyright__ = "(C) 2010 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import logging
13logging.basicConfig(level=logging.DEBUG)
14import os
15from datetime import datetime
16from uuid import uuid4
17from xml.etree import ElementTree
18
19from ndg.saml.common import SAMLVersion
20from ndg.saml.common.xml import SAMLConstants
21from ndg.saml.xml.etree import AttributeQueryElementTree, ResponseElementTree
22from ndg.saml.saml2.core import (Subject, Issuer, Attribute, NameID, 
23                                 AttributeQuery, StatusCode, 
24                                 XSStringAttributeValue)
25
26from ndg.saml.saml2.binding.soap.client import SOAPBinding
27from ndg.saml.saml2.binding.soap.client.attributequery import (
28                                        AttributeQuerySOAPBinding, 
29                                        AttributeQuerySslSOAPBinding)
30from ndg.security.common.saml_utils.esgf import (ESGFSamlNamespaces,
31                                                 ESGFDefaultQueryAttributes,
32                                                 ESGFGroupRoleAttributeValue)
33from ndg.security.common.utils.etree import prettyPrint
34from ndg.security.test.unit.attributeauthorityclient import \
35                                        AttributeAuthorityClientBaseTestCase
36   
37   
38class AttributeAuthoritySAMLInterfaceTestCase(
39                                        AttributeAuthorityClientBaseTestCase):
40    """NDG Attribute Authority SAML SOAP Binding client unit tests"""
41    HERE_DIR = os.path.dirname(__file__)
42    CONFIG_FILENAME = 'test_samlattributeauthorityclient.cfg'
43    CONFIG_FILEPATH = os.path.join(HERE_DIR, CONFIG_FILENAME)
44   
45    def __init__(self, *arg, **kw):
46        super(AttributeAuthoritySAMLInterfaceTestCase, self).__init__(*arg, 
47                                                                      **kw)
48       
49        # Run same config but on two different ports - one HTTP and one HTTPS
50        self.startSiteAAttributeAuthority()
51        self.startSiteAAttributeAuthority(withSSL=True, port=5443)
52       
53    def test01AttributeQuery(self):
54        _cfg = self.cfg['test01AttributeQuery']
55       
56        attributeQuery = AttributeQuery()
57        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
58        attributeQuery.id = str(uuid4())
59        attributeQuery.issueInstant = datetime.utcnow()
60       
61        attributeQuery.issuer = Issuer()
62        attributeQuery.issuer.format = Issuer.X509_SUBJECT
63        attributeQuery.issuer.value = "/CN=Authorisation Service/O=Site A"   
64                       
65        attributeQuery.subject = Subject()
66        attributeQuery.subject.nameID = NameID()
67        attributeQuery.subject.nameID.format = ESGFSamlNamespaces.NAMEID_FORMAT
68        attributeQuery.subject.nameID.value = _cfg['subject']
69        xsStringNs = SAMLConstants.XSD_NS+"#"+\
70                                        XSStringAttributeValue.TYPE_LOCAL_NAME
71        fnAttribute = Attribute()
72        fnAttribute.name = ESGFSamlNamespaces.FIRSTNAME_ATTRNAME
73        fnAttribute.nameFormat = xsStringNs
74        fnAttribute.friendlyName = "FirstName"
75
76        attributeQuery.attributes.append(fnAttribute)
77   
78        lnAttribute = Attribute()
79        lnAttribute.name = ESGFSamlNamespaces.LASTNAME_ATTRNAME
80        lnAttribute.nameFormat = xsStringNs
81        lnAttribute.friendlyName = "LastName"
82
83        attributeQuery.attributes.append(lnAttribute)
84   
85        emailAddressAttribute = Attribute()
86        emailAddressAttribute.name = ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME
87        emailAddressAttribute.nameFormat = xsStringNs
88        emailAddressAttribute.friendlyName = "emailAddress"
89       
90        attributeQuery.attributes.append(emailAddressAttribute) 
91
92        siteAAttribute = Attribute()
93        siteAAttribute.name = _cfg['siteAttributeName']
94        siteAAttribute.nameFormat = xsStringNs
95       
96        attributeQuery.attributes.append(siteAAttribute) 
97
98        binding = SOAPBinding()
99        binding.serialise = AttributeQueryElementTree.toXML
100        binding.deserialise = ResponseElementTree.fromXML
101        response = binding.send(attributeQuery, _cfg['uri'])
102       
103        self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI)
104       
105        # Check Query ID matches the query ID the service received
106        self.assert_(response.inResponseTo == attributeQuery.id)
107       
108        now = datetime.utcnow()
109        self.assert_(response.issueInstant < now)
110        self.assert_(response.assertions[-1].issueInstant < now)       
111        self.assert_(response.assertions[-1].conditions.notBefore < now) 
112        self.assert_(response.assertions[-1].conditions.notOnOrAfter > now)
113         
114        samlResponseElem = ResponseElementTree.toXML(response)
115       
116        print("SAML Response ...")
117        print(ElementTree.tostring(samlResponseElem))
118        print("Pretty print SAML Response ...")
119        print(prettyPrint(samlResponseElem))
120             
121    def test02AttributeQueryInvalidIssuer(self):
122        _cfg = self.cfg['test02AttributeQueryInvalidIssuer']
123       
124        attributeQuery = AttributeQuery()
125        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
126        attributeQuery.id = str(uuid4())
127        attributeQuery.issueInstant = datetime.utcnow()
128       
129        attributeQuery.issuer = Issuer()
130        attributeQuery.issuer.format = Issuer.X509_SUBJECT
131        attributeQuery.issuer.value = "/O=Invalid Site/CN=PDP"   
132                       
133        attributeQuery.subject = Subject() 
134        attributeQuery.subject.nameID = NameID()
135        attributeQuery.subject.nameID.format = ESGFSamlNamespaces.NAMEID_FORMAT
136        attributeQuery.subject.nameID.value = _cfg['subject']
137        xsStringNs = SAMLConstants.XSD_NS+"#"+\
138                                        XSStringAttributeValue.TYPE_LOCAL_NAME
139
140        siteAAttribute = Attribute()
141        siteAAttribute.name = _cfg['siteAttributeName']
142        siteAAttribute.nameFormat = xsStringNs
143       
144        attributeQuery.attributes.append(siteAAttribute) 
145
146        binding = SOAPBinding()
147        binding.serialise = AttributeQueryElementTree.toXML
148        binding.deserialise = ResponseElementTree.fromXML
149        response = binding.send(attributeQuery, _cfg['uri'])
150
151        samlResponseElem = ResponseElementTree.toXML(response)
152       
153        print("SAML Response ...")
154        print(ElementTree.tostring(samlResponseElem))
155        print("Pretty print SAML Response ...")
156        print(prettyPrint(samlResponseElem))
157       
158        self.assert_(
159            response.status.statusCode.value==StatusCode.REQUEST_DENIED_URI)
160                   
161    def test03AttributeQueryUnknownSubject(self):
162        _cfg = self.cfg['test03AttributeQueryUnknownSubject']
163       
164        attributeQuery = AttributeQuery()
165        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
166        attributeQuery.id = str(uuid4())
167        attributeQuery.issueInstant = datetime.utcnow()
168       
169        attributeQuery.issuer = Issuer()
170        attributeQuery.issuer.format = Issuer.X509_SUBJECT
171        attributeQuery.issuer.value = "/CN=Authorisation Service/O=Site A"   
172                       
173        attributeQuery.subject = Subject() 
174        attributeQuery.subject.nameID = NameID()
175        attributeQuery.subject.nameID.format = ESGFSamlNamespaces.NAMEID_FORMAT
176        attributeQuery.subject.nameID.value = _cfg['subject']
177        xsStringNs = SAMLConstants.XSD_NS+"#"+\
178                                        XSStringAttributeValue.TYPE_LOCAL_NAME
179
180        siteAAttribute = Attribute()
181        siteAAttribute.name = _cfg['siteAttributeName']
182        siteAAttribute.nameFormat = xsStringNs
183       
184        attributeQuery.attributes.append(siteAAttribute) 
185
186        binding = SOAPBinding()
187        binding.serialise = AttributeQueryElementTree.toXML
188        binding.deserialise = ResponseElementTree.fromXML
189        response = binding.send(attributeQuery, _cfg['uri'])
190       
191        samlResponseElem = ResponseElementTree.toXML(response)
192        print("SAML Response ...")
193        print(ElementTree.tostring(samlResponseElem))
194        print("Pretty print SAML Response ...")
195        print(prettyPrint(samlResponseElem))
196       
197        self.assert_(
198            response.status.statusCode.value==StatusCode.UNKNOWN_PRINCIPAL_URI)
199             
200    def test04AttributeQueryInvalidAttrName(self):
201        thisSection = 'test04AttributeQueryInvalidAttrName'
202        _cfg = self.cfg[thisSection]
203       
204        attributeQuery = AttributeQuery()
205        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
206        attributeQuery.id = str(uuid4())
207        attributeQuery.issueInstant = datetime.utcnow()
208       
209        attributeQuery.issuer = Issuer()
210        attributeQuery.issuer.format = Issuer.X509_SUBJECT
211        attributeQuery.issuer.value = "/CN=Authorisation Service/O=Site A"   
212                       
213        attributeQuery.subject = Subject() 
214        attributeQuery.subject.nameID = NameID()
215        attributeQuery.subject.nameID.format = ESGFSamlNamespaces.NAMEID_FORMAT
216        attributeQuery.subject.nameID.value = _cfg['subject']
217        xsStringNs = SAMLConstants.XSD_NS+"#"+\
218                                        XSStringAttributeValue.TYPE_LOCAL_NAME
219
220        invalidAttribute = Attribute()
221        invalidAttribute.name = "myInvalidAttributeName"
222        invalidAttribute.nameFormat = xsStringNs
223       
224        attributeQuery.attributes.append(invalidAttribute) 
225
226        binding = SOAPBinding.fromConfig(
227                     AttributeAuthoritySAMLInterfaceTestCase.CONFIG_FILEPATH, 
228                     prefix='saml.', 
229                     section=thisSection)
230        response = binding.send(attributeQuery, _cfg['uri'])
231       
232        samlResponseElem = ResponseElementTree.toXML(response)
233       
234        print("SAML Response ...")
235        print(ElementTree.tostring(samlResponseElem))
236        print("Pretty print SAML Response ...")
237        print(prettyPrint(samlResponseElem))
238       
239        self.assert_(response.status.statusCode.value==\
240                     StatusCode.INVALID_ATTR_NAME_VALUE_URI)
241             
242    def test05AttributeQueryWithESGFAttributeType(self):
243        # Test interface with custom ESGF Group/Role attribute type
244        thisSection = 'test05AttributeQueryWithESGFAttributeType'
245        _cfg = self.cfg[thisSection]
246       
247        attributeQuery = AttributeQuery()
248        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
249        attributeQuery.id = str(uuid4())
250        attributeQuery.issueInstant = datetime.utcnow()
251       
252        attributeQuery.issuer = Issuer()
253        attributeQuery.issuer.format = Issuer.X509_SUBJECT
254        attributeQuery.issuer.value = "/CN=Authorisation Service/O=Site A"   
255                       
256        attributeQuery.subject = Subject() 
257        attributeQuery.subject.nameID = NameID()
258        attributeQuery.subject.nameID.format = ESGFSamlNamespaces.NAMEID_FORMAT
259        attributeQuery.subject.nameID.value = _cfg['subject']
260       
261        groupRoleAttribute = Attribute()
262        groupRoleAttribute.name = self.__class__.ATTRIBUTE_NAMES[-1]
263        groupRoleAttribute.nameFormat = \
264            ESGFGroupRoleAttributeValue.TYPE_LOCAL_NAME
265       
266        attributeQuery.attributes.append(groupRoleAttribute) 
267
268        binding = SOAPBinding.fromConfig(
269                     AttributeAuthoritySAMLInterfaceTestCase.CONFIG_FILEPATH, 
270                     prefix='saml.',
271                     section=thisSection)
272       
273        response = binding.send(attributeQuery, _cfg['uri'])
274       
275        samlResponseElem = ResponseElementTree.toXML(response)
276       
277        print("SAML Response ...")
278        print(ElementTree.tostring(samlResponseElem))
279        print("Pretty print SAML Response ...")
280        print(prettyPrint(samlResponseElem))
281       
282        self.assert_(response.assertions[0].attributeStatements[0].attributes[0
283            ].attributeValues[0].value == ('siteagroup', 'default'))
284       
285        self.assert_(response.status.statusCode.value == StatusCode.SUCCESS_URI)
286       
287    def test06AttributeQuerySOAPBindingInterface(self):
288        _cfg = self.cfg['test06AttributeQuerySOAPBindingInterface']
289       
290        binding = AttributeQuerySOAPBinding()
291       
292        binding.subjectID = AttributeAuthoritySAMLInterfaceTestCase.OPENID_URI
293        binding.subjectIdFormat = ESGFSamlNamespaces.NAMEID_FORMAT
294        binding.issuerName = \
295            str(AttributeAuthoritySAMLInterfaceTestCase.VALID_REQUESTOR_IDS[0])
296        binding.issuerFormat = Issuer.X509_SUBJECT
297       
298        binding.queryAttributes = ESGFDefaultQueryAttributes.ATTRIBUTES
299       
300        response = binding.send(uri=_cfg['uri'])
301        samlResponseElem = ResponseElementTree.toXML(response)
302       
303        print("SAML Response ...")
304        print(ElementTree.tostring(samlResponseElem))
305        print("Pretty print SAML Response ...")
306        print(prettyPrint(samlResponseElem))
307       
308        self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI)
309
310    def test07AttributeQueryFromConfig(self):
311        thisSection = 'test07AttributeQueryFromConfig'
312        _cfg = self.cfg[thisSection]
313       
314        binding = AttributeQuerySOAPBinding.fromConfig(self.cfgFilePath, 
315                                                       section=thisSection,
316                                                       prefix='attributeQuery.')
317        binding.subjectID = _cfg['subject']
318        response = binding.send(uri=_cfg['uri'])
319        samlResponseElem = ResponseElementTree.toXML(response)
320       
321        print("SAML Response ...")
322        print(ElementTree.tostring(samlResponseElem))
323        print("Pretty print SAML Response ...")
324        print(prettyPrint(samlResponseElem))
325       
326        self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI)
327       
328    def test08AttributeQuerySslSOAPBindingInterface(self):
329        thisSection = 'test08AttributeQuerySslSOAPBindingInterface'
330        _cfg = self.cfg[thisSection]
331       
332        binding = AttributeQuerySslSOAPBinding.fromConfig(self.cfgFilePath, 
333                                                       section=thisSection,
334                                                       prefix='attributeQuery.')
335       
336        binding.subjectID = _cfg['subject']
337        response = binding.send(uri=_cfg['uri'])
338        samlResponseElem = ResponseElementTree.toXML(response)
339       
340        print("SAML Response ...")
341        print(ElementTree.tostring(samlResponseElem))
342        print("Pretty print SAML Response ...")
343        print(prettyPrint(samlResponseElem))
344       
345        self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI)
346
347       
348if __name__ == "__main__":
349    unittest.main()
Note: See TracBrowser for help on using the repository browser.