source: TI12-security/trunk/ndg_xacml/ndg/xacml/test/test_context.py @ 7443

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/ndg_xacml/ndg/xacml/test/test_context.py@7443
Revision 7443, 13.4 KB checked in by pjkersha, 11 years ago (diff)

Incomplete - task 2: XACML-Security Integration

  • Overhaul of test_context unit tests to make a more comprehensive test of policy rule evaluation.
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG XACML Context unit test package
3
4NERC DataGrid
5"""
6__author__ = "P J Kershaw"
7__date__ = "26/03/10"
8__copyright__ = "(C) 2010 Science and Technology Facilities Council"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__license__ = "BSD - see LICENSE file in top-level directory"
11__contact__ = "Philip.Kershaw@stfc.ac.uk"
12__revision__ = "$Id$"
13import unittest
14from os import path
15import logging
16logging.basicConfig(level=logging.DEBUG)
17
18from ndg.xacml.test import XACML_NDGTEST1_FILEPATH
19from ndg.xacml.parsers.etree.factory import ReaderFactory
20from ndg.xacml.core import Identifiers
21from ndg.xacml.core.context.pdpinterface import PDPInterface
22from ndg.xacml.core.context.pdp import PDP
23from ndg.xacml.core.context.handler import CtxHandlerInterface
24from ndg.xacml.core.attribute import Attribute
25from ndg.xacml.core.attributevalue import (AttributeValue, 
26                                           AttributeValueClassFactory)
27from ndg.xacml.core.context.request import Request
28from ndg.xacml.core.context.response import Response
29from ndg.xacml.core.context.result import Result, Decision
30from ndg.xacml.core.context.subject import Subject
31from ndg.xacml.core.context.resource import Resource
32from ndg.xacml.core.context.action import Action
33
34attributeValueFactory = AttributeValueClassFactory()
35AnyUriAttributeValue = attributeValueFactory(AttributeValue.ANY_TYPE_URI)
36StringAttributeValue = attributeValueFactory(AttributeValue.STRING_TYPE_URI)
37
38ROLE_ATTRIBUTE_ID = "urn:ndg:security:authz:1.0:attr"
39SUBJECT_ID = 'https://my.name.somewhere.ac.uk'
40
41class TestContextHandler(CtxHandlerInterface):
42    """Test implementation of Context Handler which includes an implemented PIP
43    interface"""
44   
45    def __init__(self):
46        """Add an attribute to hold a reference to a policy information point"""
47       
48        super(TestContextHandler, self).__init__()
49        self.pip = None       
50       
51    def handlePEPRequest(self, myRequest):
52        """Handle request from Policy Enforcement Point
53       
54        @param pepRequest: request from PEP, derived class determines its type
55        e.g. SAML AuthzDecisionQuery
56        @type myRequest: type
57        @return: PEP response - derived class determines type
58        @rtype: None
59        """
60       
61        # Convert myRequest to XACML context request - var assignment here is
62        # representative of this process rather than actually doing anything.
63        xacmlRequest = myRequest
64       
65        if self.pdp is None:
66            raise TypeError('No "pdp" attribute set')
67       
68        # Add a reference to this context so that the PDP can invoke queries
69        # back to the PIP
70        xacmlRequest.ctxHandler = self 
71               
72        xacmlResponse = self.pdp.evaluate(xacmlRequest)
73       
74        # Convert XACML context response to domain specific request
75        myResponse = xacmlResponse
76       
77        return myResponse
78   
79    def pipQuery(self, request, designator):
80        '''PIP adds admin attribute value for given attribute ID and for any
81        subject'''
82        if designator.attributeId == ROLE_ATTRIBUTE_ID:
83            attrVal = StringAttributeValue(value='admin')
84            return [attrVal]
85        else:
86            return None
87
88
89class XacmlContextBaseTestCase(unittest.TestCase):
90    """Base class containing common methods for test initialisation"""
91   
92    def _createRequestCtx(self, 
93                          resourceId, 
94                          includeSubject=True,
95                          subjectRoles=('staff',),
96                          roleAttributeId=ROLE_ATTRIBUTE_ID,
97                          action='read'):
98        """Create an example XACML Request Context for tests"""
99        request = Request()
100       
101        if includeSubject:
102            subject = Subject()
103            openidSubjectAttribute = Attribute()
104           
105           
106            openidSubjectAttribute.attributeId = "urn:esg:openid"
107            openidSubjectAttribute.dataType = AnyUriAttributeValue.IDENTIFIER
108           
109            openidSubjectAttribute.attributeValues.append(
110                                                        AnyUriAttributeValue())
111            openidSubjectAttribute.attributeValues[-1].value = SUBJECT_ID
112                                       
113           
114            subject.attributes.append(openidSubjectAttribute)
115   
116            for role in subjectRoles:
117                roleAttribute = Attribute()
118               
119                roleAttribute.attributeId = roleAttributeId
120                roleAttribute.dataType = StringAttributeValue.IDENTIFIER
121               
122                roleAttribute.attributeValues.append(StringAttributeValue())
123                roleAttribute.attributeValues[-1].value = role
124           
125                subject.attributes.append(roleAttribute)
126                                     
127            request.subjects.append(subject)
128       
129        resource = Resource()
130        resourceAttribute = Attribute()
131        resource.attributes.append(resourceAttribute)
132       
133        resourceAttribute.attributeId = Identifiers.Resource.RESOURCE_ID
134                           
135        resourceAttribute.dataType = AnyUriAttributeValue.IDENTIFIER
136        resourceAttribute.attributeValues.append(AnyUriAttributeValue())
137        resourceAttribute.attributeValues[-1].value = resourceId
138
139        request.resources.append(resource)
140       
141        request.action = Action()
142        actionAttribute = Attribute()
143        request.action.attributes.append(actionAttribute)
144       
145        actionAttribute.attributeId = Identifiers.Action.ACTION_ID
146        actionAttribute.dataType = StringAttributeValue.IDENTIFIER
147        actionAttribute.attributeValues.append(StringAttributeValue())
148        actionAttribute.attributeValues[-1].value = action
149       
150        return request
151       
152    def _createPDPfromPolicy(self):
153        pdp = PDP.fromPolicySource(XACML_NDGTEST1_FILEPATH, ReaderFactory)
154        return pdp   
155
156
157class XacmlContextTestCase(XacmlContextBaseTestCase):
158    """Test PDP, PAP, PIP and Context handler"""
159   
160    def test01CreateRequest(self):
161        requestCtx = self._createRequestCtx("http://localhost")
162        self.assert_(requestCtx)
163       
164    def test02CreateResponse(self):
165        response = Response()
166        result = Result()
167        response.results.append(result)
168        result.decision = Decision()
169        result.decision.value = Decision.NOT_APPLICABLE
170       
171    def test03AbstractCtxHandler(self):
172        self.assertRaises(TypeError, CtxHandlerInterface, 
173                          "Context handler is an abstract base class")
174       
175    def test04CreateCtxHandler(self):
176        ctxHandler = TestContextHandler()
177       
178    def test05PDPInterface(self):
179        self.assertRaises(TypeError, PDPInterface)
180       
181    def test06CreatePDP(self):
182        pdp = PDP()
183        self.assert_(pdp)
184       
185    def test07CreatePDPfromPolicy(self):
186        pdp = self._createPDPfromPolicy()
187        self.assert_(pdp)
188   
189   
190class XacmlEvalPdpWithPermitOverridesPolicy(XacmlContextBaseTestCase):
191    """Test PDP with permit overrides rule combining algorithm"""
192   
193    NOT_APPLICABLE_RESOURCE_ID = 'https://localhost'
194   
195    # This could be any applicable resource value, provided there's no rule to
196    # override and enable access
197    PRIVATE_RESOURCE_ID = 'http://localhost/private-resource'
198   
199    PUBLIC_RESOURCE_ID = 'http://localhost/resource-only-restricted'
200    NOT_APPLICABLE_RESOURCE_ID = 'https://localhost'
201       
202    SINGLE_SUBJECT_ROLE_RESTRICTED_ID = \
203        'http://localhost/single-subject-role-restricted'
204    ACTION_AND_SINGLE_SUBJECT_ROLE_RESTRICTED_ID = \
205        'http://localhost/action-and-single-subject-role-restricted'
206    AT_LEAST_ONE_SUBJECT_ROLE_RESTRICTED_ID = \
207        'http://localhost/at-least-of-subject-role-restricted'
208       
209    def setUp(self):
210        self.pdp = self._createPDPfromPolicy()
211       
212    def test01NotApplicable(self):
213        # Set a resource Id that doesn't match the main target
214        request = self._createRequestCtx(
215                                    self.__class__.NOT_APPLICABLE_RESOURCE_ID)
216        response = self.pdp.evaluate(request)
217        self.failIf(response is None, "Null response")
218        for result in response.results:
219            self.failIf(result.decision != Decision.NOT_APPLICABLE, 
220                        "Expecting not applicable decision")
221       
222    def test02PublicallyAccessibleResource(self):
223        # Test a resource which has no subject restrictions
224        request = self._createRequestCtx(self.__class__.PUBLIC_RESOURCE_ID,
225                                         includeSubject=False)
226        response = self.pdp.evaluate(request)
227        self.failIf(response is None, "Null response")
228        for result in response.results:
229            self.failIf(result.decision != Decision.PERMIT, 
230                        "Expecting Permit decision")
231       
232    def test03PrivateResource(self):
233        request = self._createRequestCtx(
234                                    self.__class__.PRIVATE_RESOURCE_ID)
235        response = self.pdp.evaluate(request)
236        self.failIf(response is None, "Null response")
237        for result in response.results:
238            self.failIf(result.decision != Decision.DENY, 
239                        "Expecting Deny decision")
240
241    def test04SingleSubjectRoleRestrictedResource(self):
242        # Access based on a resource ID and single subject role
243        request = self._createRequestCtx(
244                            self.__class__.SINGLE_SUBJECT_ROLE_RESTRICTED_ID)
245        response = self.pdp.evaluate(request)
246        self.failIf(response is None, "Null response")
247        for result in response.results:
248            self.failIf(result.decision != Decision.PERMIT, 
249                        "Expecting Permit decision") 
250
251    def test05SingleSubjectRoleRestrictedResourceDeniesAccess(self):
252        # Subject doesn't have the required role for access
253        request = self._createRequestCtx(
254                            self.__class__.SINGLE_SUBJECT_ROLE_RESTRICTED_ID,
255                            subjectRoles=('student',))
256        response = self.pdp.evaluate(request)
257        self.failIf(response is None, "Null response")
258        for result in response.results:
259            self.failIf(result.decision != Decision.DENY, 
260                        "Expecting Deny decision") 
261
262    def test06ActionAndSingleSubjectRoleRestrictedResource(self):
263        # Test restriction based on action type as well as subject role
264        request = self._createRequestCtx(
265                    self.__class__.ACTION_AND_SINGLE_SUBJECT_ROLE_RESTRICTED_ID)
266        response = self.pdp.evaluate(request)
267        self.failIf(response is None, "Null response")
268        for result in response.results:
269            self.failIf(result.decision != Decision.PERMIT, 
270                        "Expecting Permit decision")
271
272    def test07ActionAndSingleSubjectRoleRestrictedResourceDeniesAccess(self):
273        # Test subject requests invalid action type
274        request = self._createRequestCtx(
275                    self.__class__.ACTION_AND_SINGLE_SUBJECT_ROLE_RESTRICTED_ID,
276                    action='write')
277        response = self.pdp.evaluate(request)
278        self.failIf(response is None, "Null response")
279        for result in response.results:
280            self.failIf(result.decision != Decision.DENY, 
281                        "Expecting Deny decision") 
282
283    def test08AtLeastOneSubjectRoleResource(self):
284        # Test at least one member function
285        request = self._createRequestCtx(
286                    self.__class__.AT_LEAST_ONE_SUBJECT_ROLE_RESTRICTED_ID,
287                    action='write')
288        response = self.pdp.evaluate(request)
289        self.failIf(response is None, "Null response")
290        for result in response.results:
291            self.failIf(result.decision != Decision.PERMIT, 
292                        "Expecting Permit decision")             
293
294    def test09AtLeastOneSubjectRoleResourceDeniesAccess(self):
295        # Test at least one member function where subject doesn't have one of
296        # the required roles
297        request = self._createRequestCtx(
298                    self.__class__.AT_LEAST_ONE_SUBJECT_ROLE_RESTRICTED_ID,
299                    subjectRoles=('student',))
300        response = self.pdp.evaluate(request)
301        self.failIf(response is None, "Null response")
302        for result in response.results:
303            self.failIf(result.decision != Decision.DENY, 
304                        "Expecting Deny decision")             
305   
306    def test10PipAddsRequiredAttributeValToEnableAccess(self):
307        # The PDP is part of a context handler with a PIP which adds subject
308        # attributes under prescribed conditions on the evaluation of
309        # subject attribute designators.  In this case the addition of the PIP
310        # adds an attribute value to one of the subject's attributes which means
311        # they're granted access where otherwise access would be denied
312        ctxHandler = TestContextHandler()
313        ctxHandler.pdp = self.pdp
314       
315        request = self._createRequestCtx(
316                    self.__class__.AT_LEAST_ONE_SUBJECT_ROLE_RESTRICTED_ID,
317                    subjectRoles=('student',))
318       
319        response = ctxHandler.handlePEPRequest(request)
320        self.failIf(response is None, "Null response")
321        for result in response.results:
322            self.failIf(result.decision != Decision.PERMIT, 
323                        "Expecting PERMIT decision")         
324       
325                               
326if __name__ == "__main__":
327    unittest.main()
Note: See TracBrowser for help on using the repository browser.