source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_samlattributeauthorityclient.py @ 6571

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_samlattributeauthorityclient.py@6571
Revision 6571, 11.9 KB checked in by pjkersha, 11 years ago (diff)

Refactored SAML SOAP Binding unit test class into a separate module test_samlattributeauthorityclient

#!/usr/bin/env python
"""NDG Attribute Authority SAML SOAP Binding client unit tests

Sends SAML 2.0 attribute queries over the SOAP binding to a test Attribute
Authority and checks the status code returned for the success case and for
an unrecognised issuer, an unknown subject and an invalid attribute name.

NERC DataGrid Project
"""
__author__ = "P J Kershaw"
__date__ = "15/02/10 (moved from test_attributeauthorityclient)"
__copyright__ = "(C) 2010 Science and Technology Facilities Council"
__license__ = "BSD - see LICENSE file in top-level directory"
__contact__ = "Philip.Kershaw@stfc.ac.uk"
__revision__ = '$Id: $'
import logging
logging.basicConfig(level=logging.DEBUG)
import unittest
from datetime import datetime
from uuid import uuid4
from xml.etree import ElementTree

from saml.common import SAMLVersion
from saml.common.xml import SAMLConstants
from saml.xml.etree import AttributeQueryElementTree, ResponseElementTree
from saml.saml2.core import (Subject, Issuer, Attribute, NameID, AttributeQuery,
                             StatusCode, XSStringAttributeValue, )

from ndg.security.common.saml_utils.binding.soap import SOAPBinding
from ndg.security.common.saml_utils.binding.soap.attributequery import (
                                        AttributeQuerySOAPBinding,
                                        AttributeQuerySslSOAPBinding)
from ndg.security.common.saml_utils.esg import EsgSamlNamespaces
from ndg.security.test.unit.attributeauthorityclient import \
                                        AttributeAuthorityClientBaseTestCase
from ndg.security.common.utils.etree import prettyPrint


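class AttributeAuthoritySAMLInterfaceTestCase(
                                        AttributeAuthorityClientBaseTestCase):
    """NDG Attribute Authority SAML SOAP Binding client unit tests

    Every test sends a SAML 2.0 attribute query to the Site A Attribute
    Authority started in ``__init__`` and checks the status code of the
    response.  Per-test settings such as ``uri`` and ``subject`` are read from
    ``self.cfg``, keyed by the test method name (presumably loaded by the base
    class from ``CONFIG_FILENAME`` - confirm).

    NOTE(review): tests 01-04 build the query by hand and nothing in them
    signs it, so the issuer value is whatever the test writes into the query.
    Whether the service can bind that issuer to an authenticated peer depends
    on the binding and configuration, which are not visible in this file.
    """
    CONFIG_FILENAME = 'test_samlattributeauthorityclient.cfg'

    def __init__(self, *arg, **kw):
        """Initialise the base test case and start the Site A service.

        unittest creates one instance of this class per test method, so this
        runs once for each test.  startSiteAAttributeAuthority is defined in
        the base class - presumably it tolerates repeated calls on the same
        port; TODO confirm.
        """
        super(AttributeAuthoritySAMLInterfaceTestCase, self).__init__(*arg,
                                                                      **kw)
        self.startSiteAAttributeAuthority(withSSL=True, port=5443)

    def _createAttributeQuery(self, issuerDN, subject):
        """Return an AttributeQuery with issuer and subject set.

        The query gets a fresh random ID and the current UTC time as its issue
        instant; no attributes are added.

        @param issuerDN: value for the X.509 subject format Issuer element
        @param subject: value for the subject NameID
        """
        query = AttributeQuery()
        query.version = SAMLVersion(SAMLVersion.VERSION_20)
        query.id = str(uuid4())
        query.issueInstant = datetime.utcnow()

        query.issuer = Issuer()
        query.issuer.format = Issuer.X509_SUBJECT
        query.issuer.value = issuerDN

        query.subject = Subject()
        query.subject.nameID = NameID()
        query.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
        query.subject.nameID.value = subject
        return query

    @staticmethod
    def _createStringAttribute(name, friendlyName=None):
        """Return an Attribute whose name format is the xs:string type URI.

        @param name: attribute name to request
        @param friendlyName: optional friendly name; left unset when None
        """
        attribute = Attribute()
        attribute.name = name
        attribute.nameFormat = (SAMLConstants.XSD_NS + "#" +
                                XSStringAttributeValue.TYPE_LOCAL_NAME)
        if friendlyName is not None:
            attribute.friendlyName = friendlyName
        return attribute

    def _printResponse(self, response):
        """Print the SAML response as raw and as pretty-printed XML."""
        samlResponseElem = ResponseElementTree.toXML(response)

        print("SAML Response ...")
        print(ElementTree.tostring(samlResponseElem))
        print("Pretty print SAML Response ...")
        print(prettyPrint(samlResponseElem))

    def test01SAMLAttributeQuery(self):
        """Query name, e-mail and site attributes and expect success."""
        _cfg = self.cfg['test01SAMLAttributeQuery']

        attributeQuery = self._createAttributeQuery(
                                        "/CN=Authorisation Service/O=Site A",
                                        _cfg['subject'])

        attributeQuery.attributes.append(self._createStringAttribute(
                                        EsgSamlNamespaces.FIRSTNAME_ATTRNAME,
                                        "FirstName"))
        attributeQuery.attributes.append(self._createStringAttribute(
                                        EsgSamlNamespaces.LASTNAME_ATTRNAME,
                                        "LastName"))
        attributeQuery.attributes.append(self._createStringAttribute(
                                        EsgSamlNamespaces.EMAILADDRESS_ATTRNAME,
                                        "emailAddress"))
        attributeQuery.attributes.append(self._createStringAttribute(
                                        _cfg['siteAttributeName']))

        binding = SOAPBinding()
        binding.serialise = AttributeQueryElementTree.toXML
        binding.deserialise = ResponseElementTree.fromXML
        response = binding.send(attributeQuery, _cfg['uri'])

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)

        # The response must refer back to the ID of the query that was sent
        self.assertEqual(response.inResponseTo, attributeQuery.id)

        # The response and its last assertion must already be issued and the
        # assertion must be inside its validity window.
        # NOTE(review): only the last assertion is examined, and neither its
        # subject nor the returned attribute values are compared with the
        # query - consider asserting both.
        now = datetime.utcnow()
        self.assertTrue(response.issueInstant < now)
        self.assertTrue(response.assertions[-1].issueInstant < now)
        self.assertTrue(response.assertions[-1].conditions.notBefore < now)
        self.assertTrue(response.assertions[-1].conditions.notOnOrAfter > now)

        self._printResponse(response)

    def test02SAMLAttributeQueryInvalidIssuer(self):
        """A query from an issuer named "Invalid Site" must be denied."""
        _cfg = self.cfg['test02SAMLAttributeQueryInvalidIssuer']

        attributeQuery = self._createAttributeQuery("Invalid Site",
                                                    _cfg['subject'])
        attributeQuery.attributes.append(self._createStringAttribute(
                                        _cfg['siteAttributeName']))

        binding = SOAPBinding()
        response = binding.send(attributeQuery, _cfg['uri'])

        # Printed before the assertion so the response is visible on failure
        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.REQUEST_DENIED_URI)

    def test03SAMLAttributeQueryUnknownSubject(self):
        """The subject configured for this test must be reported unknown."""
        _cfg = self.cfg['test03SAMLAttributeQueryUnknownSubject']

        attributeQuery = self._createAttributeQuery(
                                        "/CN=Authorisation Service/O=Site A",
                                        _cfg['subject'])
        attributeQuery.attributes.append(self._createStringAttribute(
                                        _cfg['siteAttributeName']))

        binding = SOAPBinding()
        response = binding.send(attributeQuery, _cfg['uri'])

        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.UNKNOWN_PRINCIPAL_URI)

    def test04SAMLAttributeQueryInvalidAttrName(self):
        """A query for an unsupported attribute name must be rejected."""
        _cfg = self.cfg['test04SAMLAttributeQueryInvalidAttrName']

        attributeQuery = self._createAttributeQuery(
                                        "/CN=Authorisation Service/O=Site A",
                                        _cfg['subject'])
        attributeQuery.attributes.append(self._createStringAttribute(
                                        "myInvalidAttributeName"))

        binding = SOAPBinding()
        response = binding.send(attributeQuery, _cfg['uri'])

        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.INVALID_ATTR_NAME_VALUE_URI)

    def test05AttributeQuerySOAPBindingInterface(self):
        """Send the ESG default attributes through AttributeQuerySOAPBinding."""
        # The module's top-level imports do not include
        # EsgDefaultQueryAttributes, so it is imported here.
        # NOTE(review): assumed to live beside EsgSamlNamespaces - confirm.
        from ndg.security.common.saml_utils.esg import \
                                                EsgDefaultQueryAttributes

        _cfg = self.cfg['test05AttributeQuerySOAPBindingInterface']

        binding = AttributeQuerySOAPBinding()

        # Subject and requester identities come from the base test case
        binding.subjectID = AttributeAuthoritySAMLInterfaceTestCase.OPENID_URI
        binding.issuerDN = \
            AttributeAuthoritySAMLInterfaceTestCase.VALID_REQUESTOR_IDS[0]

        binding.queryAttributes = EsgDefaultQueryAttributes.ATTRIBUTES

        response = binding.send(uri=_cfg['uri'])
        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)

    def test06AttributeQueryFromConfig(self):
        """Build the binding from the config file and expect success."""
        thisSection = 'test06AttributeQueryFromConfig'
        _cfg = self.cfg[thisSection]

        binding = AttributeQuerySOAPBinding.fromConfig(self.cfgFilePath,
                                                       section=thisSection,
                                                       prefix='attributeQuery.')
        binding.subjectID = _cfg['subject']
        response = binding.send(uri=_cfg['uri'])
        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)

    def test07AttributeQuerySslSOAPBindingInterface(self):
        """Build the SSL binding from the config file and expect success.

        NOTE(review): this covers only the success path.  No test in this
        class presents an untrusted or mismatched server certificate, so a
        binding that skipped peer verification would still pass here; a
        negative test for certificate validation would close that gap.
        """
        thisSection = 'test07AttributeQuerySslSOAPBindingInterface'
        _cfg = self.cfg[thisSection]

        binding = AttributeQuerySslSOAPBinding.fromConfig(self.cfgFilePath,
                                                       section=thisSection,
                                                       prefix='attributeQuery.')

        binding.subjectID = _cfg['subject']
        response = binding.send(uri=_cfg['uri'])
        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)

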
if __name__ == "__main__":
    # Run every test in this module when the file is executed as a script
    unittest.main()