source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_samlattributeauthorityclient.py @ 7077

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_samlattributeauthorityclient.py@7077
Revision 7077, 12.8 KB checked in by pjkersha, 9 years ago (diff)
  • Property svn:keywords set to Id
1#!/usr/bin/env python
2"""NDG Attribute Authority SAML SOAP Binding client unit tests
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/02/10 (moved from test_attributeauthorityclient)"
8__copyright__ = "(C) 2010 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import logging
13logging.basicConfig(level=logging.DEBUG)
14import os
15from datetime import datetime
16from uuid import uuid4
17from xml.etree import ElementTree
18
19from ndg.saml.common import SAMLVersion
20from ndg.saml.common.xml import SAMLConstants
21from ndg.saml.xml.etree import AttributeQueryElementTree, ResponseElementTree
22from ndg.saml.saml2.core import (Subject, Issuer, Attribute, NameID, 
23                                 AttributeQuery, StatusCode, 
24                                 XSStringAttributeValue)
25
26from ndg.security.common.saml_utils.binding.soap import SOAPBinding
27from ndg.security.common.saml_utils.binding.soap.attributequery import (
28                                        AttributeQuerySOAPBinding, 
29                                        AttributeQuerySslSOAPBinding)
30from ndg.security.common.saml_utils.esg import (EsgSamlNamespaces,
31                                                EsgDefaultQueryAttributes)
32from ndg.security.test.unit.attributeauthorityclient import \
33                                        AttributeAuthorityClientBaseTestCase
34from ndg.security.common.utils.etree import prettyPrint
35
36   
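class AttributeAuthoritySAMLInterfaceTestCase(
                                        AttributeAuthorityClientBaseTestCase):
    """NDG Attribute Authority SAML SOAP Binding client unit tests

    Each test sends a SAML 2.0 attribute query to the Site A Attribute
    Authority started in the constructor and checks the status code of the
    response.  Per-test settings (service URI, subject, attribute names) are
    read from the section of ``self.cfg`` named after the test method.
    """
    HERE_DIR = os.path.dirname(__file__)
    CONFIG_FILENAME = 'test_samlattributeauthorityclient.cfg'
    CONFIG_FILEPATH = os.path.join(HERE_DIR, CONFIG_FILENAME)

    # Issuer DN used by the tests that expect the service to accept the query
    _SITE_A_ISSUER_DN = "/CN=Authorisation Service/O=Site A"

    def __init__(self, *arg, **kw):
        """Initialise the base test case and start the test services"""
        super(AttributeAuthoritySAMLInterfaceTestCase, self).__init__(*arg,
                                                                      **kw)

        # Run same config but on two different ports - one HTTP and one HTTPS
        self.startSiteAAttributeAuthority()
        self.startSiteAAttributeAuthority(withSSL=True, port=5443)

    def _newAttributeQuery(self, issuerName, subject):
        """Create a SAML 2.0 attribute query with no attributes requested

        @param issuerName: X.509 subject DN to place in the query issuer
        @param subject: identifier to place in the subject name ID
        @return: query with a fresh random ID and the current UTC time as
        its issue instant
        """
        query = AttributeQuery()
        query.version = SAMLVersion(SAMLVersion.VERSION_20)
        query.id = str(uuid4())
        query.issueInstant = datetime.utcnow()

        query.issuer = Issuer()
        query.issuer.format = Issuer.X509_SUBJECT
        query.issuer.value = issuerName

        query.subject = Subject()
        query.subject.nameID = NameID()
        query.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
        query.subject.nameID.value = subject

        return query

    def _appendStringAttribute(self, query, name, friendlyName=None):
        """Add a request for an xs:string typed attribute to the query

        @param query: attribute query to extend
        @param name: name of the attribute to request
        @param friendlyName: optional friendly name; left unset when None
        """
        attribute = Attribute()
        attribute.name = name
        attribute.nameFormat = (SAMLConstants.XSD_NS + "#" +
                                XSStringAttributeValue.TYPE_LOCAL_NAME)
        if friendlyName is not None:
            attribute.friendlyName = friendlyName

        query.attributes.append(attribute)

    def _sendOverSoap(self, query, uri):
        """Send the query with a SOAPBinding set up in code rather than from
        the config file and return the parsed SAML response

        @param query: attribute query to send
        @param uri: endpoint of the Attribute Authority
        """
        binding = SOAPBinding()
        binding.serialise = AttributeQueryElementTree.toXML
        binding.deserialise = ResponseElementTree.fromXML
        return binding.send(query, uri)

    def _dumpResponse(self, response):
        """Print the SAML response as raw and as pretty-printed XML

        The output contains whatever attribute values the service released.
        """
        samlResponseElem = ResponseElementTree.toXML(response)

        print("SAML Response ...")
        print(ElementTree.tostring(samlResponseElem))
        print("Pretty print SAML Response ...")
        print(prettyPrint(samlResponseElem))

    def test01AttributeQuery(self):
        """Query name, e-mail and site attributes and expect success"""
        _cfg = self.cfg['test01AttributeQuery']

        attributeQuery = self._newAttributeQuery(self._SITE_A_ISSUER_DN,
                                                 _cfg['subject'])
        self._appendStringAttribute(attributeQuery,
                                    EsgSamlNamespaces.FIRSTNAME_ATTRNAME,
                                    friendlyName="FirstName")
        self._appendStringAttribute(attributeQuery,
                                    EsgSamlNamespaces.LASTNAME_ATTRNAME,
                                    friendlyName="LastName")
        self._appendStringAttribute(attributeQuery,
                                    EsgSamlNamespaces.EMAILADDRESS_ATTRNAME,
                                    friendlyName="emailAddress")
        self._appendStringAttribute(attributeQuery, _cfg['siteAttributeName'])

        response = self._sendOverSoap(attributeQuery, _cfg['uri'])

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)

        # Check Query ID matches the query ID the service received: this ties
        # the response to the request that was actually sent
        self.assertEqual(response.inResponseTo, attributeQuery.id)

        # Naive UTC timestamps are compared, so these checks rely on the
        # client and service clocks agreeing.  Only the last assertion in the
        # response is inspected.
        # NOTE(review): the assertion issuer and subject are not compared
        # with the query - consider adding those checks.
        now = datetime.utcnow()
        lastAssertion = response.assertions[-1]
        self.assertTrue(response.issueInstant < now)
        self.assertTrue(lastAssertion.issueInstant < now)
        self.assertTrue(lastAssertion.conditions.notBefore < now)
        self.assertTrue(lastAssertion.conditions.notOnOrAfter > now)

        self._dumpResponse(response)

    def test02AttributeQueryInvalidIssuer(self):
        """Query with an issuer DN the service should not accept and expect
        a request denied status"""
        _cfg = self.cfg['test02AttributeQueryInvalidIssuer']

        # The issuer DN is a value the client writes into the query; nothing
        # visible in this test authenticates it, so what is exercised here
        # is the service's handling of an issuer name it does not recognise.
        attributeQuery = self._newAttributeQuery("/O=Invalid Site/CN=PDP",
                                                 _cfg['subject'])
        self._appendStringAttribute(attributeQuery, _cfg['siteAttributeName'])

        response = self._sendOverSoap(attributeQuery, _cfg['uri'])

        self._dumpResponse(response)

        # NOTE(review): only the status code is checked.  Also asserting that
        # the response carries no assertions would confirm that a denied
        # requester receives no attribute data - confirm the service's
        # behaviour before adding.
        self.assertEqual(response.status.statusCode.value,
                         StatusCode.REQUEST_DENIED_URI)

    def test03AttributeQueryUnknownSubject(self):
        """Query for the subject configured for this test and expect an
        unknown principal status"""
        _cfg = self.cfg['test03AttributeQueryUnknownSubject']

        attributeQuery = self._newAttributeQuery(self._SITE_A_ISSUER_DN,
                                                 _cfg['subject'])
        self._appendStringAttribute(attributeQuery, _cfg['siteAttributeName'])

        response = self._sendOverSoap(attributeQuery, _cfg['uri'])

        self._dumpResponse(response)

        # NOTE(review): this status tells the requester that the subject is
        # not known.  No test here combines an unrecognised issuer with an
        # unknown subject; such a case would confirm that the issuer check
        # is applied first.
        self.assertEqual(response.status.statusCode.value,
                         StatusCode.UNKNOWN_PRINCIPAL_URI)

    def test04AttributeQueryInvalidAttrName(self):
        """Query for an attribute name that is not a configured one and
        expect an invalid attribute name/value status"""
        thisSection = 'test04AttributeQueryInvalidAttrName'
        _cfg = self.cfg[thisSection]

        attributeQuery = self._newAttributeQuery(self._SITE_A_ISSUER_DN,
                                                 _cfg['subject'])
        self._appendStringAttribute(attributeQuery, "myInvalidAttributeName")

        # Unlike the earlier tests, the binding settings come from the
        # 'saml.' prefixed options of this test's config file section
        binding = SOAPBinding.fromConfig(
                     AttributeAuthoritySAMLInterfaceTestCase.CONFIG_FILEPATH,
                     prefix='saml.',
                     section=thisSection)
        response = binding.send(attributeQuery, _cfg['uri'])

        self._dumpResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.INVALID_ATTR_NAME_VALUE_URI)

    def test05AttributeQuerySOAPBindingInterface(self):
        """Use AttributeQuerySOAPBinding, which builds the query from the
        properties set on the binding, and expect success"""
        _cfg = self.cfg['test05AttributeQuerySOAPBindingInterface']

        binding = AttributeQuerySOAPBinding()

        binding.subjectID = AttributeAuthoritySAMLInterfaceTestCase.OPENID_URI
        binding.subjectIdFormat = EsgSamlNamespaces.NAMEID_FORMAT
        binding.issuerName = \
            str(AttributeAuthoritySAMLInterfaceTestCase.VALID_REQUESTOR_IDS[0])
        binding.issuerFormat = Issuer.X509_SUBJECT

        binding.queryAttributes = EsgDefaultQueryAttributes.ATTRIBUTES

        response = binding.send(uri=_cfg['uri'])

        self._dumpResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)

    def test06AttributeQueryFromConfig(self):
        """Create AttributeQuerySOAPBinding from the 'attributeQuery.'
        prefixed options of the config file and expect success"""
        thisSection = 'test06AttributeQueryFromConfig'
        _cfg = self.cfg[thisSection]

        binding = AttributeQuerySOAPBinding.fromConfig(self.cfgFilePath,
                                                       section=thisSection,
                                                       prefix='attributeQuery.')
        binding.subjectID = _cfg['subject']
        response = binding.send(uri=_cfg['uri'])

        self._dumpResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)

    def test07AttributeQuerySslSOAPBindingInterface(self):
        """Create AttributeQuerySslSOAPBinding from the config file and
        expect success"""
        thisSection = 'test07AttributeQuerySslSOAPBindingInterface'
        _cfg = self.cfg[thisSection]

        # NOTE(review): the SSL settings applied to this binding come from
        # the config file section and are not visible here - confirm that
        # the section enables verification of the service's certificate so
        # that this test covers an authenticated connection.
        binding = AttributeQuerySslSOAPBinding.fromConfig(self.cfgFilePath,
                                                       section=thisSection,
                                                       prefix='attributeQuery.')

        binding.subjectID = _cfg['subject']
        response = binding.send(uri=_cfg['uri'])

        self._dumpResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)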
300
301       
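if __name__ == "__main__":
    # unittest is not imported at the top of this module, so running the file
    # as a script would fail with a NameError without this import
    import unittest
    unittest.main()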