source: TI12-security/trunk/ndg_xacml/ndg/xacml/test/context/__init__.py @ 7666

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/ndg_xacml/ndg/xacml/test/context/__init__.py@7666
Revision 7666, 5.8 KB checked in by pjkersha, 10 years ago (diff)

Started testing PDP with custom Group/Role Attribute Value type. Custom match and Bag functions are needed to enable evaluation of rules using the custom type.

1#!/usr/bin/env python
2"""NDG XACML Context unit test package
3
4NERC DataGrid
5"""
6__author__ = "P J Kershaw"
7__date__ = "28/10/10"
8__copyright__ = "(C) 2010 Science and Technology Facilities Council"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__license__ = "BSD - see LICENSE file in top-level directory"
11__contact__ = "Philip.Kershaw@stfc.ac.uk"
12__revision__ = "$Id$"
13import unittest
14from os import path
15
16from ndg.xacml.core import Identifiers
17from ndg.xacml.core.attribute import Attribute
18from ndg.xacml.core.attributevalue import (AttributeValue, 
19                                           AttributeValueClassFactory)
20
21from ndg.xacml.core.context.request import Request
22from ndg.xacml.core.context.subject import Subject
23from ndg.xacml.core.context.resource import Resource
24from ndg.xacml.core.context.action import Action
25from ndg.xacml.core.context.pdp import PDP
26from ndg.xacml.core.context.handler import CtxHandlerBase
27from ndg.xacml.parsers.etree.factory import ReaderFactory
28
29from ndg.xacml.test import XACML_NDGTEST1_FILEPATH
30 
# Attribute ID used for subject role attributes in the test requests, and the
# only attribute ID the test PIP (TestContextHandler.pipQuery) answers for.
ROLE_ATTRIBUTE_ID = "urn:ndg:security:authz:1.0:attr"

# Value of the "urn:esg:openid" subject attribute in generated test requests.
SUBJECT_ID = "https://my.name.somewhere.ac.uk"

# Look up the attribute value classes for the any-URI and string data types
# once at import time; the fixtures below instantiate them directly.
attributeValueFactory = AttributeValueClassFactory()
AnyUriAttributeValue = attributeValueFactory(AttributeValue.ANY_TYPE_URI)
StringAttributeValue = attributeValueFactory(AttributeValue.STRING_TYPE_URI)
37
38
class TestContextHandler(CtxHandlerBase):
    """Context Handler stub for unit tests, with a built-in PIP interface.

    Test fixture only: pipQuery answers every query for ROLE_ATTRIBUTE_ID
    with the value 'admin' and never looks at the subject, so this class
    must not be reused as a Context Handler outside the test suite.
    """

    def __init__(self):
        """Initialise the base Context Handler; no extra state is added"""
        super(TestContextHandler, self).__init__()

    def handlePEPRequest(self, myRequest):
        """Pass a request from a Policy Enforcement Point to the PDP

        @param myRequest: request from the PEP.  No conversion is done, so
        it is used directly as the XACML request context, and its ctxHandler
        attribute is set to this handler as a side effect.
        @type myRequest: presumably ndg.xacml.core.context.request.Request -
        confirm against callers
        @return: whatever the PDP's evaluate call returns, unconverted
        @raise TypeError: if this handler's pdp attribute is None
        """
        if self.pdp is None:
            raise TypeError('No "pdp" attribute set')

        # A real handler would translate the PEP's domain specific request
        # into an XACML request context here; the test request is used as-is.
        requestCtx = myRequest

        # Give the PDP a reference back to this handler so that it can
        # invoke PIP queries during evaluation
        requestCtx.ctxHandler = self

        # Likewise, the XACML response is handed back without translation
        # into a domain specific response.
        return self.pdp.evaluate(requestCtx)

    def pipQuery(self, request, designator):
        '''Stub PIP: supply the 'admin' value for ROLE_ATTRIBUTE_ID queries

        The value is returned for any subject - the request is not
        inspected at all - which is only acceptable in a test fixture.

        @param request: request context under evaluation (unused)
        @param designator: attribute designator; only its attributeId is
        read
        @return: one-item list holding a string attribute value of 'admin'
        when the designator asks for ROLE_ATTRIBUTE_ID, otherwise None
        '''
        if designator.attributeId != ROLE_ATTRIBUTE_ID:
            return None

        return [StringAttributeValue(value='admin')]
84       
85       
class XacmlContextBaseTestCase(unittest.TestCase):
    """Shared fixture builders for the XACML context unit tests"""

    @staticmethod
    def _createRequestCtx(resourceId,
                          includeSubject=True,
                          subjectRoles=None,
                          roleAttributeId=ROLE_ATTRIBUTE_ID,
                          action='read'):
        """Build a sample XACML Request Context for use in tests

        @param resourceId: value for the request's resource ID attribute
        @param includeSubject: when true, add one subject carrying SUBJECT_ID
        as its "urn:esg:openid" attribute plus one role attribute per entry
        in subjectRoles; when false the request is built without a subject
        @param subjectRoles: iterable of role values, ('staff',) if None
        @param roleAttributeId: attribute ID given to each role attribute
        @param action: value for the request's action ID attribute
        @return: the populated request
        @rtype: ndg.xacml.core.context.request.Request
        """
        def singleValued(attributeId, valueClass, value):
            """Return an Attribute of valueClass's data type with one value"""
            attribute = Attribute()
            attribute.attributeId = attributeId
            attribute.dataType = valueClass.IDENTIFIER

            attributeValue = valueClass()
            attribute.attributeValues.append(attributeValue)
            attributeValue.value = value

            return attribute

        roles = ('staff',) if subjectRoles is None else subjectRoles

        request = Request()

        if includeSubject:
            subject = Subject()

            # Subject identity first, then one attribute per role
            subject.attributes.append(
                singleValued("urn:esg:openid",
                             AnyUriAttributeValue,
                             SUBJECT_ID))

            for role in roles:
                subject.attributes.append(
                    singleValued(roleAttributeId, StringAttributeValue, role))

            request.subjects.append(subject)

        resource = Resource()
        resource.attributes.append(
            singleValued(Identifiers.Resource.RESOURCE_ID,
                         AnyUriAttributeValue,
                         resourceId))
        request.resources.append(resource)

        request.action = Action()
        request.action.attributes.append(
            singleValued(Identifiers.Action.ACTION_ID,
                         StringAttributeValue,
                         action))

        return request

    @staticmethod
    def _createPDPfromNdgTest1Policy():
        """Return a PDP built from the NDG test policy file

        The policy at XACML_NDGTEST1_FILEPATH is read using the ElementTree
        based ReaderFactory.
        """
        return PDP.fromPolicySource(XACML_NDGTEST1_FILEPATH, ReaderFactory)