source: TI12-security/trunk/python/ndg_security_saml/saml/test/test_saml.py @ 5738

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_saml/saml/test/test_saml.py@5738
Revision 5738, 14.8 KB checked in by pjkersha, 11 years ago (diff)

saml.xml.etree: important fixes to ElementTree based Status element serialisation and de-serialisation
ndg.security.server.attributeauthority: added clockSkew parameter to provide some leeway in SAML attribute query clock checks. Also added StatusMessage element for additional error info in responses.
ndg.security.common.soap.client: added check of HTTP Content-type in SOAP responses.

Line 
1"""SAML unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "21/07/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id$'
11import logging
12logging.basicConfig(level=logging.DEBUG)
13   
14from datetime import datetime, timedelta
15from uuid import uuid4
16from cStringIO import StringIO
17
18import unittest
19
20from xml.etree.ElementTree import iselement
21from xml.etree import ElementTree
22
23from saml.saml2.core import SAMLVersion, Attribute, AttributeStatement, \
24    Assertion, AttributeQuery, Response, Issuer, Subject, NameID, StatusCode, \
25    StatusMessage, Status, Conditions, XSStringAttributeValue, \
26    XSGroupRoleAttributeValue
27from saml.common.xml import SAMLConstants
28from saml.xml.etree import prettyPrint, AssertionElementTree, \
29    XSGroupRoleAttributeValueElementTree, AttributeQueryElementTree, \
30    ResponseElementTree
31
32
class SAMLUtil(object):
    """SAML utility class based on ANL examples for Earth System Grid:
    http://www.ci.uchicago.edu/wiki/bin/view/ESGProject/ESGSAMLAttributes#ESG_Attribute_Service

    Holds the ESG core attributes (first name, last name, e-mail address),
    a list of Group/Role pairs and a list of generic name/value attributes,
    and builds SAML Assertion and AttributeQuery objects from them.
    """

    # Name format used for the first name, last name and generic attributes
    XSD_STRING_NAME_FORMAT = "http://www.w3.org/2001/XMLSchema#string"

    def __init__(self):
        """Set-up ESG core attributes, Group/Role and miscellaneous
        attributes lists
        """
        self.firstName = None
        self.lastName = None
        self.emailAddress = None

        # ESG Group/Role attribute type: list of (group, role) tuples
        self.__groupRoleList = []

        # Generic attributes: list of (name, value) tuples
        self.__miscAttrList = []

    def addGroupRole(self, group, role):
        """Add an ESG Group/Role attribute
        @type group: basestring
        @param group: group name
        @type role: basestring
        @param role: role name
        """
        self.__groupRoleList.append((group, role))

    def addAttribute(self, name, value):
        """Add a generic attribute
        @type name: basestring
        @param name: attribute name
        @type value: basestring
        @param value: attribute value
        """
        self.__miscAttrList.append((name, value))

    def buildAssertion(self):
        """Create a SAML Assertion containing ESG core attributes: First
        Name, Last Name, e-mail Address; ESG Group/Role type attributes
        and generic attributes
        @rtype: saml.saml2.core.Assertion
        @return: new SAML Assertion object
        """
        assertion = Assertion()
        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
        assertion.id = str(uuid4())
        assertion.issueInstant = datetime.utcnow()

        # All attributes go into a single attribute statement
        attributeStatement = AttributeStatement()
        for attribute in self.createAttributes():
            attributeStatement.attributes.append(attribute)

        assertion.attributeStatements.append(attributeStatement)

        return assertion

    def buildAttributeQuery(self, issuer, subjectNameID):
        """Make a SAML Attribute Query
        @type issuer: basestring
        @param issuer: attribute issuer name
        @type subjectNameID: basestring
        @param subjectNameID: identity to query attributes for
        @rtype: saml.saml2.core.AttributeQuery
        @return: new SAML Attribute Query object
        """
        attributeQuery = AttributeQuery()
        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
        attributeQuery.id = str(uuid4())
        attributeQuery.issueInstant = datetime.utcnow()

        attributeQuery.issuer = Issuer()
        attributeQuery.issuer.format = Issuer.X509_SUBJECT
        attributeQuery.issuer.value = issuer

        attributeQuery.subject = Subject()
        attributeQuery.subject.nameID = NameID()
        attributeQuery.subject.nameID.format = "urn:esg:openid"
        attributeQuery.subject.nameID.value = subjectNameID

        attributeQuery.attributes = self.createAttributes()

        return attributeQuery

    @staticmethod
    def _createStringAttribute(name, nameFormat, value, friendlyName=None):
        """Build an Attribute holding a single XSStringAttributeValue
        @type name: basestring
        @param name: attribute name
        @type nameFormat: basestring
        @param nameFormat: attribute name format
        @type value: basestring
        @param value: content for the string attribute value
        @type friendlyName: basestring / None
        @param friendlyName: friendly name; left unset on the attribute if
        None
        @rtype: saml.saml2.core.Attribute
        @return: new attribute with one string value appended
        """
        attribute = Attribute()
        attribute.name = name
        attribute.nameFormat = nameFormat
        if friendlyName is not None:
            attribute.friendlyName = friendlyName

        attributeValue = XSStringAttributeValue()
        attributeValue.value = value
        attribute.attributeValues.append(attributeValue)

        return attribute

    def createAttributes(self):
        """Create SAML Attributes for use in an Assertion or AttributeQuery
        @rtype: list
        @return: attributes in the order: first name, last name, e-mail
        address (each only if not None), Group/Role attribute (only if any
        group/role pairs were added), then one attribute per generic
        name/value pair
        """
        attributes = []

        if self.firstName is not None:
            # special case handling for 'FirstName' attribute
            attributes.append(self._createStringAttribute(
                                        "urn:esg:first:name",
                                        self.XSD_STRING_NAME_FORMAT,
                                        self.firstName,
                                        friendlyName="FirstName"))

        if self.lastName is not None:
            # special case handling for 'LastName' attribute
            attributes.append(self._createStringAttribute(
                                        "urn:esg:last:name",
                                        self.XSD_STRING_NAME_FORMAT,
                                        self.lastName,
                                        friendlyName="LastName"))

        if self.emailAddress is not None:
            # special case handling for 'emailAddress' attribute - the name
            # format is assembled from the SAML constants rather than the
            # literal used for the other string attributes
            emailNameFormat = SAMLConstants.XSD_NS + "#" + \
                                        XSStringAttributeValue.TYPE_LOCAL_NAME
            attributes.append(self._createStringAttribute(
                                        "urn:esg:email:address",
                                        emailNameFormat,
                                        self.emailAddress,
                                        friendlyName="emailAddress"))

        if self.__groupRoleList:
            # custom group/role attribute to be added to attr statement: a
            # single attribute carrying one value per (group, role) pair
            groupRoleAttribute = Attribute()
            groupRoleAttribute.name = "GroupRole"
            groupRoleAttribute.nameFormat = \
                                    XSGroupRoleAttributeValue.TYPE_LOCAL_NAME

            for group, role in self.__groupRoleList:
                groupRole = XSGroupRoleAttributeValue()
                groupRole.group = group
                groupRole.role = role

                groupRoleAttribute.attributeValues.append(groupRole)

            attributes.append(groupRoleAttribute)

        # Generic attributes: one string attribute per pair, no friendly name
        for name, value in self.__miscAttrList:
            attributes.append(self._createStringAttribute(
                                        name,
                                        self.XSD_STRING_NAME_FORMAT,
                                        value))

        return attributes
188
189
class SAMLTestCase(unittest.TestCase):
    """Test SAML implementation for use with CMIP5 federation

    Each test builds SAML objects with SAMLUtil, serialises them with the
    ElementTree based classes and prints the resulting XML; the parse tests
    additionally read the XML back in and compare it with the original.
    """

    # Issuer and subject identities shared by the query and response tests
    ISSUER_DN = "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"
    SUBJECT_OPENID = "https://openid.localhost/philip.kershaw"

    @staticmethod
    def _createToXMLTypeMap():
        """Mapping for ESG Group/Role Attribute Value to enable ElementTree
        Attribute Value factory to render the XML output
        @rtype: dict
        @return: SAML attribute value type to ElementTree class mapping
        """
        return {
            XSGroupRoleAttributeValue: XSGroupRoleAttributeValueElementTree
        }

    @staticmethod
    def _parseXML(xmlOutput):
        """Parse serialised XML back into an ElementTree element
        @type xmlOutput: basestring
        @param xmlOutput: serialised XML
        @return: root element of the parsed document
        """
        # Written then rewound rather than passed to the constructor so the
        # stream is populated the same way regardless of string type
        stream = StringIO()
        stream.write(xmlOutput)
        stream.seek(0)

        return ElementTree.parse(stream).getroot()

    def _createAssertionHelper(self):
        """Make an assertion populated with ESG core attributes, BADC role
        attributes and ESG Group/Role attributes
        @return: new SAML Assertion object
        """
        samlUtil = SAMLUtil()

        # ESG core attributes
        samlUtil.firstName = "Philip"
        samlUtil.lastName = "Kershaw"
        samlUtil.emailAddress = "p.j.k@somewhere"

        # BADC specific attributes
        badcRoleList = (
            'urn:badc:security:authz:1.0:attr:admin',
            'urn:badc:security:authz:1.0:attr:rapid',
            'urn:badc:security:authz:1.0:attr:coapec',
            'urn:badc:security:authz:1.0:attr:midas',
            'urn:badc:security:authz:1.0:attr:quest',
            'urn:badc:security:authz:1.0:attr:staff'
        )
        for role in badcRoleList:
            samlUtil.addAttribute("urn:badc:security:authz:1.0:attr", role)

        # ESG Group/Role type list
        esgGroupRoleList = (
            ("ESG-NCAR", "admin"),
            ("ESG-PCMDI", "testUser"),
        )
        for group, role in esgGroupRoleList:
            samlUtil.addGroupRole(group, role)

        # Make an assertion object
        return samlUtil.buildAssertion()

    def _createAttributeQueryHelper(self):
        """Make an attribute query asking for the three ESG core attributes
        @return: new SAML Attribute Query object
        """
        samlUtil = SAMLUtil()

        # Empty strings rather than None so that the attributes are included
        # in the query but without values of interest
        samlUtil.firstName = ''
        samlUtil.lastName = ''
        samlUtil.emailAddress = ''

        return samlUtil.buildAttributeQuery(self.ISSUER_DN,
                                            self.SUBJECT_OPENID)

    def test01CreateAssertion(self):
        """Serialise an assertion to XML"""
        assertion = self._createAssertionHelper()

        # Create ElementTree Assertion Element
        assertionElem = AssertionElementTree.toXML(assertion,
                            customToXMLTypeMap=self._createToXMLTypeMap())

        self.assertTrue(iselement(assertionElem))

        # Serialise to output
        xmlOutput = prettyPrint(assertionElem)
        self.assertTrue(len(xmlOutput))

        print("\n"+"_"*80)
        print(xmlOutput)
        print("_"*80)

    def test02ParseAssertion(self):
        """Serialise an assertion, parse it back and check that the ESG
        Group/Role attribute values survive the round trip
        """
        assertion = self._createAssertionHelper()

        # Create ElementTree Assertion Element
        assertionElem = AssertionElementTree.toXML(assertion,
                            customToXMLTypeMap=self._createToXMLTypeMap())

        self.assertTrue(iselement(assertionElem))

        # Serialise to output
        xmlOutput = prettyPrint(assertionElem)

        print("\n"+"_"*80)
        print(xmlOutput)
        print("_"*80)

        elem2 = self._parseXML(xmlOutput)

        # Matching function so that the parser recognises Group/Role values
        toSAMLTypeMap = [XSGroupRoleAttributeValueElementTree.factoryMatchFunc]

        assertion2 = AssertionElementTree.fromXML(elem2,
                                            customToSAMLTypeMap=toSAMLTypeMap)
        self.assertTrue(assertion2)

        xsGroupRoleAttrFound = False
        for attr in assertion2.attributeStatements[0].attributes:
            for attrValue in attr.attributeValues:
                if isinstance(attrValue, XSGroupRoleAttributeValue):
                    self.assertTrue(attrValue.group)
                    self.assertTrue(attrValue.role)
                    xsGroupRoleAttrFound = True

        self.assertTrue(xsGroupRoleAttrFound)

    def test03CreateAttributeQuery(self):
        """Serialise an attribute query to XML"""
        attributeQuery = self._createAttributeQueryHelper()

        elem = AttributeQueryElementTree.toXML(attributeQuery)
        self.assertTrue(iselement(elem))

        xmlOutput = prettyPrint(elem)
        self.assertTrue(len(xmlOutput))

        print("\n"+"_"*80)
        print(xmlOutput)
        print("_"*80)

    def test04ParseAttributeQuery(self):
        """Serialise an attribute query, parse it back and compare ID,
        issuer, subject and an attribute name with the original
        """
        attributeQuery = self._createAttributeQueryHelper()

        elem = AttributeQueryElementTree.toXML(attributeQuery)
        xmlOutput = prettyPrint(elem)
        print("\n"+"_"*80)
        print(xmlOutput)

        elem2 = self._parseXML(xmlOutput)

        attributeQuery2 = AttributeQueryElementTree.fromXML(elem2)
        self.assertEqual(attributeQuery2.id, attributeQuery.id)
        self.assertEqual(attributeQuery2.issuer.value,
                         attributeQuery.issuer.value)
        self.assertEqual(attributeQuery2.subject.nameID.value,
                         attributeQuery.subject.nameID.value)

        self.assertEqual(attributeQuery2.attributes[1].name,
                         attributeQuery.attributes[1].name)

        xmlOutput2 = prettyPrint(elem2)
        print("_"*80)
        print(xmlOutput2)
        print("_"*80)

    def test05CreateResponse(self):
        """Serialise a response containing a status and one assertion"""
        response = Response()
        response.issueInstant = datetime.utcnow()

        # Make up a request ID that this response is responding to
        response.inResponseTo = str(uuid4())
        response.id = str(uuid4())
        response.version = SAMLVersion(SAMLVersion.VERSION_20)

        response.issuer = Issuer()
        response.issuer.format = Issuer.X509_SUBJECT
        response.issuer.value = self.ISSUER_DN

        response.status = Status()
        response.status.statusCode = StatusCode()
        response.status.statusCode.value = StatusCode.SUCCESS_URI
        response.status.statusMessage = StatusMessage()
        response.status.statusMessage.value = "Response created successfully"

        assertion = self._createAssertionHelper()

        # Add a conditions statement for a validity of 8 hours
        assertion.conditions = Conditions()
        assertion.conditions.notBefore = datetime.utcnow()
        assertion.conditions.notOnOrAfter = assertion.conditions.notBefore + \
            timedelta(seconds=60*60*8)

        assertion.subject = Subject()
        assertion.subject.nameID = NameID()
        assertion.subject.nameID.format = "urn:esg:openid"
        assertion.subject.nameID.value = self.SUBJECT_OPENID

        assertion.issuer = Issuer()
        assertion.issuer.format = Issuer.X509_SUBJECT
        assertion.issuer.value = self.ISSUER_DN

        response.assertions.append(assertion)

        # Create ElementTree Response Element
        responseElem = ResponseElementTree.toXML(response,
                            customToXMLTypeMap=self._createToXMLTypeMap())

        self.assertTrue(iselement(responseElem))

        # Serialise to output
        xmlOutput = prettyPrint(responseElem)
        self.assertTrue(len(xmlOutput))
        print("\n"+"_"*80)
        print(xmlOutput)
        print("_"*80)
404   
# Run the unit tests when this module is executed as a script
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.