source: TI12-security/trunk/ndg_security_saml/saml/test/test_saml.py @ 6546

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/ndg_security_saml/saml/test/test_saml.py@6546
Revision 6546, 12.8 KB checked in by pjkersha, 11 years ago (diff)

started unit testing for SAML AuthzDecisionQuery?

Line 
1"""SAML unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "21/07/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id: $'
11import logging
12logging.basicConfig(level=logging.DEBUG)
13   
14from datetime import datetime, timedelta
15from uuid import uuid4
16from cStringIO import StringIO
17
18import unittest
19
20from xml.etree.ElementTree import iselement
21from xml.etree import ElementTree
22
23from saml.saml2.core import Action
24from saml.saml2.core import (SAMLVersion, Attribute, AttributeStatement, 
25                             Assertion, AttributeQuery, Response, Issuer, 
26                             Subject, NameID, StatusCode, 
27                             StatusMessage, Status, Conditions, 
28                             XSStringAttributeValue,
29                             AuthzDecisionQuery)
30
31from saml.common.xml import SAMLConstants
32from saml.xml.etree import (prettyPrint, AssertionElementTree, 
33                            AttributeQueryElementTree, ResponseElementTree)
34
35
36class SAMLUtil(object):
37    """SAML utility class based on ANL examples for Earth System Grid:
38    http://www.ci.uchicago.edu/wiki/bin/view/ESGProject/ESGSAMLAttributes#ESG_Attribute_Service
39    """
40   
41    def __init__(self):
42        """Set-up ESG core attributes, Group/Role and miscellaneous
43        attributes lists
44        """
45        self.firstName = None
46        self.lastName = None
47        self.emailAddress = None
48       
49        self.__miscAttrList = []
50   
51    def addAttribute(self, name, value):
52        """Add a generic attribute
53        @type name: basestring
54        @param name: attribute name
55        @type value: basestring
56        @param value: attribute value
57        """
58        self.__miscAttrList.append((name, value))
59
60    def buildAssertion(self):
61        """Create a SAML Assertion containing ESG core attributes: First
62        Name, Last Name, e-mail Address; ESG Group/Role type attributes
63        and generic attributes
64        @rtype: ndg.security.common.saml.Assertion
65        @return: new SAML Assertion object
66        """
67       
68        assertion = Assertion()
69        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
70        assertion.id = str(uuid4())
71        assertion.issueInstant = datetime.utcnow()
72        attributeStatement = AttributeStatement()
73       
74        for attribute in self.createAttributes():
75            attributeStatement.attributes.append(attribute)
76           
77        assertion.attributeStatements.append(attributeStatement)
78       
79        return assertion
80
81    def buildAttributeQuery(self, issuer, subjectNameID):
82        """Make a SAML Attribute Query
83        @type issuer: basestring
84        @param issuer: attribute issuer name
85        @type subjectNameID: basestring
86        @param subjectNameID: identity to query attributes for
87        """
88        attributeQuery = AttributeQuery()
89        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
90        attributeQuery.id = str(uuid4())
91        attributeQuery.issueInstant = datetime.utcnow()
92       
93        attributeQuery.issuer = Issuer()
94        attributeQuery.issuer.format = Issuer.X509_SUBJECT
95        attributeQuery.issuer.value = issuer
96                       
97        attributeQuery.subject = Subject() 
98        attributeQuery.subject.nameID = NameID()
99        attributeQuery.subject.nameID.format = "urn:esg:openid"
100        attributeQuery.subject.nameID.value = subjectNameID
101                                   
102        attributeQuery.attributes = self.createAttributes()
103       
104        return attributeQuery
105   
106    def createAttributes(self):
107        """Create SAML Attributes for use in an Assertion or AttributeQuery"""
108       
109        attributes = []
110        if self.firstName is not None:   
111            # special case handling for 'FirstName' attribute
112            fnAttribute = Attribute()
113            fnAttribute.name = "urn:esg:first:name"
114            fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
115            fnAttribute.friendlyName = "FirstName"
116
117            firstName = XSStringAttributeValue()
118            firstName.value = self.firstName
119            fnAttribute.attributeValues.append(firstName)
120
121            attributes.append(fnAttribute)
122       
123
124        if self.lastName is not None:
125            # special case handling for 'LastName' attribute
126            lnAttribute = Attribute()
127            lnAttribute.name = "urn:esg:last:name"
128            lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
129            lnAttribute.friendlyName = "LastName"
130
131            lastName = XSStringAttributeValue()
132            lastName.value = self.lastName
133            lnAttribute.attributeValues.append(lastName)
134
135            attributes.append(lnAttribute)
136       
137
138        if self.emailAddress is not None:
139            # special case handling for 'LastName' attribute
140            emailAddressAttribute = Attribute()
141            emailAddressAttribute.name = "urn:esg:email:address"
142            emailAddressAttribute.nameFormat = SAMLConstants.XSD_NS+"#"+\
143                                        XSStringAttributeValue.TYPE_LOCAL_NAME
144            emailAddressAttribute.friendlyName = "emailAddress"
145
146            emailAddress = XSStringAttributeValue()
147            emailAddress.value = self.emailAddress
148            emailAddressAttribute.attributeValues.append(emailAddress)
149
150            attributes.append(emailAddressAttribute)
151       
152        for name, value in self.__miscAttrList:
153            attribute = Attribute()
154            attribute.name = name
155            attribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"
156
157            stringAttributeValue = XSStringAttributeValue()
158            stringAttributeValue.value = value
159            attribute.attributeValues.append(stringAttributeValue)
160
161            attributes.append(attribute)
162           
163        return attributes
164
165
166class SAMLTestCase(unittest.TestCase):
167    """Test SAML implementation for use with CMIP5 federation"""
168   
169    def _createAssertionHelper(self):
170        samlUtil = SAMLUtil()
171       
172        # ESG core attributes
173        samlUtil.firstName = "Philip"
174        samlUtil.lastName = "Kershaw"
175        samlUtil.emailAddress = "p.j.k@somewhere"
176       
177        # BADC specific attributes
178        badcRoleList = (
179            'urn:badc:security:authz:1.0:attr:admin', 
180            'urn:badc:security:authz:1.0:attr:rapid', 
181            'urn:badc:security:authz:1.0:attr:coapec', 
182            'urn:badc:security:authz:1.0:attr:midas', 
183            'urn:badc:security:authz:1.0:attr:quest', 
184            'urn:badc:security:authz:1.0:attr:staff'
185        )
186        for role in badcRoleList:
187            samlUtil.addAttribute("urn:badc:security:authz:1.0:attr", role)
188       
189        # Make an assertion object
190        assertion = samlUtil.buildAssertion()
191       
192        return assertion
193       
194    def test01CreateAssertion(self):
195         
196        assertion = self._createAssertionHelper()
197
198       
199        # Create ElementTree Assertion Element
200        assertionElem = AssertionElementTree.toXML(assertion)
201       
202        self.assert_(iselement(assertionElem))
203       
204        # Serialise to output
205        xmlOutput = prettyPrint(assertionElem)       
206        self.assert_(len(xmlOutput))
207       
208        print("\n"+"_"*80)
209        print(xmlOutput)
210        print("_"*80)
211
212    def test02ParseAssertion(self):
213        assertion = self._createAssertionHelper()
214       
215        # Create ElementTree Assertion Element
216        assertionElem = AssertionElementTree.toXML(assertion)
217       
218        self.assert_(iselement(assertionElem))
219       
220        # Serialise to output
221        xmlOutput = prettyPrint(assertionElem)       
222           
223        print("\n"+"_"*80)
224        print(xmlOutput)
225        print("_"*80)
226               
227        assertionStream = StringIO()
228        assertionStream.write(xmlOutput)
229        assertionStream.seek(0)
230
231        tree = ElementTree.parse(assertionStream)
232        elem2 = tree.getroot()
233       
234        assertion2 = AssertionElementTree.fromXML(elem2)
235        self.assert_(assertion2)
236       
237    def test03CreateAttributeQuery(self):
238        samlUtil = SAMLUtil()
239        samlUtil.firstName = ''
240        samlUtil.lastName = ''
241        samlUtil.emailAddress = ''
242        attributeQuery = samlUtil.buildAttributeQuery(
243                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk",
244                        "https://openid.localhost/philip.kershaw")
245       
246        elem = AttributeQueryElementTree.toXML(attributeQuery)       
247        xmlOutput = prettyPrint(elem)
248           
249        print("\n"+"_"*80)
250        print(xmlOutput)
251        print("_"*80)
252
253    def test04ParseAttributeQuery(self):
254        samlUtil = SAMLUtil()
255        samlUtil.firstName = ''
256        samlUtil.lastName = ''
257        samlUtil.emailAddress = ''
258        attributeQuery = samlUtil.buildAttributeQuery(
259                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk",
260                        "https://openid.localhost/philip.kershaw")
261       
262        elem = AttributeQueryElementTree.toXML(attributeQuery)       
263        xmlOutput = prettyPrint(elem)       
264        print("\n"+"_"*80)
265        print(xmlOutput)
266               
267        attributeQueryStream = StringIO()
268        attributeQueryStream.write(xmlOutput)
269        attributeQueryStream.seek(0)
270
271        tree = ElementTree.parse(attributeQueryStream)
272        elem2 = tree.getroot()
273       
274        attributeQuery2 = AttributeQueryElementTree.fromXML(elem2)
275        self.assert_(attributeQuery2.id == attributeQuery.id)
276        self.assert_(attributeQuery2.issuer.value==attributeQuery.issuer.value)
277        self.assert_(attributeQuery2.subject.nameID.value == \
278                     attributeQuery.subject.nameID.value)
279       
280        self.assert_(attributeQuery2.attributes[1].name == \
281                     attributeQuery.attributes[1].name)
282       
283        xmlOutput2 = prettyPrint(elem2)       
284        print("_"*80)
285        print(xmlOutput2)
286        print("_"*80)
287
288    def test05CreateResponse(self):
289        response = Response()
290        response.issueInstant = datetime.utcnow()
291       
292        # Make up a request ID that this response is responding to
293        response.inResponseTo = str(uuid4())
294        response.id = str(uuid4())
295        response.version = SAMLVersion(SAMLVersion.VERSION_20)
296           
297        response.issuer = Issuer()
298        response.issuer.format = Issuer.X509_SUBJECT
299        response.issuer.value = \
300                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"
301       
302        response.status = Status()
303        response.status.statusCode = StatusCode()
304        response.status.statusCode.value = StatusCode.SUCCESS_URI
305        response.status.statusMessage = StatusMessage()       
306        response.status.statusMessage.value = "Response created successfully"
307           
308        assertion = self._createAssertionHelper()
309       
310        # Add a conditions statement for a validity of 8 hours
311        assertion.conditions = Conditions()
312        assertion.conditions.notBefore = datetime.utcnow()
313        assertion.conditions.notOnOrAfter = (assertion.conditions.notBefore + 
314                                             timedelta(seconds=60*60*8))
315       
316        assertion.subject = Subject() 
317        assertion.subject.nameID = NameID()
318        assertion.subject.nameID.format = "urn:esg:openid"
319        assertion.subject.nameID.value = \
320                        "https://openid.localhost/philip.kershaw"   
321           
322        assertion.issuer = Issuer()
323        assertion.issuer.format = Issuer.X509_SUBJECT
324        assertion.issuer.value = \
325                        "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk"
326
327        response.assertions.append(assertion)
328       
329        # Create ElementTree Assertion Element
330        responseElem = ResponseElementTree.toXML(response)
331       
332        self.assert_(iselement(responseElem))
333       
334        # Serialise to output       
335        xmlOutput = prettyPrint(responseElem)       
336        self.assert_(len(xmlOutput))
337        print("\n"+"_"*80)
338        print(xmlOutput)
339        print("_"*80)
340       
341    def test06CreateAuthzDecisionQuery(self):
342        authzDecisionQuery = AuthzDecisionQuery()
343        authzDecisionQuery.resource = "http://LOCALHOST:80/My Secured URI"
344        self.assert_(":80" not in authzDecisionQuery.resource)
345        self.assert_("localhost" in authzDecisionQuery.resource)
346        self.assert_(" " not in authzDecisionQuery.resource)
347       
348        authzDecisionQuery.resource = \
349            "https://Somewhere.ac.uk:443/My Secured URI?blah=4&yes=True"
350           
351        self.assert_(":443" not in authzDecisionQuery.resource)
352        self.assert_("somewhere.ac.uk" in authzDecisionQuery.resource)
353        self.assert_("yes=True" in authzDecisionQuery.resource)
354   
355if __name__ == "__main__":
356    unittest.main()       
Note: See TracBrowser for help on using the repository browser.