source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/saml/test_soapattributeinterface.py @ 7077

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/saml/test_soapattributeinterface.py@7077
Revision 7077, 8.6 KB checked in by pjkersha, 9 years ago (diff)
  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Unit tests for WSGI SAML 2.0 SOAP Attribute Query Interface
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "21/08/09"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import unittest
13from uuid import uuid4
14from datetime import datetime
15from cStringIO import StringIO
16
17from ndg.saml.saml2.core import (Attribute, SAMLVersion, Subject, NameID, Issuer, 
18                             AttributeQuery, XSStringAttributeValue, 
19                             StatusCode)
20from ndg.saml.xml import XMLConstants
21from ndg.saml.xml.etree import AttributeQueryElementTree, ResponseElementTree
22
23from ndg.security.common.soap.etree import SOAPEnvelope
24from ndg.security.common.saml_utils.esg import EsgSamlNamespaces
25from ndg.security.test.unit.wsgi.saml import SoapSamlInterfaceMiddlewareTestCase
26
27
28class SOAPAttributeInterfaceMiddlewareTestCase(
29                                        SoapSamlInterfaceMiddlewareTestCase):
30    CONFIG_FILENAME = 'attribute-interface.ini'
31   
32    def _createAttributeQuery(self, 
33                        issuer="/O=Site A/CN=Authorisation Service",
34                        subject="https://openid.localhost/philip.kershaw"):
35        attributeQuery = AttributeQuery()
36        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
37        attributeQuery.id = str(uuid4())
38        attributeQuery.issueInstant = datetime.utcnow()
39       
40        attributeQuery.issuer = Issuer()
41        attributeQuery.issuer.format = Issuer.X509_SUBJECT
42        attributeQuery.issuer.value = issuer
43                       
44        attributeQuery.subject = Subject() 
45        attributeQuery.subject.nameID = NameID()
46        attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
47        attributeQuery.subject.nameID.value = subject
48                                   
49       
50        # special case handling for 'FirstName' attribute
51        fnAttribute = Attribute()
52        fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME
53        fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
54        fnAttribute.friendlyName = "FirstName"
55
56        attributeQuery.attributes.append(fnAttribute)
57   
58        # special case handling for 'LastName' attribute
59        lnAttribute = Attribute()
60        lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME
61        lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
62        lnAttribute.friendlyName = "LastName"
63
64        attributeQuery.attributes.append(lnAttribute)
65   
66        # special case handling for 'LastName' attribute
67        emailAddressAttribute = Attribute()
68        emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME
69        emailAddressAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\
70                                    XSStringAttributeValue.TYPE_LOCAL_NAME
71        emailAddressAttribute.friendlyName = "emailAddress"
72
73        attributeQuery.attributes.append(emailAddressAttribute) 
74
75        return attributeQuery
76   
77    def _makeRequest(self, attributeQuery=None, **kw):
78        """Convenience method to construct queries for tests"""
79       
80        if attributeQuery is None:
81            attributeQuery = self._createAttributeQuery(**kw)
82           
83        elem = AttributeQueryElementTree.toXML(attributeQuery)
84        soapRequest = SOAPEnvelope()
85        soapRequest.create()
86        soapRequest.body.elem.append(elem)
87       
88        request = soapRequest.serialize()
89       
90        return request
91   
92    def _getSAMLResponse(self, responseBody):
93        """Deserialise response string into ElementTree element"""
94        soapResponse = SOAPEnvelope()
95       
96        responseStream = StringIO()
97        responseStream.write(responseBody)
98        responseStream.seek(0)
99       
100        soapResponse.parse(responseStream)
101       
102        print("Parsed response ...")
103        print(soapResponse.serialize())
104#        print(prettyPrint(soapResponse.elem))
105       
106        response = ResponseElementTree.fromXML(soapResponse.body.elem[0])
107       
108        return response
109   
110    def test01ValidQuery(self):
111        attributeQuery = self._createAttributeQuery()
112        request = self._makeRequest(attributeQuery=attributeQuery)
113       
114        header = {
115            'soapAction': "http://www.oasis-open.org/committees/security",
116            'Content-length': str(len(request)),
117            'Content-type': 'text/xml'
118        }
119        response = self.app.post('/attributeauthority/saml', 
120                                 params=request, 
121                                 headers=header, 
122                                 status=200)
123        print("Response status=%d" % response.status)
124        samlResponse = self._getSAMLResponse(response.body)
125
126        self.assert_(samlResponse.status.statusCode.value == \
127                     StatusCode.SUCCESS_URI)
128        self.assert_(samlResponse.inResponseTo == attributeQuery.id)
129        self.assert_(samlResponse.assertions[0].subject.nameID.value == \
130                     attributeQuery.subject.nameID.value)
131
132    def test02AttributeReleaseDenied(self):
133        request = self._makeRequest(issuer="/O=Site B/CN=Authorisation Service")
134       
135        header = {
136            'soapAction': "http://www.oasis-open.org/committees/security",
137            'Content-length': str(len(request)),
138            'Content-type': 'text/xml'
139        }
140       
141        response = self.app.post('/attributeauthority/saml', 
142                                 params=request, 
143                                 headers=header, 
144                                 status=200)
145       
146        print("Response status=%d" % response.status)
147       
148        samlResponse = self._getSAMLResponse(response.body)
149
150        self.assert_(samlResponse.status.statusCode.value == \
151                     StatusCode.INVALID_ATTR_NAME_VALUE_URI)
152
153    def test03InvalidAttributesRequested(self):
154        attributeQuery = self._createAttributeQuery()
155       
156        # Add an unsupported Attribute name
157        attribute = Attribute()
158        attribute.name = "urn:my:attribute"
159        attribute.nameFormat = XMLConstants.XSD_NS+"#"+\
160                                    XSStringAttributeValue.TYPE_LOCAL_NAME
161        attribute.friendlyName = "myAttribute"
162        attributeQuery.attributes.append(attribute)     
163       
164        request = self._makeRequest(attributeQuery=attributeQuery)
165           
166        header = {
167            'soapAction': "http://www.oasis-open.org/committees/security",
168            'Content-length': str(len(request)),
169            'Content-type': 'text/xml'
170        }
171       
172        response = self.app.post('/attributeauthority/saml', 
173                                 params=request, 
174                                 headers=header, 
175                                 status=200)
176       
177        print("Response status=%d" % response.status)
178       
179        samlResponse = self._getSAMLResponse(response.body)
180
181        self.assert_(samlResponse.status.statusCode.value == \
182                     StatusCode.INVALID_ATTR_NAME_VALUE_URI)
183       
184    def test04InvalidQueryIssuer(self):
185        request = self._makeRequest(issuer="/CN=My Attribute Query Issuer")
186       
187        header = {
188            'soapAction': "http://www.oasis-open.org/committees/security",
189            'Content-length': str(len(request)),
190            'Content-type': 'text/xml'
191        }
192       
193        response = self.app.post('/attributeauthority/saml', 
194                                 params=request, 
195                                 headers=header, 
196                                 status=200)
197       
198        print("Response status=%d" % response.status)
199       
200        samlResponse = self._getSAMLResponse(response.body)
201
202        self.assert_(samlResponse.status.statusCode.value == \
203                     StatusCode.REQUEST_DENIED_URI)
204
205    def test05UnknownPrincipal(self):
206        request = self._makeRequest(subject="Joe.Bloggs")
207       
208        header = {
209            'soapAction': "http://www.oasis-open.org/committees/security",
210            'Content-length': str(len(request)),
211            'Content-type': 'text/xml'
212        }
213       
214        response = self.app.post('/attributeauthority/saml', 
215                                 params=request, 
216                                 headers=header, 
217                                 status=200)
218       
219        print("Response status=%d" % response.status)
220       
221        samlResponse = self._getSAMLResponse(response.body)
222
223        self.assert_(samlResponse.status.statusCode.value == \
224                     StatusCode.UNKNOWN_PRINCIPAL_URI)
225
226 
227if __name__ == "__main__":
228    unittest.main()
Note: See TracBrowser for help on using the repository browser.