source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/wsgi/saml/test_soapattributeinterface.py @ 6069

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/wsgi/saml/test_soapattributeinterface.py@6069
Revision 6069, 9.2 KB checked in by pjkersha, 10 years ago (diff)

Re-release as rc1

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Unit tests for WSGI SAML 2.0 SOAP Attribute Query Interface
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "21/08/09"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id: $'
12import unittest
13import os
14from uuid import uuid4
15from datetime import datetime
16
17import paste.fixture
18from paste.deploy import loadapp
19
20from cStringIO import StringIO
21
22from saml.saml2.core import (Attribute, SAMLVersion, Subject, NameID, Issuer, 
23                             AttributeQuery, XSStringAttributeValue, 
24                             StatusCode)
25from saml.xml import XMLConstants
26from saml.xml.etree import AttributeQueryElementTree, ResponseElementTree
27
28from ndg.security.common.soap.etree import SOAPEnvelope
29from ndg.security.common.utils.etree import prettyPrint
30from ndg.security.common.saml_utils.esg import EsgSamlNamespaces
31 
32   
33class TestApp(object):
34    def __init__(self, global_conf, **app_conf):
35        pass
36   
37    def __call__(self, environ, start_response):
38        response = "404 Not Found"
39        start_response(response,
40                       [('Content-length', str(len(response))),
41                        ('Content-type', 'text/plain')])
42                           
43        return [response]
44
45
46class SOAPAttributeInterfaceMiddlewareTestCase(unittest.TestCase):
47   
48    def __init__(self, *args, **kwargs):
49        here_dir = os.path.dirname(os.path.abspath(__file__))
50        wsgiapp = loadapp('config:test.ini', relative_to=here_dir)
51        self.app = paste.fixture.TestApp(wsgiapp)
52         
53        unittest.TestCase.__init__(self, *args, **kwargs)
54
55    def _createAttributeQuery(self, 
56                        issuer="/O=Site A/CN=Authorisation Service",
57                        subject="https://openid.localhost/philip.kershaw"):
58        attributeQuery = AttributeQuery()
59        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
60        attributeQuery.id = str(uuid4())
61        attributeQuery.issueInstant = datetime.utcnow()
62       
63        attributeQuery.issuer = Issuer()
64        attributeQuery.issuer.format = Issuer.X509_SUBJECT
65        attributeQuery.issuer.value = issuer
66                       
67        attributeQuery.subject = Subject() 
68        attributeQuery.subject.nameID = NameID()
69        attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
70        attributeQuery.subject.nameID.value = subject
71                                   
72       
73        # special case handling for 'FirstName' attribute
74        fnAttribute = Attribute()
75        fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME
76        fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
77        fnAttribute.friendlyName = "FirstName"
78
79        attributeQuery.attributes.append(fnAttribute)
80   
81        # special case handling for 'LastName' attribute
82        lnAttribute = Attribute()
83        lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME
84        lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
85        lnAttribute.friendlyName = "LastName"
86
87        attributeQuery.attributes.append(lnAttribute)
88   
89        # special case handling for 'LastName' attribute
90        emailAddressAttribute = Attribute()
91        emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME
92        emailAddressAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\
93                                    XSStringAttributeValue.TYPE_LOCAL_NAME
94        emailAddressAttribute.friendlyName = "emailAddress"
95
96        attributeQuery.attributes.append(emailAddressAttribute) 
97
98        return attributeQuery
99   
100    def _makeRequest(self, attributeQuery=None, **kw):
101        """Convenience method to construct queries for tests"""
102       
103        if attributeQuery is None:
104            attributeQuery = self._createAttributeQuery(**kw)
105           
106        elem = AttributeQueryElementTree.toXML(attributeQuery)
107        soapRequest = SOAPEnvelope()
108        soapRequest.create()
109        soapRequest.body.elem.append(elem)
110       
111        request = soapRequest.serialize()
112       
113        return request
114   
115    def _getSAMLResponse(self, responseBody):
116        """Deserialise response string into ElementTree element"""
117        soapResponse = SOAPEnvelope()
118       
119        responseStream = StringIO()
120        responseStream.write(responseBody)
121        responseStream.seek(0)
122       
123        soapResponse.parse(responseStream)
124       
125        print("Parsed response ...")
126        print(soapResponse.serialize())
127#        print(prettyPrint(soapResponse.elem))
128       
129        response = ResponseElementTree.fromXML(soapResponse.body.elem[0])
130       
131        return response
132   
133    def test01ValidQuery(self):
134        attributeQuery = self._createAttributeQuery()
135        request = self._makeRequest(attributeQuery=attributeQuery)
136       
137        header = {
138            'soapAction': "http://www.oasis-open.org/committees/security",
139            'Content-length': str(len(request)),
140            'Content-type': 'text/xml'
141        }
142        response = self.app.post('/attributeauthority/saml', 
143                                 params=request, 
144                                 headers=header, 
145                                 status=200)
146        print("Response status=%d" % response.status)
147        samlResponse = self._getSAMLResponse(response.body)
148
149        self.assert_(samlResponse.status.statusCode.value == \
150                     StatusCode.SUCCESS_URI)
151        self.assert_(samlResponse.inResponseTo == attributeQuery.id)
152        self.assert_(samlResponse.assertions[0].subject.nameID.value == \
153                     attributeQuery.subject.nameID.value)
154
155    def test02AttributeReleaseDenied(self):
156        request = self._makeRequest(issuer="/O=Site B/CN=Authorisation Service")
157       
158        header = {
159            'soapAction': "http://www.oasis-open.org/committees/security",
160            'Content-length': str(len(request)),
161            'Content-type': 'text/xml'
162        }
163       
164        response = self.app.post('/attributeauthority/saml', 
165                                 params=request, 
166                                 headers=header, 
167                                 status=200)
168       
169        print("Response status=%d" % response.status)
170       
171        samlResponse = self._getSAMLResponse(response.body)
172
173        self.assert_(samlResponse.status.statusCode.value == \
174                     StatusCode.INVALID_ATTR_NAME_VALUE_URI)
175
176    def test03InvalidAttributesRequested(self):
177        attributeQuery = self._createAttributeQuery()
178       
179        # Add an unsupported Attribute name
180        attribute = Attribute()
181        attribute.name = "urn:my:attribute"
182        attribute.nameFormat = XMLConstants.XSD_NS+"#"+\
183                                    XSStringAttributeValue.TYPE_LOCAL_NAME
184        attribute.friendlyName = "myAttribute"
185        attributeQuery.attributes.append(attribute)     
186       
187        request = self._makeRequest(attributeQuery=attributeQuery)
188           
189        header = {
190            'soapAction': "http://www.oasis-open.org/committees/security",
191            'Content-length': str(len(request)),
192            'Content-type': 'text/xml'
193        }
194       
195        response = self.app.post('/attributeauthority/saml', 
196                                 params=request, 
197                                 headers=header, 
198                                 status=200)
199       
200        print("Response status=%d" % response.status)
201       
202        samlResponse = self._getSAMLResponse(response.body)
203
204        self.assert_(samlResponse.status.statusCode.value == \
205                     StatusCode.INVALID_ATTR_NAME_VALUE_URI)
206       
207    def test04InvalidIssuer(self):
208        request = self._makeRequest(issuer="My Attribute Query Issuer")
209       
210        header = {
211            'soapAction': "http://www.oasis-open.org/committees/security",
212            'Content-length': str(len(request)),
213            'Content-type': 'text/xml'
214        }
215       
216        response = self.app.post('/attributeauthority/saml', 
217                                 params=request, 
218                                 headers=header, 
219                                 status=200)
220       
221        print("Response status=%d" % response.status)
222       
223        samlResponse = self._getSAMLResponse(response.body)
224
225        self.assert_(samlResponse.status.statusCode.value == \
226                     StatusCode.REQUEST_DENIED_URI)
227
228    def test05UnknownPrincipal(self):
229        request = self._makeRequest(subject="Joe.Bloggs")
230       
231        header = {
232            'soapAction': "http://www.oasis-open.org/committees/security",
233            'Content-length': str(len(request)),
234            'Content-type': 'text/xml'
235        }
236       
237        response = self.app.post('/attributeauthority/saml', 
238                                 params=request, 
239                                 headers=header, 
240                                 status=200)
241       
242        print("Response status=%d" % response.status)
243       
244        samlResponse = self._getSAMLResponse(response.body)
245
246        self.assert_(samlResponse.status.statusCode.value == \
247                     StatusCode.UNKNOWN_PRINCIPAL_URI)
248           
249if __name__ == "__main__":
250    unittest.main()
Note: See TracBrowser for help on using the repository browser.