source: TI12-security/trunk/python/ndg.security.saml/saml/test/test_saml.py @ 5597

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.saml/saml/test/test_saml.py@5597
Revision 5597, 12.8 KB checked in by pjkersha, 11 years ago (diff)

Refactored SAML package structure into saml2, core and common sub-packages

Line 
1"""SAML unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "21/07/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id$'
11import logging
12logging.basicConfig(level=logging.DEBUG)
13   
14from datetime import datetime, timedelta
15from uuid import uuid4
16from cStringIO import StringIO
17
18import unittest
19
20from xml.etree.ElementTree import iselement
21from xml.etree import ElementTree
22from saml.xml.etree import prettyPrint
23
24from saml import XSStringAttributeValue, XSGroupRoleAttributeValue
25   
26from saml.saml2.core import SAMLVersion, Attribute, AttributeStatement, \
27    Assertion, AttributeValue, AttributeQuery, Response, Issuer, Subject, \
28    NameID, StatusCode, Status, Conditions
29from saml.common.xml import SAMLConstants
30from saml.xml.etree import AssertionElementTree, \
31    XSGroupRoleAttributeValueElementTree, AttributeQueryElementTree, \
32    ResponseElementTree, ConditionsElementTree
33
34
35class SAMLUtil(object):
36    """SAML utility class based on ANL examples for Earth System Grid:
37    http://www.ci.uchicago.edu/wiki/bin/view/ESGProject/ESGSAMLAttributes#ESG_Attribute_Service
38    """
39   
40    def __init__(self):
41        """Set-up ESG core attributes, Group/Role and miscellaneous
42        attributes lists
43        """
44        self.firstName = None
45        self.lastName = None
46        self.emailAddress = None
47       
48        self.__groupRoleList = []
49        self.__miscAttrList = []
50
51    def addGroupRole(self, group, role):
52        """Add an ESG Group/Role attribute
53        @type group: basestring
54        @param group: group name
55        @type role: basestring
56        @param role: role name
57        """
58        self.__groupRoleList.append((group, role))
59   
60    def addAttribute(self, name, value):
61        """Add a generic attribute
62        @type name: basestring
63        @param name: attribute name
64        @type value: basestring
65        @param value: attribute value
66        """
67        self.__miscAttrList.append((name, value))
68
69    def buildAssertion(self):
70        """Create a SAML Assertion containing ESG core attributes: First
71        Name, Last Name, e-mail Address; ESG Group/Role type attributes
72        and generic attributes
73        @rtype: ndg.security.common.saml.Assertion
74        @return: new SAML Assertion object
75        """
76       
77        assertion = Assertion()
78        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
79        assertion.id = str(uuid4())
80        assertion.issueInstant = datetime.utcnow()
81        attributeStatement = AttributeStatement()
82       
83        for attribute in self.createAttributes():
84            attributeStatement.attributes.append(attribute)
85           
86        assertion.attributeStatements.append(attributeStatement)
87       
88        return assertion
89
90    def buildAttributeQuery(self, issuer, subjectNameID):
91       
92        attributeQuery = AttributeQuery()
93        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
94        attributeQuery.id = str(uuid4())
95        attributeQuery.issueInstant = datetime.utcnow()
96       
97        attributeQuery.issuer = Issuer()
98        attributeQuery.issuer.format = Issuer.X509_SUBJECT
99        attributeQuery.issuer.value = issuer
100                       
101        attributeQuery.subject = Subject() 
102        attributeQuery.subject.nameID = NameID()
103        attributeQuery.subject.nameID.format = "urn:esg:openid"
104        attributeQuery.subject.nameID.value = subjectNameID
105                                   
106        attributeQuery.attributes = self.createAttributes()
107       
108        return attributeQuery
109   
110    def createAttributes(self):
111       
112        attributes = []
113        if self.firstName is not None:   
114            # special case handling for 'FirstName' attribute
115            fnAttribute = Attribute()
116            fnAttribute.name = "urn:esg:first:name"
117            fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
118            fnAttribute.friendlyName = "FirstName"
119
120            firstName = XSStringAttributeValue()
121            firstName.value = self.firstName
122            fnAttribute.attributeValues.append(firstName)
123
124            attributes.append(fnAttribute)
125       
126
127        if self.lastName is not None:
128            # special case handling for 'LastName' attribute
129            lnAttribute = Attribute()
130            lnAttribute.name = "urn:esg:last:name"
131            lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
132            lnAttribute.friendlyName = "LastName"
133
134            lastName = XSStringAttributeValue()
135            lastName.value = self.lastName
136            lnAttribute.attributeValues.append(lastName)
137
138            attributes.append(lnAttribute)
139       
140
141        if self.emailAddress is not None:
142            # special case handling for 'LastName' attribute
143            emailAddressAttribute = Attribute()
144            emailAddressAttribute.name = "urn:esg:email:address"
145            emailAddressAttribute.nameFormat = SAMLConstants.XSD_NS+"#"+\
146                                        XSStringAttributeValue.TYPE_LOCAL_NAME
147            emailAddressAttribute.friendlyName = "emailAddress"
148
149            emailAddress = XSStringAttributeValue()
150            emailAddress.value = self.emailAddress
151            emailAddressAttribute.attributeValues.append(emailAddress)
152
153            attributes.append(emailAddressAttribute)
154       
155        if len(self.__groupRoleList) > 0:
156            # custom group/role attribute to be added to attr statement
157            groupRoleAttribute = Attribute()
158            groupRoleAttribute.name = "GroupRole"
159            groupRoleAttribute.nameFormat = \
160                                    XSGroupRoleAttributeValue.TYPE_LOCAL_NAME
161
162            for group, role in self.__groupRoleList:
163                groupRole = XSGroupRoleAttributeValue()
164                groupRole.group = group
165                groupRole.role = role
166
167                groupRoleAttribute.attributeValues.append(groupRole)
168           
169            attributes.append(groupRoleAttribute)
170       
171        for name, value in self.__miscAttrList:
172            attribute = Attribute()
173            attribute.name = name
174            attribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
175
176            stringAttributeValue = XSStringAttributeValue()
177            stringAttributeValue.value = value
178            attribute.attributeValues.append(stringAttributeValue)
179
180            attributes.append(attribute)
181           
182        return attributes
183
184
185class SAMLTestCase(unittest.TestCase):
186    """Test SAML implementation for use with CMIP5 federation"""
187   
188    def _createAssertionHelper(self):
189        samlUtil = SAMLUtil()
190       
191        # ESG core attributes
192        samlUtil.firstName = "Philip"
193        samlUtil.lastName = "Kershaw"
194        samlUtil.emailAddress = "p.j.k@somewhere"
195       
196        # BADC specific attributes
197        badcRoleList = (
198            'urn:badc:security:authz:1.0:attr:admin', 
199            'urn:badc:security:authz:1.0:attr:rapid', 
200            'urn:badc:security:authz:1.0:attr:coapec', 
201            'urn:badc:security:authz:1.0:attr:midas', 
202            'urn:badc:security:authz:1.0:attr:quest', 
203            'urn:badc:security:authz:1.0:attr:staff'
204        )
205        for role in badcRoleList:
206            samlUtil.addAttribute("urn:badc:security:authz:1.0:attr", role)
207       
208        # ESG Group/Role type list
209        esgGroupRoleList = (
210            ("ESG-NCAR", "admin"),
211            ("ESG-PCMDI", "testUser"),
212        )
213        for group, role in esgGroupRoleList:
214            samlUtil.addGroupRole(group, role)
215       
216        # Make an assertion object
217        assertion = samlUtil.buildAssertion()
218       
219        return assertion
220       
221    def test01CreateAssertion(self):
222         
223        assertion = self._createAssertionHelper()
224       
225        # Add mapping for ESG Group/Role Attribute Value to enable ElementTree
226        # Attribute Value factory to render the XML output
227        attributeValueElementTreeClassMap = {
228            XSGroupRoleAttributeValue: XSGroupRoleAttributeValueElementTree           
229        }
230       
231        # Create ElementTree Assertion Element
232        assertionElem = AssertionElementTree.create(assertion,
233                            customClassMap=attributeValueElementTreeClassMap)
234       
235        self.assert_(iselement(assertionElem))
236       
237        # Serialise to output
238        xmlOutput = prettyPrint(assertionElem)       
239        self.assert_(len(xmlOutput))
240       
241        print("\n"+"_"*80)
242        print(xmlOutput)
243        print("_"*80)
244
245    def test02CreateAttributeQuery(self):
246        samlUtil = SAMLUtil()
247        samlUtil.firstName = ''
248        samlUtil.lastName = ''
249        samlUtil.emailAddress = ''
250        attributeQuery = samlUtil.buildAttributeQuery(
251                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk",
252                        "https://openid.localhost/philip.kershaw")
253       
254        elem = AttributeQueryElementTree.create(attributeQuery)       
255        xmlOutput = prettyPrint(elem)
256           
257        print("\n"+"_"*80)
258        print(xmlOutput)
259        print("_"*80)
260
261    def test03ParseAttributeQuery(self):
262        samlUtil = SAMLUtil()
263        samlUtil.firstName = ''
264        samlUtil.lastName = ''
265        samlUtil.emailAddress = ''
266        attributeQuery = samlUtil.buildAttributeQuery(
267                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk",
268                        "https://openid.localhost/philip.kershaw")
269       
270        elem = AttributeQueryElementTree.create(attributeQuery)       
271        xmlOutput = prettyPrint(elem)       
272        print("\n"+"_"*80)
273        print(xmlOutput)
274               
275        attributeQueryStream = StringIO()
276        attributeQueryStream.write(xmlOutput)
277        attributeQueryStream.seek(0)
278
279        tree = ElementTree.parse(attributeQueryStream)
280        elem2 = tree.getroot()
281       
282        attributeQuery2 = AttributeQueryElementTree.parse(elem2)
283        self.assert_(attributeQuery2.id == attributeQuery.id)
284        self.assert_(attributeQuery2.issuer.value==attributeQuery.issuer.value)
285        self.assert_(attributeQuery2.subject.nameID.value == \
286                     attributeQuery.subject.nameID.value)
287       
288        self.assert_(attributeQuery2.attributes[1].name == \
289                     attributeQuery.attributes[1].name)
290       
291        xmlOutput2 = prettyPrint(elem2)       
292        print("_"*80)
293        print(xmlOutput2)
294        print("_"*80)
295
296    def test04createResponse(self):
297        response = Response()
298        response.issueInstant = datetime.utcnow()
299       
300        # Make up a request ID that this response is responding to
301        response.inResponseTo = str(uuid4())
302        response.id = str(uuid4())
303        response.version = SAMLVersion(SAMLVersion.VERSION_20)
304           
305        response.issuer = Issuer()
306        response.issuer.format = Issuer.X509_SUBJECT
307        response.issuer.value = \
308                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"
309       
310        response.status = Status()
311        response.status.statusCode = StatusCode()
312        response.status.statusCode.value = StatusCode.SUCCESS_URI       
313               
314        assertion = self._createAssertionHelper()
315       
316        # Add a conditions statement for a validity of 8 hours
317        assertion.conditions = Conditions()
318        assertion.conditions.notBefore = datetime.utcnow()
319        assertion.conditions.notOnOrAfter = assertion.conditions.notBefore + \
320            timedelta(seconds=60*60*8)
321       
322        assertion.subject = Subject() 
323        assertion.subject.nameID = NameID()
324        assertion.subject.nameID.format = "urn:esg:openid"
325        assertion.subject.nameID.value = \
326                        "https://openid.localhost/philip.kershaw"   
327           
328        assertion.issuer = Issuer()
329        assertion.issuer.format = Issuer.X509_SUBJECT
330        assertion.issuer.value = \
331                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"
332
333        response.assertions.append(assertion)
334       
335        # Add mapping for ESG Group/Role Attribute Value to enable ElementTree
336        # Attribute Value factory to render the XML output
337        attributeValueElementTreeClassMap = {
338            XSGroupRoleAttributeValue: XSGroupRoleAttributeValueElementTree           
339        }
340       
341        # Create ElementTree Assertion Element
342        responseElem = ResponseElementTree.create(response,
343                            customClassMap=attributeValueElementTreeClassMap)
344       
345        self.assert_(iselement(responseElem))
346       
347        # Serialise to output       
348        xmlOutput = prettyPrint(responseElem)       
349        self.assert_(len(xmlOutput))
350        print("\n"+"_"*80)
351        print(xmlOutput)
352        print("_"*80)
353   
354if __name__ == "__main__":
355    unittest.main()       
Note: See TracBrowser for help on using the repository browser.