source: TI12-security/trunk/python/ndg.security.saml/saml/test/test_saml.py @ 5620

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.saml/saml/test/test_saml.py@5620
Revision 5620, 14.1 KB checked in by pjkersha, 12 years ago (diff)

Assertion parsing near complete but fixes needed to XSGroupRoleElementTree

Line 
1"""SAML unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "21/07/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id$'
11import logging
12logging.basicConfig(level=logging.DEBUG)
13   
14from datetime import datetime, timedelta
15from uuid import uuid4
16from cStringIO import StringIO
17
18import unittest
19
20from xml.etree.ElementTree import iselement
21from xml.etree import ElementTree
22
23from saml.saml2.core import SAMLVersion, Attribute, AttributeStatement, \
24    Assertion, AttributeQuery, Response, Issuer, Subject, NameID, StatusCode, \
25    Status, Conditions, XSStringAttributeValue, XSGroupRoleAttributeValue
26from saml.common.xml import SAMLConstants
27from saml.xml.etree import prettyPrint, AssertionElementTree, \
28    XSGroupRoleAttributeValueElementTree, AttributeQueryElementTree, \
29    ResponseElementTree
30
31
32class SAMLUtil(object):
33    """SAML utility class based on ANL examples for Earth System Grid:
34    http://www.ci.uchicago.edu/wiki/bin/view/ESGProject/ESGSAMLAttributes#ESG_Attribute_Service
35    """
36   
37    def __init__(self):
38        """Set-up ESG core attributes, Group/Role and miscellaneous
39        attributes lists
40        """
41        self.firstName = None
42        self.lastName = None
43        self.emailAddress = None
44       
45        # ESG Group/Role attribute type
46        self.__groupRoleList = []
47        self.__miscAttrList = []
48
49    def addGroupRole(self, group, role):
50        """Add an ESG Group/Role attribute
51        @type group: basestring
52        @param group: group name
53        @type role: basestring
54        @param role: role name
55        """
56        self.__groupRoleList.append((group, role))
57   
58    def addAttribute(self, name, value):
59        """Add a generic attribute
60        @type name: basestring
61        @param name: attribute name
62        @type value: basestring
63        @param value: attribute value
64        """
65        self.__miscAttrList.append((name, value))
66
67    def buildAssertion(self):
68        """Create a SAML Assertion containing ESG core attributes: First
69        Name, Last Name, e-mail Address; ESG Group/Role type attributes
70        and generic attributes
71        @rtype: ndg.security.common.saml.Assertion
72        @return: new SAML Assertion object
73        """
74       
75        assertion = Assertion()
76        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
77        assertion.id = str(uuid4())
78        assertion.issueInstant = datetime.utcnow()
79        attributeStatement = AttributeStatement()
80       
81        for attribute in self.createAttributes():
82            attributeStatement.attributes.append(attribute)
83           
84        assertion.attributeStatements.append(attributeStatement)
85       
86        return assertion
87
88    def buildAttributeQuery(self, issuer, subjectNameID):
89        """Make a SAML Attribute Query
90        @type issuer: basestring
91        @param issuer: attribute issuer name
92        @type subjectNameID: basestring
93        @param subjectNameID: identity to query attributes for
94        """
95        attributeQuery = AttributeQuery()
96        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
97        attributeQuery.id = str(uuid4())
98        attributeQuery.issueInstant = datetime.utcnow()
99       
100        attributeQuery.issuer = Issuer()
101        attributeQuery.issuer.format = Issuer.X509_SUBJECT
102        attributeQuery.issuer.value = issuer
103                       
104        attributeQuery.subject = Subject() 
105        attributeQuery.subject.nameID = NameID()
106        attributeQuery.subject.nameID.format = "urn:esg:openid"
107        attributeQuery.subject.nameID.value = subjectNameID
108                                   
109        attributeQuery.attributes = self.createAttributes()
110       
111        return attributeQuery
112   
113    def createAttributes(self):
114        """Create SAML Attributes for use in an Assertion or AttributeQuery"""
115       
116        attributes = []
117        if self.firstName is not None:   
118            # special case handling for 'FirstName' attribute
119            fnAttribute = Attribute()
120            fnAttribute.name = "urn:esg:first:name"
121            fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
122            fnAttribute.friendlyName = "FirstName"
123
124            firstName = XSStringAttributeValue()
125            firstName.value = self.firstName
126            fnAttribute.attributeValues.append(firstName)
127
128            attributes.append(fnAttribute)
129       
130
131        if self.lastName is not None:
132            # special case handling for 'LastName' attribute
133            lnAttribute = Attribute()
134            lnAttribute.name = "urn:esg:last:name"
135            lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
136            lnAttribute.friendlyName = "LastName"
137
138            lastName = XSStringAttributeValue()
139            lastName.value = self.lastName
140            lnAttribute.attributeValues.append(lastName)
141
142            attributes.append(lnAttribute)
143       
144
145        if self.emailAddress is not None:
146            # special case handling for 'LastName' attribute
147            emailAddressAttribute = Attribute()
148            emailAddressAttribute.name = "urn:esg:email:address"
149            emailAddressAttribute.nameFormat = SAMLConstants.XSD_NS+"#"+\
150                                        XSStringAttributeValue.TYPE_LOCAL_NAME
151            emailAddressAttribute.friendlyName = "emailAddress"
152
153            emailAddress = XSStringAttributeValue()
154            emailAddress.value = self.emailAddress
155            emailAddressAttribute.attributeValues.append(emailAddress)
156
157            attributes.append(emailAddressAttribute)
158       
159        if len(self.__groupRoleList) > 0:
160            # custom group/role attribute to be added to attr statement
161            groupRoleAttribute = Attribute()
162            groupRoleAttribute.name = "GroupRole"
163            groupRoleAttribute.nameFormat = \
164                                    XSGroupRoleAttributeValue.TYPE_LOCAL_NAME
165
166            for group, role in self.__groupRoleList:
167                groupRole = XSGroupRoleAttributeValue()
168                groupRole.group = group
169                groupRole.role = role
170
171                groupRoleAttribute.attributeValues.append(groupRole)
172           
173            attributes.append(groupRoleAttribute)
174       
175        for name, value in self.__miscAttrList:
176            attribute = Attribute()
177            attribute.name = name
178            attribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
179
180            stringAttributeValue = XSStringAttributeValue()
181            stringAttributeValue.value = value
182            attribute.attributeValues.append(stringAttributeValue)
183
184            attributes.append(attribute)
185           
186        return attributes
187
188
189class SAMLTestCase(unittest.TestCase):
190    """Test SAML implementation for use with CMIP5 federation"""
191   
192    def _createAssertionHelper(self):
193        samlUtil = SAMLUtil()
194       
195        # ESG core attributes
196        samlUtil.firstName = "Philip"
197        samlUtil.lastName = "Kershaw"
198        samlUtil.emailAddress = "p.j.k@somewhere"
199       
200        # BADC specific attributes
201        badcRoleList = (
202            'urn:badc:security:authz:1.0:attr:admin', 
203            'urn:badc:security:authz:1.0:attr:rapid', 
204            'urn:badc:security:authz:1.0:attr:coapec', 
205            'urn:badc:security:authz:1.0:attr:midas', 
206            'urn:badc:security:authz:1.0:attr:quest', 
207            'urn:badc:security:authz:1.0:attr:staff'
208        )
209        for role in badcRoleList:
210            samlUtil.addAttribute("urn:badc:security:authz:1.0:attr", role)
211       
212        # ESG Group/Role type list
213        esgGroupRoleList = (
214            ("ESG-NCAR", "admin"),
215            ("ESG-PCMDI", "testUser"),
216        )
217        for group, role in esgGroupRoleList:
218            samlUtil.addGroupRole(group, role)
219       
220        # Make an assertion object
221        assertion = samlUtil.buildAssertion()
222       
223        return assertion
224       
225    def test01CreateAssertion(self):
226         
227        assertion = self._createAssertionHelper()
228       
229        # Add mapping for ESG Group/Role Attribute Value to enable ElementTree
230        # Attribute Value factory to render the XML output
231        toXMLTypeMap = {
232            XSGroupRoleAttributeValue: XSGroupRoleAttributeValueElementTree           
233        }
234       
235        # Create ElementTree Assertion Element
236        assertionElem = AssertionElementTree.toXML(assertion,
237                                            customToXMLTypeMap=toXMLTypeMap)
238       
239        self.assert_(iselement(assertionElem))
240       
241        # Serialise to output
242        xmlOutput = prettyPrint(assertionElem)       
243        self.assert_(len(xmlOutput))
244       
245        print("\n"+"_"*80)
246        print(xmlOutput)
247        print("_"*80)
248
249    def test02ParseAssertion(self):
250        assertion = self._createAssertionHelper()
251       
252        # Add mapping for ESG Group/Role Attribute Value to enable ElementTree
253        # Attribute Value factory to render the XML output
254        toXMLTypeMap = {
255            XSGroupRoleAttributeValue: XSGroupRoleAttributeValueElementTree           
256        }
257       
258        # Create ElementTree Assertion Element
259        assertionElem = AssertionElementTree.toXML(assertion,
260                                            customToXMLTypeMap=toXMLTypeMap)
261       
262        self.assert_(iselement(assertionElem))
263       
264        # Serialise to output
265        xmlOutput = prettyPrint(assertionElem)       
266               
267        assertionStream = StringIO()
268        assertionStream.write(xmlOutput)
269        assertionStream.seek(0)
270
271        tree = ElementTree.parse(assertionStream)
272        elem2 = tree.getroot()
273       
274        toSAMLTypeMap = [XSGroupRoleAttributeValueElementTree.factoryMatchFunc]
275       
276        assertionElem2 = AssertionElementTree.fromXML(elem2,
277                                            customToSAMLTypeMap=toSAMLTypeMap)
278        self.assert_(assertionElem2)
279       
280    def test03CreateAttributeQuery(self):
281        samlUtil = SAMLUtil()
282        samlUtil.firstName = ''
283        samlUtil.lastName = ''
284        samlUtil.emailAddress = ''
285        attributeQuery = samlUtil.buildAttributeQuery(
286                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk",
287                        "https://openid.localhost/philip.kershaw")
288       
289        elem = AttributeQueryElementTree.toXML(attributeQuery)       
290        xmlOutput = prettyPrint(elem)
291           
292        print("\n"+"_"*80)
293        print(xmlOutput)
294        print("_"*80)
295
296    def test04ParseAttributeQuery(self):
297        samlUtil = SAMLUtil()
298        samlUtil.firstName = ''
299        samlUtil.lastName = ''
300        samlUtil.emailAddress = ''
301        attributeQuery = samlUtil.buildAttributeQuery(
302                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk",
303                        "https://openid.localhost/philip.kershaw")
304       
305        elem = AttributeQueryElementTree.toXML(attributeQuery)       
306        xmlOutput = prettyPrint(elem)       
307        print("\n"+"_"*80)
308        print(xmlOutput)
309               
310        attributeQueryStream = StringIO()
311        attributeQueryStream.write(xmlOutput)
312        attributeQueryStream.seek(0)
313
314        tree = ElementTree.parse(attributeQueryStream)
315        elem2 = tree.getroot()
316       
317        attributeQuery2 = AttributeQueryElementTree.fromXML(elem2)
318        self.assert_(attributeQuery2.id == attributeQuery.id)
319        self.assert_(attributeQuery2.issuer.value==attributeQuery.issuer.value)
320        self.assert_(attributeQuery2.subject.nameID.value == \
321                     attributeQuery.subject.nameID.value)
322       
323        self.assert_(attributeQuery2.attributes[1].name == \
324                     attributeQuery.attributes[1].name)
325       
326        xmlOutput2 = prettyPrint(elem2)       
327        print("_"*80)
328        print(xmlOutput2)
329        print("_"*80)
330
331    def test05CreateResponse(self):
332        response = Response()
333        response.issueInstant = datetime.utcnow()
334       
335        # Make up a request ID that this response is responding to
336        response.inResponseTo = str(uuid4())
337        response.id = str(uuid4())
338        response.version = SAMLVersion(SAMLVersion.VERSION_20)
339           
340        response.issuer = Issuer()
341        response.issuer.format = Issuer.X509_SUBJECT
342        response.issuer.value = \
343                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"
344       
345        response.status = Status()
346        response.status.statusCode = StatusCode()
347        response.status.statusCode.value = StatusCode.SUCCESS_URI       
348               
349        assertion = self._createAssertionHelper()
350       
351        # Add a conditions statement for a validity of 8 hours
352        assertion.conditions = Conditions()
353        assertion.conditions.notBefore = datetime.utcnow()
354        assertion.conditions.notOnOrAfter = assertion.conditions.notBefore + \
355            timedelta(seconds=60*60*8)
356       
357        assertion.subject = Subject() 
358        assertion.subject.nameID = NameID()
359        assertion.subject.nameID.format = "urn:esg:openid"
360        assertion.subject.nameID.value = \
361                        "https://openid.localhost/philip.kershaw"   
362           
363        assertion.issuer = Issuer()
364        assertion.issuer.format = Issuer.X509_SUBJECT
365        assertion.issuer.value = \
366                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"
367
368        response.assertions.append(assertion)
369       
370        # Add mapping for ESG Group/Role Attribute Value to enable ElementTree
371        # Attribute Value factory to render the XML output
372        toXMLTypeMap = {
373            XSGroupRoleAttributeValue: XSGroupRoleAttributeValueElementTree           
374        }
375       
376        # Create ElementTree Assertion Element
377        responseElem = ResponseElementTree.toXML(response,
378                                            customToXMLTypeMap=toXMLTypeMap)
379       
380        self.assert_(iselement(responseElem))
381       
382        # Serialise to output       
383        xmlOutput = prettyPrint(responseElem)       
384        self.assert_(len(xmlOutput))
385        print("\n"+"_"*80)
386        print(xmlOutput)
387        print("_"*80)
388   
389if __name__ == "__main__":
390    unittest.main()       
Note: See TracBrowser for help on using the repository browser.