source: TI12-security/trunk/python/ndg.security.saml/saml/test/test_saml.py @ 5596

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.saml/saml/test/test_saml.py@5596
Revision 5596, 12.7 KB checked in by pjkersha, 11 years ago (diff)

test_saml unit tests fixed running in new saml egg

Line 
1"""SAML unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "21/07/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id$'
11import logging
12logging.basicConfig(level=logging.DEBUG)
13   
14from datetime import datetime, timedelta
15from uuid import uuid4
16from cStringIO import StringIO
17
18import unittest
19
20from xml.etree.ElementTree import iselement
21from xml.etree import ElementTree
22from saml.xml.etree import prettyPrint
23
24from saml import Assertion, Attribute, AttributeValue, AttributeStatement, \
25    SAMLVersion, XSStringAttributeValue, XSGroupRoleAttributeValue, \
26    AttributeQuery, Response, Issuer, Subject, NameID, StatusCode, Status, \
27    Conditions
28from saml.xml import XMLConstants
29from saml.xml.etree import AssertionElementTree, \
30    XSGroupRoleAttributeValueElementTree, AttributeQueryElementTree, \
31    ResponseElementTree, ConditionsElementTree
32
33
34class SAMLUtil(object):
35    """SAML utility class based on ANL examples for Earth System Grid:
36    http://www.ci.uchicago.edu/wiki/bin/view/ESGProject/ESGSAMLAttributes#ESG_Attribute_Service
37    """
38   
39    def __init__(self):
40        """Set-up ESG core attributes, Group/Role and miscellaneous
41        attributes lists
42        """
43        self.firstName = None
44        self.lastName = None
45        self.emailAddress = None
46       
47        self.__groupRoleList = []
48        self.__miscAttrList = []
49
50    def addGroupRole(self, group, role):
51        """Add an ESG Group/Role attribute
52        @type group: basestring
53        @param group: group name
54        @type role: basestring
55        @param role: role name
56        """
57        self.__groupRoleList.append((group, role))
58   
59    def addAttribute(self, name, value):
60        """Add a generic attribute
61        @type name: basestring
62        @param name: attribute name
63        @type value: basestring
64        @param value: attribute value
65        """
66        self.__miscAttrList.append((name, value))
67
68    def buildAssertion(self):
69        """Create a SAML Assertion containing ESG core attributes: First
70        Name, Last Name, e-mail Address; ESG Group/Role type attributes
71        and generic attributes
72        @rtype: ndg.security.common.saml.Assertion
73        @return: new SAML Assertion object
74        """
75       
76        assertion = Assertion()
77        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
78        assertion.id = str(uuid4())
79        assertion.issueInstant = datetime.utcnow()
80        attributeStatement = AttributeStatement()
81       
82        for attribute in self.createAttributes():
83            attributeStatement.attributes.append(attribute)
84           
85        assertion.attributeStatements.append(attributeStatement)
86       
87        return assertion
88
89    def buildAttributeQuery(self, issuer, subjectNameID):
90       
91        attributeQuery = AttributeQuery()
92        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
93        attributeQuery.id = str(uuid4())
94        attributeQuery.issueInstant = datetime.utcnow()
95       
96        attributeQuery.issuer = Issuer()
97        attributeQuery.issuer.format = Issuer.X509_SUBJECT
98        attributeQuery.issuer.value = issuer
99                       
100        attributeQuery.subject = Subject() 
101        attributeQuery.subject.nameID = NameID()
102        attributeQuery.subject.nameID.format = "urn:esg:openid"
103        attributeQuery.subject.nameID.value = subjectNameID
104                                   
105        attributeQuery.attributes = self.createAttributes()
106       
107        return attributeQuery
108   
109    def createAttributes(self):
110       
111        attributes = []
112        if self.firstName is not None:   
113            # special case handling for 'FirstName' attribute
114            fnAttribute = Attribute()
115            fnAttribute.name = "urn:esg:first:name"
116            fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
117            fnAttribute.friendlyName = "FirstName"
118
119            firstName = XSStringAttributeValue()
120            firstName.value = self.firstName
121            fnAttribute.attributeValues.append(firstName)
122
123            attributes.append(fnAttribute)
124       
125
126        if self.lastName is not None:
127            # special case handling for 'LastName' attribute
128            lnAttribute = Attribute()
129            lnAttribute.name = "urn:esg:last:name"
130            lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
131            lnAttribute.friendlyName = "LastName"
132
133            lastName = XSStringAttributeValue()
134            lastName.value = self.lastName
135            lnAttribute.attributeValues.append(lastName)
136
137            attributes.append(lnAttribute)
138       
139
140        if self.emailAddress is not None:
141            # special case handling for 'LastName' attribute
142            emailAddressAttribute = Attribute()
143            emailAddressAttribute.name = "urn:esg:email:address"
144            emailAddressAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\
145                                        XSStringAttributeValue.TYPE_LOCAL_NAME
146            emailAddressAttribute.friendlyName = "emailAddress"
147
148            emailAddress = XSStringAttributeValue()
149            emailAddress.value = self.emailAddress
150            emailAddressAttribute.attributeValues.append(emailAddress)
151
152            attributes.append(emailAddressAttribute)
153       
154        if len(self.__groupRoleList) > 0:
155            # custom group/role attribute to be added to attr statement
156            groupRoleAttribute = Attribute()
157            groupRoleAttribute.name = "GroupRole"
158            groupRoleAttribute.nameFormat = \
159                                    XSGroupRoleAttributeValue.TYPE_LOCAL_NAME
160
161            for group, role in self.__groupRoleList:
162                groupRole = XSGroupRoleAttributeValue()
163                groupRole.group = group
164                groupRole.role = role
165
166                groupRoleAttribute.attributeValues.append(groupRole)
167           
168            attributes.append(groupRoleAttribute)
169       
170        for name, value in self.__miscAttrList:
171            attribute = Attribute()
172            attribute.name = name
173            attribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
174
175            stringAttributeValue = XSStringAttributeValue()
176            stringAttributeValue.value = value
177            attribute.attributeValues.append(stringAttributeValue)
178
179            attributes.append(attribute)
180           
181        return attributes
182
183
184class SAMLTestCase(unittest.TestCase):
185    """Test SAML implementation for use with CMIP5 federation"""
186   
187    def _createAssertionHelper(self):
188        samlUtil = SAMLUtil()
189       
190        # ESG core attributes
191        samlUtil.firstName = "Philip"
192        samlUtil.lastName = "Kershaw"
193        samlUtil.emailAddress = "p.j.k@somewhere"
194       
195        # BADC specific attributes
196        badcRoleList = (
197            'urn:badc:security:authz:1.0:attr:admin', 
198            'urn:badc:security:authz:1.0:attr:rapid', 
199            'urn:badc:security:authz:1.0:attr:coapec', 
200            'urn:badc:security:authz:1.0:attr:midas', 
201            'urn:badc:security:authz:1.0:attr:quest', 
202            'urn:badc:security:authz:1.0:attr:staff'
203        )
204        for role in badcRoleList:
205            samlUtil.addAttribute("urn:badc:security:authz:1.0:attr", role)
206       
207        # ESG Group/Role type list
208        esgGroupRoleList = (
209            ("ESG-NCAR", "admin"),
210            ("ESG-PCMDI", "testUser"),
211        )
212        for group, role in esgGroupRoleList:
213            samlUtil.addGroupRole(group, role)
214       
215        # Make an assertion object
216        assertion = samlUtil.buildAssertion()
217       
218        return assertion
219       
220    def test01CreateAssertion(self):
221         
222        assertion = self._createAssertionHelper()
223       
224        # Add mapping for ESG Group/Role Attribute Value to enable ElementTree
225        # Attribute Value factory to render the XML output
226        attributeValueElementTreeClassMap = {
227            XSGroupRoleAttributeValue: XSGroupRoleAttributeValueElementTree           
228        }
229       
230        # Create ElementTree Assertion Element
231        assertionElem = AssertionElementTree.create(assertion,
232                            customClassMap=attributeValueElementTreeClassMap)
233       
234        self.assert_(iselement(assertionElem))
235       
236        # Serialise to output
237        xmlOutput = prettyPrint(assertionElem)       
238        self.assert_(len(xmlOutput))
239       
240        print("\n"+"_"*80)
241        print(xmlOutput)
242        print("_"*80)
243
244    def test02CreateAttributeQuery(self):
245        samlUtil = SAMLUtil()
246        samlUtil.firstName = ''
247        samlUtil.lastName = ''
248        samlUtil.emailAddress = ''
249        attributeQuery = samlUtil.buildAttributeQuery(
250                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk",
251                        "https://openid.localhost/philip.kershaw")
252       
253        elem = AttributeQueryElementTree.create(attributeQuery)       
254        xmlOutput = prettyPrint(elem)
255           
256        print("\n"+"_"*80)
257        print(xmlOutput)
258        print("_"*80)
259
260    def test03ParseAttributeQuery(self):
261        samlUtil = SAMLUtil()
262        samlUtil.firstName = ''
263        samlUtil.lastName = ''
264        samlUtil.emailAddress = ''
265        attributeQuery = samlUtil.buildAttributeQuery(
266                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk",
267                        "https://openid.localhost/philip.kershaw")
268       
269        elem = AttributeQueryElementTree.create(attributeQuery)       
270        xmlOutput = prettyPrint(elem)       
271        print("\n"+"_"*80)
272        print(xmlOutput)
273               
274        attributeQueryStream = StringIO()
275        attributeQueryStream.write(xmlOutput)
276        attributeQueryStream.seek(0)
277
278        tree = ElementTree.parse(attributeQueryStream)
279        elem2 = tree.getroot()
280       
281        attributeQuery2 = AttributeQueryElementTree.parse(elem2)
282        self.assert_(attributeQuery2.id == attributeQuery.id)
283        self.assert_(attributeQuery2.issuer.value==attributeQuery.issuer.value)
284        self.assert_(attributeQuery2.subject.nameID.value == \
285                     attributeQuery.subject.nameID.value)
286       
287        self.assert_(attributeQuery2.attributes[1].name == \
288                     attributeQuery.attributes[1].name)
289       
290        xmlOutput2 = prettyPrint(elem2)       
291        print("_"*80)
292        print(xmlOutput2)
293        print("_"*80)
294
295    def test04createResponse(self):
296        response = Response()
297        response.issueInstant = datetime.utcnow()
298       
299        # Make up a request ID that this response is responding to
300        response.inResponseTo = str(uuid4())
301        response.id = str(uuid4())
302        response.version = SAMLVersion(SAMLVersion.VERSION_20)
303           
304        response.issuer = Issuer()
305        response.issuer.format = Issuer.X509_SUBJECT
306        response.issuer.value = \
307                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"
308       
309        response.status = Status()
310        response.status.statusCode = StatusCode()
311        response.status.statusCode.value = StatusCode.SUCCESS_URI       
312               
313        assertion = self._createAssertionHelper()
314       
315        # Add a conditions statement for a validity of 8 hours
316        assertion.conditions = Conditions()
317        assertion.conditions.notBefore = datetime.utcnow()
318        assertion.conditions.notOnOrAfter = assertion.conditions.notBefore + \
319            timedelta(seconds=60*60*8)
320       
321        assertion.subject = Subject() 
322        assertion.subject.nameID = NameID()
323        assertion.subject.nameID.format = "urn:esg:openid"
324        assertion.subject.nameID.value = \
325                        "https://openid.localhost/philip.kershaw"   
326           
327        assertion.issuer = Issuer()
328        assertion.issuer.format = Issuer.X509_SUBJECT
329        assertion.issuer.value = \
330                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"
331
332        response.assertions.append(assertion)
333       
334        # Add mapping for ESG Group/Role Attribute Value to enable ElementTree
335        # Attribute Value factory to render the XML output
336        attributeValueElementTreeClassMap = {
337            XSGroupRoleAttributeValue: XSGroupRoleAttributeValueElementTree           
338        }
339       
340        # Create ElementTree Assertion Element
341        responseElem = ResponseElementTree.create(response,
342                            customClassMap=attributeValueElementTreeClassMap)
343       
344        self.assert_(iselement(responseElem))
345       
346        # Serialise to output       
347        xmlOutput = prettyPrint(responseElem)       
348        self.assert_(len(xmlOutput))
349        print("\n"+"_"*80)
350        print(xmlOutput)
351        print("_"*80)
352   
353if __name__ == "__main__":
354    unittest.main()       
Note: See TracBrowser for help on using the repository browser.