source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/wsgi/saml/test_soapattributeinterface.py @ 5791

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/wsgi/saml/test_soapattributeinterface.py@5791
Revision 5791, 9.2 KB checked in by pjkersha, 12 years ago (diff)

Updates and fix for new ndg3beta release:

  • Change Attribute Authority SAML interface issuer format to saml.saml2.core.Issuer.X509_SUBJECT as agreed with Luca for ESG. Updated unit tests.
  • Fix Attribute Authority clockSkew attribute initialisation - create as a deltatime not a float.
  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Unit tests for WSGI SAML 2.0 SOAP Attribute Query Interface
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "21/08/09"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import logging
13
14import unittest
15import os
16from uuid import uuid4
17from datetime import datetime
18
19import paste.fixture
20from paste.deploy import loadapp
21
22from cStringIO import StringIO
23from xml.etree import ElementTree
24
25from ndg.security.common.soap.etree import SOAPEnvelope
26from ndg.security.common.utils.etree import prettyPrint
27
28from saml.saml2.core import Response, Assertion, Attribute, AttributeValue, \
29    AttributeStatement, SAMLVersion, Subject, NameID, Issuer, AttributeQuery, \
30    XSStringAttributeValue, XSGroupRoleAttributeValue, Conditions, Status, \
31    StatusCode
32from saml.xml import XMLConstants
33from saml.xml.etree import AssertionElementTree, AttributeQueryElementTree, \
34    ResponseElementTree, XSGroupRoleAttributeValueElementTree
35 
36   
37class TestApp(object):
38    def __init__(self, global_conf, **app_conf):
39        pass
40   
41    def __call__(self, environ, start_response):
42        response = "404 Not Found"
43        start_response(response,
44                       [('Content-length', str(len(response))),
45                        ('Content-type', 'text/plain')])
46                           
47        return [response]
48
49
50class SOAPAttributeInterfaceMiddlewareTestCase(unittest.TestCase):
51   
52    def __init__(self, *args, **kwargs):
53        here_dir = os.path.dirname(os.path.abspath(__file__))
54        wsgiapp = loadapp('config:test.ini', relative_to=here_dir)
55        self.app = paste.fixture.TestApp(wsgiapp)
56         
57        unittest.TestCase.__init__(self, *args, **kwargs)
58
59    def _createAttributeQuery(self, 
60                        issuer="Site A",
61                        subject="https://openid.localhost/philip.kershaw"):
62        attributeQuery = AttributeQuery()
63        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
64        attributeQuery.id = str(uuid4())
65        attributeQuery.issueInstant = datetime.utcnow()
66       
67        attributeQuery.issuer = Issuer()
68        attributeQuery.issuer.format = Issuer.X509_SUBJECT
69        attributeQuery.issuer.value = issuer
70                       
71        attributeQuery.subject = Subject() 
72        attributeQuery.subject.nameID = NameID()
73        attributeQuery.subject.nameID.format = "urn:esg:openid"
74        attributeQuery.subject.nameID.value = subject
75                                   
76       
77        # special case handling for 'FirstName' attribute
78        fnAttribute = Attribute()
79        fnAttribute.name = "urn:esg:first:name"
80        fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
81        fnAttribute.friendlyName = "FirstName"
82
83        attributeQuery.attributes.append(fnAttribute)
84   
85        # special case handling for 'LastName' attribute
86        lnAttribute = Attribute()
87        lnAttribute.name = "urn:esg:last:name"
88        lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
89        lnAttribute.friendlyName = "LastName"
90
91        attributeQuery.attributes.append(lnAttribute)
92   
93        # special case handling for 'LastName' attribute
94        emailAddressAttribute = Attribute()
95        emailAddressAttribute.name = "urn:esg:email:address"
96        emailAddressAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\
97                                    XSStringAttributeValue.TYPE_LOCAL_NAME
98        emailAddressAttribute.friendlyName = "emailAddress"
99
100        attributeQuery.attributes.append(emailAddressAttribute) 
101
102        return attributeQuery
103   
104    def _makeRequest(self, attributeQuery=None, **kw):
105        """Convenience method to construct queries for tests"""
106       
107        if attributeQuery is None:
108            attributeQuery = self._createAttributeQuery(**kw)
109           
110        elem = AttributeQueryElementTree.toXML(attributeQuery)
111        soapRequest = SOAPEnvelope()
112        soapRequest.create()
113        soapRequest.body.elem.append(elem)
114       
115        request = soapRequest.serialize()
116       
117        return request
118   
119    def _getSAMLResponse(self, responseBody):
120        """Deserialise response string into ElementTree element"""
121        soapResponse = SOAPEnvelope()
122       
123        responseStream = StringIO()
124        responseStream.write(responseBody)
125        responseStream.seek(0)
126       
127        soapResponse.parse(responseStream)
128       
129        print("Parsed response ...")
130        print(soapResponse.serialize())
131#        print(prettyPrint(soapResponse.elem))
132       
133        response = ResponseElementTree.fromXML(soapResponse.body.elem[0])
134       
135        return response
136   
137    def test01ValidQuery(self):
138        attributeQuery = self._createAttributeQuery()
139        request = self._makeRequest(attributeQuery=attributeQuery)
140       
141        header = {
142            'soapAction': "http://www.oasis-open.org/committees/security",
143            'Content-length': str(len(request)),
144            'Content-type': 'text/xml'
145        }
146        response = self.app.post('/attributeauthority/saml', 
147                                 params=request, 
148                                 headers=header, 
149                                 status=200)
150        print("Response status=%d" % response.status)
151        samlResponse = self._getSAMLResponse(response.body)
152
153        self.assert_(samlResponse.status.statusCode.value == \
154                     StatusCode.SUCCESS_URI)
155        self.assert_(samlResponse.inResponseTo == attributeQuery.id)
156        self.assert_(samlResponse.assertions[0].subject.nameID.value == \
157                     attributeQuery.subject.nameID.value)
158
159    def test02AttributeReleaseDenied(self):
160        request = self._makeRequest(issuer="Site B")
161       
162        header = {
163            'soapAction': "http://www.oasis-open.org/committees/security",
164            'Content-length': str(len(request)),
165            'Content-type': 'text/xml'
166        }
167       
168        response = self.app.post('/attributeauthority/saml', 
169                                 params=request, 
170                                 headers=header, 
171                                 status=200)
172       
173        print("Response status=%d" % response.status)
174       
175        samlResponse = self._getSAMLResponse(response.body)
176
177        self.assert_(samlResponse.status.statusCode.value == \
178                     StatusCode.INVALID_ATTR_NAME_VALUE_URI)
179
180    def test03InvalidAttributesRequested(self):
181        attributeQuery = self._createAttributeQuery()
182       
183        # Add an unsupported Attribute name
184        attribute = Attribute()
185        attribute.name = "urn:my:attribute"
186        attribute.nameFormat = XMLConstants.XSD_NS+"#"+\
187                                    XSStringAttributeValue.TYPE_LOCAL_NAME
188        attribute.friendlyName = "myAttribute"
189        attributeQuery.attributes.append(attribute)     
190       
191        request = self._makeRequest(attributeQuery=attributeQuery)
192           
193        header = {
194            'soapAction': "http://www.oasis-open.org/committees/security",
195            'Content-length': str(len(request)),
196            'Content-type': 'text/xml'
197        }
198       
199        response = self.app.post('/attributeauthority/saml', 
200                                 params=request, 
201                                 headers=header, 
202                                 status=200)
203       
204        print("Response status=%d" % response.status)
205       
206        samlResponse = self._getSAMLResponse(response.body)
207
208        self.assert_(samlResponse.status.statusCode.value == \
209                     StatusCode.INVALID_ATTR_NAME_VALUE_URI)
210       
211    def test04InvalidIssuer(self):
212        request = self._makeRequest(issuer="My Attribute Query Issuer")
213       
214        header = {
215            'soapAction': "http://www.oasis-open.org/committees/security",
216            'Content-length': str(len(request)),
217            'Content-type': 'text/xml'
218        }
219       
220        response = self.app.post('/attributeauthority/saml', 
221                                 params=request, 
222                                 headers=header, 
223                                 status=200)
224       
225        print("Response status=%d" % response.status)
226       
227        samlResponse = self._getSAMLResponse(response.body)
228
229        self.assert_(samlResponse.status.statusCode.value == \
230                     StatusCode.REQUEST_DENIED_URI)
231
232    def test05UnknownPrincipal(self):
233        request = self._makeRequest(subject="Joe.Bloggs")
234       
235        header = {
236            'soapAction': "http://www.oasis-open.org/committees/security",
237            'Content-length': str(len(request)),
238            'Content-type': 'text/xml'
239        }
240       
241        response = self.app.post('/attributeauthority/saml', 
242                                 params=request, 
243                                 headers=header, 
244                                 status=200)
245       
246        print("Response status=%d" % response.status)
247       
248        samlResponse = self._getSAMLResponse(response.body)
249
250        self.assert_(samlResponse.status.statusCode.value == \
251                     StatusCode.UNKNOWN_PRINCIPAL_URI)
252           
253if __name__ == "__main__":
254    unittest.main()
Note: See TracBrowser for help on using the repository browser.