source: TI12-security/trunk/python/ndg.security.saml/saml/test/test_saml.py @ 5601

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.saml/saml/test/test_saml.py@5601
Revision 5601, 13.7 KB checked in by pjkersha, 11 years ago (diff)

Adding Assertion parsing functionality - not finished!

Line 
1"""SAML unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "21/07/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id$'
11import logging
12logging.basicConfig(level=logging.DEBUG)
13   
14from datetime import datetime, timedelta
15from uuid import uuid4
16from cStringIO import StringIO
17
18import unittest
19
20from xml.etree.ElementTree import iselement
21from xml.etree import ElementTree
22
23from saml.saml2.core import SAMLVersion, Attribute, AttributeStatement, \
24    Assertion, AttributeValue, AttributeQuery, Response, Issuer, Subject, \
25    NameID, StatusCode, Status, Conditions, XSStringAttributeValue, \
26    XSGroupRoleAttributeValue
27from saml.common.xml import SAMLConstants
28from saml.xml.etree import prettyPrint, AssertionElementTree, \
29    XSGroupRoleAttributeValueElementTree, AttributeQueryElementTree, \
30    ResponseElementTree, ConditionsElementTree
31
32
33class SAMLUtil(object):
34    """SAML utility class based on ANL examples for Earth System Grid:
35    http://www.ci.uchicago.edu/wiki/bin/view/ESGProject/ESGSAMLAttributes#ESG_Attribute_Service
36    """
37   
38    def __init__(self):
39        """Set-up ESG core attributes, Group/Role and miscellaneous
40        attributes lists
41        """
42        self.firstName = None
43        self.lastName = None
44        self.emailAddress = None
45       
46        self.__groupRoleList = []
47        self.__miscAttrList = []
48
49    def addGroupRole(self, group, role):
50        """Add an ESG Group/Role attribute
51        @type group: basestring
52        @param group: group name
53        @type role: basestring
54        @param role: role name
55        """
56        self.__groupRoleList.append((group, role))
57   
58    def addAttribute(self, name, value):
59        """Add a generic attribute
60        @type name: basestring
61        @param name: attribute name
62        @type value: basestring
63        @param value: attribute value
64        """
65        self.__miscAttrList.append((name, value))
66
67    def buildAssertion(self):
68        """Create a SAML Assertion containing ESG core attributes: First
69        Name, Last Name, e-mail Address; ESG Group/Role type attributes
70        and generic attributes
71        @rtype: ndg.security.common.saml.Assertion
72        @return: new SAML Assertion object
73        """
74       
75        assertion = Assertion()
76        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
77        assertion.id = str(uuid4())
78        assertion.issueInstant = datetime.utcnow()
79        attributeStatement = AttributeStatement()
80       
81        for attribute in self.createAttributes():
82            attributeStatement.attributes.append(attribute)
83           
84        assertion.attributeStatements.append(attributeStatement)
85       
86        return assertion
87
88    def buildAttributeQuery(self, issuer, subjectNameID):
89       
90        attributeQuery = AttributeQuery()
91        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
92        attributeQuery.id = str(uuid4())
93        attributeQuery.issueInstant = datetime.utcnow()
94       
95        attributeQuery.issuer = Issuer()
96        attributeQuery.issuer.format = Issuer.X509_SUBJECT
97        attributeQuery.issuer.value = issuer
98                       
99        attributeQuery.subject = Subject() 
100        attributeQuery.subject.nameID = NameID()
101        attributeQuery.subject.nameID.format = "urn:esg:openid"
102        attributeQuery.subject.nameID.value = subjectNameID
103                                   
104        attributeQuery.attributes = self.createAttributes()
105       
106        return attributeQuery
107   
108    def createAttributes(self):
109       
110        attributes = []
111        if self.firstName is not None:   
112            # special case handling for 'FirstName' attribute
113            fnAttribute = Attribute()
114            fnAttribute.name = "urn:esg:first:name"
115            fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
116            fnAttribute.friendlyName = "FirstName"
117
118            firstName = XSStringAttributeValue()
119            firstName.value = self.firstName
120            fnAttribute.attributeValues.append(firstName)
121
122            attributes.append(fnAttribute)
123       
124
125        if self.lastName is not None:
126            # special case handling for 'LastName' attribute
127            lnAttribute = Attribute()
128            lnAttribute.name = "urn:esg:last:name"
129            lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
130            lnAttribute.friendlyName = "LastName"
131
132            lastName = XSStringAttributeValue()
133            lastName.value = self.lastName
134            lnAttribute.attributeValues.append(lastName)
135
136            attributes.append(lnAttribute)
137       
138
139        if self.emailAddress is not None:
140            # special case handling for 'LastName' attribute
141            emailAddressAttribute = Attribute()
142            emailAddressAttribute.name = "urn:esg:email:address"
143            emailAddressAttribute.nameFormat = SAMLConstants.XSD_NS+"#"+\
144                                        XSStringAttributeValue.TYPE_LOCAL_NAME
145            emailAddressAttribute.friendlyName = "emailAddress"
146
147            emailAddress = XSStringAttributeValue()
148            emailAddress.value = self.emailAddress
149            emailAddressAttribute.attributeValues.append(emailAddress)
150
151            attributes.append(emailAddressAttribute)
152       
153        if len(self.__groupRoleList) > 0:
154            # custom group/role attribute to be added to attr statement
155            groupRoleAttribute = Attribute()
156            groupRoleAttribute.name = "GroupRole"
157            groupRoleAttribute.nameFormat = \
158                                    XSGroupRoleAttributeValue.TYPE_LOCAL_NAME
159
160            for group, role in self.__groupRoleList:
161                groupRole = XSGroupRoleAttributeValue()
162                groupRole.group = group
163                groupRole.role = role
164
165                groupRoleAttribute.attributeValues.append(groupRole)
166           
167            attributes.append(groupRoleAttribute)
168       
169        for name, value in self.__miscAttrList:
170            attribute = Attribute()
171            attribute.name = name
172            attribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
173
174            stringAttributeValue = XSStringAttributeValue()
175            stringAttributeValue.value = value
176            attribute.attributeValues.append(stringAttributeValue)
177
178            attributes.append(attribute)
179           
180        return attributes
181
182
183class SAMLTestCase(unittest.TestCase):
184    """Test SAML implementation for use with CMIP5 federation"""
185   
186    def _createAssertionHelper(self):
187        samlUtil = SAMLUtil()
188       
189        # ESG core attributes
190        samlUtil.firstName = "Philip"
191        samlUtil.lastName = "Kershaw"
192        samlUtil.emailAddress = "p.j.k@somewhere"
193       
194        # BADC specific attributes
195        badcRoleList = (
196            'urn:badc:security:authz:1.0:attr:admin', 
197            'urn:badc:security:authz:1.0:attr:rapid', 
198            'urn:badc:security:authz:1.0:attr:coapec', 
199            'urn:badc:security:authz:1.0:attr:midas', 
200            'urn:badc:security:authz:1.0:attr:quest', 
201            'urn:badc:security:authz:1.0:attr:staff'
202        )
203        for role in badcRoleList:
204            samlUtil.addAttribute("urn:badc:security:authz:1.0:attr", role)
205       
206        # ESG Group/Role type list
207        esgGroupRoleList = (
208            ("ESG-NCAR", "admin"),
209            ("ESG-PCMDI", "testUser"),
210        )
211        for group, role in esgGroupRoleList:
212            samlUtil.addGroupRole(group, role)
213       
214        # Make an assertion object
215        assertion = samlUtil.buildAssertion()
216       
217        return assertion
218       
219    def test01CreateAssertion(self):
220         
221        assertion = self._createAssertionHelper()
222       
223        # Add mapping for ESG Group/Role Attribute Value to enable ElementTree
224        # Attribute Value factory to render the XML output
225        attributeValueElementTreeClassMap = {
226            XSGroupRoleAttributeValue: XSGroupRoleAttributeValueElementTree           
227        }
228       
229        # Create ElementTree Assertion Element
230        assertionElem = AssertionElementTree.create(assertion,
231                            customClassMap=attributeValueElementTreeClassMap)
232       
233        self.assert_(iselement(assertionElem))
234       
235        # Serialise to output
236        xmlOutput = prettyPrint(assertionElem)       
237        self.assert_(len(xmlOutput))
238       
239        print("\n"+"_"*80)
240        print(xmlOutput)
241        print("_"*80)
242
243    def test02ParseAssertion(self):
244        assertion = self._createAssertionHelper()
245       
246        # Add mapping for ESG Group/Role Attribute Value to enable ElementTree
247        # Attribute Value factory to render the XML output
248        attributeValueElementTreeClassMap = {
249            XSGroupRoleAttributeValue: XSGroupRoleAttributeValueElementTree           
250        }
251       
252        # Create ElementTree Assertion Element
253        assertionElem = AssertionElementTree.create(assertion,
254                            customClassMap=attributeValueElementTreeClassMap)
255       
256        self.assert_(iselement(assertionElem))
257       
258        # Serialise to output
259        xmlOutput = prettyPrint(assertionElem)       
260               
261        assertionStream = StringIO()
262        assertionStream.write(xmlOutput)
263        assertionStream.seek(0)
264
265        tree = ElementTree.parse(assertionStream)
266        elem2 = tree.getroot()
267       
268        assertionElem2 = AssertionElementTree.parse(elem2)
269       
270    def test03CreateAttributeQuery(self):
271        samlUtil = SAMLUtil()
272        samlUtil.firstName = ''
273        samlUtil.lastName = ''
274        samlUtil.emailAddress = ''
275        attributeQuery = samlUtil.buildAttributeQuery(
276                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk",
277                        "https://openid.localhost/philip.kershaw")
278       
279        elem = AttributeQueryElementTree.create(attributeQuery)       
280        xmlOutput = prettyPrint(elem)
281           
282        print("\n"+"_"*80)
283        print(xmlOutput)
284        print("_"*80)
285
286    def test04ParseAttributeQuery(self):
287        samlUtil = SAMLUtil()
288        samlUtil.firstName = ''
289        samlUtil.lastName = ''
290        samlUtil.emailAddress = ''
291        attributeQuery = samlUtil.buildAttributeQuery(
292                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk",
293                        "https://openid.localhost/philip.kershaw")
294       
295        elem = AttributeQueryElementTree.create(attributeQuery)       
296        xmlOutput = prettyPrint(elem)       
297        print("\n"+"_"*80)
298        print(xmlOutput)
299               
300        attributeQueryStream = StringIO()
301        attributeQueryStream.write(xmlOutput)
302        attributeQueryStream.seek(0)
303
304        tree = ElementTree.parse(attributeQueryStream)
305        elem2 = tree.getroot()
306       
307        attributeQuery2 = AttributeQueryElementTree.parse(elem2)
308        self.assert_(attributeQuery2.id == attributeQuery.id)
309        self.assert_(attributeQuery2.issuer.value==attributeQuery.issuer.value)
310        self.assert_(attributeQuery2.subject.nameID.value == \
311                     attributeQuery.subject.nameID.value)
312       
313        self.assert_(attributeQuery2.attributes[1].name == \
314                     attributeQuery.attributes[1].name)
315       
316        xmlOutput2 = prettyPrint(elem2)       
317        print("_"*80)
318        print(xmlOutput2)
319        print("_"*80)
320
321    def test05CreateResponse(self):
322        response = Response()
323        response.issueInstant = datetime.utcnow()
324       
325        # Make up a request ID that this response is responding to
326        response.inResponseTo = str(uuid4())
327        response.id = str(uuid4())
328        response.version = SAMLVersion(SAMLVersion.VERSION_20)
329           
330        response.issuer = Issuer()
331        response.issuer.format = Issuer.X509_SUBJECT
332        response.issuer.value = \
333                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"
334       
335        response.status = Status()
336        response.status.statusCode = StatusCode()
337        response.status.statusCode.value = StatusCode.SUCCESS_URI       
338               
339        assertion = self._createAssertionHelper()
340       
341        # Add a conditions statement for a validity of 8 hours
342        assertion.conditions = Conditions()
343        assertion.conditions.notBefore = datetime.utcnow()
344        assertion.conditions.notOnOrAfter = assertion.conditions.notBefore + \
345            timedelta(seconds=60*60*8)
346       
347        assertion.subject = Subject() 
348        assertion.subject.nameID = NameID()
349        assertion.subject.nameID.format = "urn:esg:openid"
350        assertion.subject.nameID.value = \
351                        "https://openid.localhost/philip.kershaw"   
352           
353        assertion.issuer = Issuer()
354        assertion.issuer.format = Issuer.X509_SUBJECT
355        assertion.issuer.value = \
356                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"
357
358        response.assertions.append(assertion)
359       
360        # Add mapping for ESG Group/Role Attribute Value to enable ElementTree
361        # Attribute Value factory to render the XML output
362        attributeValueElementTreeClassMap = {
363            XSGroupRoleAttributeValue: XSGroupRoleAttributeValueElementTree           
364        }
365       
366        # Create ElementTree Assertion Element
367        responseElem = ResponseElementTree.create(response,
368                            customClassMap=attributeValueElementTreeClassMap)
369       
370        self.assert_(iselement(responseElem))
371       
372        # Serialise to output       
373        xmlOutput = prettyPrint(responseElem)       
374        self.assert_(len(xmlOutput))
375        print("\n"+"_"*80)
376        print(xmlOutput)
377        print("_"*80)
378   
379if __name__ == "__main__":
380    unittest.main()       
Note: See TracBrowser for help on using the repository browser.