source: TI12-security/trunk/ndg_xacml/ndg/xacml/test/context/test_pdp_with_custom_attributevalue_types.py @ 7666

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/ndg_xacml/ndg/xacml/test/context/test_pdp_with_custom_attributevalue_types.py@7666
Revision 7666, 5.5 KB checked in by pjkersha, 10 years ago (diff)

Started testing PDP with custom Group/Role? Attribute Value type. Custom match and Bag functions are needed to enable evaluation of rules using the custom type.

Line 
1#!/usr/bin/env python
2"""NDG unit tests for PDP working with a policy and request context containing
3custom AttributeValue data types
4
5NERC DataGrid
6"""
7__author__ = "P J Kershaw"
8__date__ = "28/10/10"
9__copyright__ = "(C) 2010 Science and Technology Facilities Council"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__license__ = "BSD - see LICENSE file in top-level directory"
12__contact__ = "Philip.Kershaw@stfc.ac.uk"
13__revision__ = "$Id$"
14import unittest
15import logging
16logging.basicConfig(level=logging.DEBUG)
17
18from ndg.xacml.core import Identifiers
19from ndg.xacml.core.attribute import Attribute
20from ndg.xacml.core.attributevalue import AttributeValueClassFactory
21
22from ndg.xacml.core.context.request import Request
23from ndg.xacml.core.context.subject import Subject
24from ndg.xacml.core.context.resource import Resource
25from ndg.xacml.core.context.action import Action
26from ndg.xacml.parsers.etree.factory import ReaderFactory
27from ndg.xacml.parsers.etree.attributevaluereader import (
28                                                DataTypeReaderClassFactory)
29from ndg.xacml.core.context.pdp import PDP
30from ndg.xacml.core.context.result import Decision
31from ndg.xacml.test import (XACML_ESGFTEST1_FILEPATH, 
32                            GroupRoleAttributeValue, 
33                            ETreeGroupRoleDataTypeReader)
34from ndg.xacml.test.context import (AnyUriAttributeValue, StringAttributeValue,
35                                    SUBJECT_ID)
36
37
38class XacmlEvalPdpWithCustomAttrTypes(unittest.TestCase):
39    """Evaluate a policy which contains custom XACML Attribute Value Data types
40    """
41    AT_LEAST_ONE_SUBJECT_ROLE_RESTRICTED_ID = \
42        'http://localhost/at-least-of-subject-role-restricted'       
43         
44    @staticmethod
45    def _createRequestCtx(resourceId, 
46                          includeSubject=True,
47                          subjectGroupRoles=None,
48                          groupRoleAttributeId='urn:esg:attr',
49                          action='read'):
50        """Create an example XACML Request Context for tests"""
51        if subjectGroupRoles is None:
52            subjectGroupRoles = [('ACME', 'default')]
53           
54        request = Request()
55       
56        if includeSubject:
57            subject = Subject()
58            openidSubjectAttribute = Attribute()
59           
60            openidSubjectAttribute.attributeId = "urn:esg:openid"
61            openidSubjectAttribute.dataType = AnyUriAttributeValue.IDENTIFIER
62           
63            openidSubjectAttribute.attributeValues.append(
64                                                        AnyUriAttributeValue())
65            openidSubjectAttribute.attributeValues[-1].value = SUBJECT_ID
66                                       
67           
68            subject.attributes.append(openidSubjectAttribute)
69   
70            for group, role in subjectGroupRoles:
71                groupRoleAttribute = Attribute()
72               
73                groupRoleAttribute.attributeId = groupRoleAttributeId
74                groupRoleAttribute.dataType = 'urn:grouprole'
75               
76                groupRoleAttribute.attributeValues.append(
77                                                    GroupRoleAttributeValue())
78                groupRoleAttribute.attributeValues[-1].group = group
79                groupRoleAttribute.attributeValues[-1].role = role
80           
81                subject.attributes.append(groupRoleAttribute)
82                                     
83            request.subjects.append(subject)
84       
85        resource = Resource()
86        resourceAttribute = Attribute()
87        resource.attributes.append(resourceAttribute)
88       
89        resourceAttribute.attributeId = Identifiers.Resource.RESOURCE_ID
90                           
91        resourceAttribute.dataType = AnyUriAttributeValue.IDENTIFIER
92        resourceAttribute.attributeValues.append(AnyUriAttributeValue())
93        resourceAttribute.attributeValues[-1].value = resourceId
94
95        request.resources.append(resource)
96       
97        request.action = Action()
98        actionAttribute = Attribute()
99        request.action.attributes.append(actionAttribute)
100       
101        actionAttribute.attributeId = Identifiers.Action.ACTION_ID
102        actionAttribute.dataType = StringAttributeValue.IDENTIFIER
103        actionAttribute.attributeValues.append(StringAttributeValue())
104        actionAttribute.attributeValues[-1].value = action
105       
106        return request
107       
108    def setUp(self):
109        """Use ESG sample policy"""
110        # Add new type
111        AttributeValueClassFactory.addClass('urn:grouprole', 
112                                            GroupRoleAttributeValue)
113       
114        # Add new parser for this type
115        DataTypeReaderClassFactory.addReader('urn:grouprole', 
116                                ETreeGroupRoleDataTypeReader)
117       
118        # Example policy with custom attribute value type used with ESGF
119        self.pdp = PDP.fromPolicySource(XACML_ESGFTEST1_FILEPATH, ReaderFactory)
120                   
121    def test01AtLeastOneSubjectRoleResource(self):
122        # Test at least one member function
123        request = self._createRequestCtx(
124                    self.__class__.AT_LEAST_ONE_SUBJECT_ROLE_RESTRICTED_ID,
125                    action='write')
126        response = self.pdp.evaluate(request)
127        self.failIf(response is None, "Null response")
128        for result in response.results:
129            self.failIf(result.decision != Decision.PERMIT, 
130                        "Expecting Permit decision")   
131           
132           
133if __name__ == "__main__":
134    unittest.main()
Note: See TracBrowser for help on using the repository browser.