source: TI12-security/trunk/NDG_XACML/ndg/xacml/test/test_xacml.py @ 6775

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDG_XACML/ndg/xacml/test/test_xacml.py@6775
Revision 6775, 21.4 KB checked in by pjkersha, 11 years ago (diff)

More work on PDP and moved pdp, pip and pap modules to context package.

Line 
1"""NDG XACML unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "16/03/10"
7__copyright__ = "(C) 2010 Science and Technology Facilities Council"
8__contact__ = "Philip.Kershaw@stfc.ac.uk"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = "$Id: $"
12import unittest
13from os import path
14import logging
15logging.basicConfig(level=logging.DEBUG)
16
17from ndg.xacml.core.policy import Policy
18from ndg.xacml.core.attributedesignator import SubjectAttributeDesignator
19from ndg.xacml.core.attributeselector import AttributeSelector
20from ndg.xacml.parsers.etree.factory import ReaderFactory
21
22from ndg.xacml.core.attribute import Attribute
23from ndg.xacml.core.context.request import Request
24from ndg.xacml.core.context.response import Response
25from ndg.xacml.core.context.result import Result, Decision
26from ndg.xacml.core.context.subject import Subject
27from ndg.xacml.core.context.resource import Resource
28from ndg.xacml.core.context.action import Action
29
30THIS_DIR = path.dirname(__file__)
31
32
33class XACMLPolicyTestCase(unittest.TestCase):
34    XACML_TEST1_FILENAME = "rule1.xml"
35    XACML_TEST1_FILEPATH = path.join(THIS_DIR, XACML_TEST1_FILENAME)
36    XACML_TEST2_FILENAME = "rule2.xml"
37    XACML_TEST2_FILEPATH = path.join(THIS_DIR, XACML_TEST2_FILENAME)
38    XACML_TEST3_FILENAME = "rule3.xml"
39    XACML_TEST3_FILEPATH = path.join(THIS_DIR, XACML_TEST3_FILENAME)
40    XACML_TEST4_FILENAME = "rule4.xml"
41    XACML_TEST4_FILEPATH = path.join(THIS_DIR, XACML_TEST4_FILENAME)
42    XACML_NDGTEST1_FILENAME = "ndg1.xml"
43    XACML_NDGTEST1_FILEPATH = path.join(THIS_DIR, XACML_NDGTEST1_FILENAME)
44   
45    def test01ETreeParseRule1Policy(self):
46        PolicyReader = ReaderFactory.getReader(Policy)
47        policy = PolicyReader.parse(XACMLPolicyTestCase.XACML_TEST1_FILEPATH)
48        self.assert_(policy)
49       
50        self.assert_(
51            policy.policyId == "urn:oasis:names:tc:example:SimplePolicy1")
52       
53        self.assert_(policy.ruleCombiningAlgId == \
54        "urn:oasis:names:tc:xacml:1.0:rule-combining-algorithm:deny-overrides")
55       
56        self.assert_(
57            "Med Example Corp access control policy" in policy.description)
58       
59        self.assert_(len(policy.target.subjects) == 0)
60       
61        self.assert_(policy.rules[0].id == \
62                     "urn:oasis:names:tc:xacml:2.0:example:SimpleRule1")
63       
64        self.assert_(policy.rules[0].effect == 'Permit')
65       
66        self.assert_(
67            'Any subject with an e-mail name in the med.example.com domain' in \
68            policy.rules[0].description)
69       
70        self.assert_(len(policy.rules[0].target.subjects) == 1)
71        self.assert_(len(policy.rules[0].target.actions) == 0)
72        self.assert_(len(policy.rules[0].target.resources) == 0)
73        self.assert_(len(policy.rules[0].target.environments) == 0)
74       
75        self.assert_(len(policy.rules[0].target.subjects[0].subjectMatches) == 1)
76       
77        self.assert_(policy.rules[0].target.subjects[0].subjectMatches[0
78            ].matchId == \
79            "urn:oasis:names:tc:xacml:1.0:function:rfc822Name-match")
80       
81        self.assert_(policy.rules[0].target.subjects[0].subjectMatches[0
82            ].attributeValue.dataType == \
83            "urn:oasis:names:tc:xacml:1.0:data-type:rfc822Name")
84       
85        self.assert_(policy.rules[0].target.subjects[0].subjectMatches[0
86            ].attributeDesignator.dataType == \
87            "urn:oasis:names:tc:xacml:1.0:data-type:rfc822Name")
88       
89        # Attribute ID
90        self.assert_(policy.rules[0].target.subjects[0].subjectMatches[0
91            ].attributeDesignator.attributeId == \
92            "urn:oasis:names:tc:xacml:1.0:subject:subject-id")
93         
94    def test02ETreeParseRule2Policy(self):
95        PolicyReader = ReaderFactory.getReader(Policy)
96        policy = PolicyReader.parse(XACMLPolicyTestCase.XACML_TEST2_FILEPATH)
97        self.assert_(policy)
98       
99        self.assert_(
100        policy.policyId == "urn:oasis:names:tc:xacml:2.0:example:policyid:2")
101       
102        self.assert_(policy.ruleCombiningAlgId == \
103        "urn:oasis:names:tc:xacml:1.0:rule-combining-algorithm:deny-overrides")
104       
105        self.assert_(policy.description is None)
106       
107        self.assert_(len(policy.target.actions) == 0)
108       
109        self.assert_(policy.rules[0].id == \
110                     "urn:oasis:names:tc:xacml:2.0:example:ruleid:2")
111       
112        self.assert_(policy.rules[0].effect == 'Permit')
113       
114        self.assert_(policy.rules[0].description == """\
115A person may read any medical record in the
116            http://www.med.example.com/records.xsd namespace
117            for which he or she is the designated parent or guardian,
118            and for which the patient is under 16 years of age""")
119       
120        self.assert_(len(policy.rules[0].target.subjects) == 0)
121        self.assert_(len(policy.rules[0].target.actions) == 1)
122        self.assert_(len(policy.rules[0].target.resources) == 1)
123        self.assert_(len(policy.rules[0].target.environments) == 0)
124       
125        self.assert_(len(policy.rules[0].target.resources[0
126                                                    ].resourceMatches) == 2)
127       
128        self.assert_(policy.rules[0].target.resources[0].resourceMatches[0
129            ].matchId == "urn:oasis:names:tc:xacml:1.0:function:string-equal")
130       
131        self.assert_(policy.rules[0].target.resources[0].resourceMatches[0
132            ].attributeValue.dataType == \
133                                    "http://www.w3.org/2001/XMLSchema#string")
134       
135        self.assert_(policy.rules[0].target.resources[0].resourceMatches[0
136            ].attributeValue.value == 'urn:med:example:schemas:record')
137       
138        self.assert_(policy.rules[0].target.resources[0].resourceMatches[0
139            ].attributeDesignator.dataType == \
140                                    "http://www.w3.org/2001/XMLSchema#string")
141       
142        self.assert_(policy.rules[0].target.resources[0].resourceMatches[1
143            ].attributeDesignator.attributeId == \
144                            "urn:oasis:names:tc:xacml:1.0:resource:xpath")
145        self.assert_(policy.rules[0].target.resources[0].resourceMatches[1
146            ].matchId == \
147                "urn:oasis:names:tc:xacml:1.0:function:xpath-node-match")
148       
149        self.assert_(policy.rules[0].target.resources[0].resourceMatches[1
150            ].attributeValue.dataType == \
151                                    "http://www.w3.org/2001/XMLSchema#string")
152       
153        self.assert_(policy.rules[0].target.resources[0].resourceMatches[1
154            ].attributeValue.value == '/md:record')
155       
156        self.assert_(policy.rules[0].target.resources[0].resourceMatches[1
157            ].attributeDesignator.dataType == \
158                                    "http://www.w3.org/2001/XMLSchema#string")
159       
160        self.assert_(policy.rules[0].target.resources[0].resourceMatches[1
161            ].attributeDesignator.attributeId == \
162                                "urn:oasis:names:tc:xacml:1.0:resource:xpath")
163       
164        # Verify Action
165        self.assert_(len(policy.rules[0].target.actions[0
166                                                    ].actionMatches) == 1)
167       
168        self.assert_(policy.rules[0].target.actions[0].actionMatches[0
169            ].matchId == "urn:oasis:names:tc:xacml:1.0:function:string-equal")
170       
171        self.assert_(policy.rules[0].target.actions[0].actionMatches[0
172            ].attributeValue.dataType == \
173                                    "http://www.w3.org/2001/XMLSchema#string")
174       
175        self.assert_(policy.rules[0].target.actions[0].actionMatches[0
176            ].attributeValue.value == "read")
177       
178        self.assert_(policy.rules[0].target.actions[0].actionMatches[0
179            ].attributeDesignator.dataType == \
180                                    "http://www.w3.org/2001/XMLSchema#string")
181       
182        self.assert_(policy.rules[0].target.actions[0].actionMatches[0
183            ].attributeDesignator.attributeId == \
184                            "urn:oasis:names:tc:xacml:1.0:action:action-id")
185
186        self.assert_(policy.rules[0].condition)       
187        self.assert_(policy.rules[0].condition.expression.functionId == \
188                     "urn:oasis:names:tc:xacml:1.0:function:and")
189       
190        self.assert_(len(policy.rules[0].condition.expression.expressions) == 1)
191       
192        self.assert_(policy.rules[0].condition.expression.expressions[0
193            ].functionId == \
194                'urn:oasis:names:tc:xacml:1.0:function:string-equal')
195       
196        self.assert_(len(policy.rules[0].condition.expression.expressions) == 1)
197       
198        self.assert_(len(policy.rules[0].condition.expression.expressions[0
199                     ].expressions) == 2)
200       
201        self.assert_(policy.rules[0].condition.expression.expressions[0
202            ].expressions[0].functionId == \
203                "urn:oasis:names:tc:xacml:1.0:function:string-one-and-only")
204       
205        self.assert_(isinstance(
206                        policy.rules[0].condition.expression.expressions[0
207                            ].expressions[0
208                            ].expressions[0], SubjectAttributeDesignator))
209       
210        self.assert_(policy.rules[0].condition.expression.expressions[0
211                            ].expressions[0
212                            ].expressions[0].attributeId == \
213                            "urn:oasis:names:tc:xacml:2.0:example:attribute:"
214                            "parent-guardian-id")
215
216        self.assert_(policy.rules[0].condition.expression.expressions[0
217                            ].expressions[0
218                            ].expressions[0].dataType == \
219                            "http://www.w3.org/2001/XMLSchema#string")
220       
221        self.assert_(policy.rules[0].condition.expression.expressions[0
222                            ].expressions[0
223                            ].expressions[0].attributeId == \
224                            "urn:oasis:names:tc:xacml:2.0:example:attribute:"
225                            "parent-guardian-id")
226       
227        self.assert_(isinstance(policy.rules[0
228                            ].condition.expression.expressions[0
229                            ].expressions[1
230                            ].expressions[0], AttributeSelector))
231       
232        self.assert_(policy.rules[0
233                            ].condition.expression.expressions[0
234                            ].expressions[1
235                            ].expressions[0].requestContextPath == \
236                            "//md:record/md:parentGuardian/md:parentGuardianId/"
237                            "text()")
238       
239        self.assert_(policy.rules[0
240                            ].condition.expression.expressions[0
241                            ].expressions[1
242                            ].expressions[0].dataType == \
243                            "http://www.w3.org/2001/XMLSchema#string")
244
245    def test03ETreeParseRule3Policy(self):
246        PolicyReader = ReaderFactory.getReader(Policy)
247       
248        try:
249            policy = PolicyReader.parse(XACMLPolicyTestCase.XACML_TEST3_FILEPATH)
250            self.assert_(policy)
251        except NotImplementedError, e:
252            print("Expecting Obligations not implemented exception: %s" %e)
253                   
254    def test04ETreeParseRule4Policy(self):
255        PolicyReader = ReaderFactory.getReader(Policy)
256        policy = PolicyReader.parse(XACMLPolicyTestCase.XACML_TEST4_FILEPATH)
257        self.assert_(policy)
258                   
259    def test05ETreeParseNdg1Policy(self):
260        # Example policy for URI Regular expression based matching of
261        # resources for NDG
262        PolicyReader = ReaderFactory.getReader(Policy)
263        policy = PolicyReader.parse(XACMLPolicyTestCase.XACML_NDGTEST1_FILEPATH)
264        self.assert_(policy)
265       
266       
267class PDP(PDPInterface):
268    """A XACML Policy Decision Point implementation.  It supports the use of a
269    single policy but not policy sets"""
270    __slots__ = ('__policy',)
271   
272    def __init__(self, policy=None):
273        """
274        @param policy: policy object for PDP to use to apply access control
275        decisions, may be omitted.
276        @type policy: ndg.xacml.core.policy.Policy / None
277        """
278        self.__policy = None
279        if policy is not None:
280            self.policy = policy
281       
282    @classmethod
283    def fromPolicy(cls, source, reader):
284        """Create a new PDP instance with a given policy
285        @param source: source for policy
286        @type source: type (dependent on the reader set, it could be for example
287        a file path string, file object, XML element instance)
288        @param reader: the reader instance to use to read this policy
289        @type reader: ndg.xacml.parsers.AbstractReader derived type
290        """
291        if not isinstance(reader, AbstractReader):
292            raise TypeError('Expecting %r derived type for "reader" input; got '
293                            '%r instead' % (AbstractReader, type(reader)))
294           
295        pdp = cls()
296        pdp.policy = reader.parse(source)
297        return policy
298   
299    @property
300    def policy(self):
301        """policy object for PDP to use to apply access control decisions"""
302        return self.__policy
303   
304    @policy.setter
305    def policy(self, value):
306        '''policy object for PDP to use to apply access control decisions'''
307        if not isinstance(value, Policy):
308            raise TypeError('Expecting %r derived type for "policy" input; got '
309                            '%r instead' % (Policy, type(value)))
310        self.__policy = value
311                   
312    def evaluate(self, request):
313        """Make an access control decision for the given request based on the
314        policy set
315       
316        @param request: XACML request context
317        @type request: ndg.xacml.core.context.request.Request
318        @return: XACML response instance
319        @rtype: ndg.xacml.core.context.response.Response
320        """
321        response = Response
322        result = Result()
323        response.results.append(result)
324        result.decision = Decision.NOT_APPLICABLE
325       
326        if not isinstance(request, Request):
327             log.error('Expecting %r derived type for "reader" input; got '
328                       '%r instead' % Request, type(request))
329             result.decision = Decision.INDETERMINATE
330             return response
331           
332        # Exception block around all rule processing in order to set
333        # INDETERMINATE response from any exceptions raised
334        try: 
335                 
336            # Check policy target for match
337            log.debug('Checking policy target for match...')
338           
339            if not self.matchTarget(self.policy.target, request):
340                log.debug('No match for policy target setting Decision=%r',
341                          Decision.NOT_APPLICABLE_STR)
342               
343                result.decision = Decision.NOT_APPLICABLE
344                return response
345           
346            # Check rules
347            for rule in self.policy.rules:
348                log.debug('Checking policy rule %r for match...', rule.id)
349                if not self.matchTarget(rule.target, request):
350                    log.debug('No match to request context for target in rule '
351                              '%r', rule.id)
352                    continue         
353        except:
354            log.error('Exception raised evaluating request context, returning '
355                      'Decision=%r:%s', 
356                      Decision.INDETERMINATE_STR, 
357                      traceback.format_exc())
358            result.decision = Decision.INDETERMINATE
359           
360        return response
361           
362   
363    def matchTarget(self, target, request):
364        if target is None:
365            log.debug('No target set so no match with request context')
366            return False
367       
368        # From section 5.5 of the XACML 2.0 Core Spec:
369        #
370        # For the parent of the <Target> element to be applicable to the
371        # decision request, there MUST be at least one positive match between
372        # each section of the <Target> element and the corresponding section of
373        # the <xacml-context:Request> element.       
374        for i in ('subjects', 'resources', 'actions', 'environments'):
375            for targetChild in getattr(target, i):
376                for requestChild in getattr(request, i):
377                    if self.matchTargetChild(targetChild, requestChild):
378                        return True
379               
380       
381               
382        return False
383   
384    @classmethod
385    def matchTargetChild(cls, targetChild, requestChild):
386        """Match a child (Subject, Resource, Action or Environment) from the
387        request context with a given target's child
388       
389        @param targetChild: Target Subject, Resource, Action or Environment
390        object
391        @type targetChild: ndg.xacml.core.TargetChildBase
392        @param requestChild: Request Subject, Resource, Action or Environment
393        object
394        @type requestChild: ndg.xacml.core.context.RequestChildBase
395        @return: True if request context matches something in the target
396        @rtype: bool
397        @raise NotImplementedError: AttributeSelector processing is not
398        currently supported.  If an AttributeSelector is found in the policy,
399        this exception will be raised.
400        """
401        if targetChild is None:
402            # Default if target child is not set is to match all children
403            return True
404       
405        for childMatch in targetChild.matches:
406            attributeValue = childMatch.attributeValue
407           
408            # Create a match function based on the presence or absence of an
409            # AttributeDesignator or AttributeSelector
410            if childMatch.attributeDesignator is not None:
411                attributeId = childMatch.attributeDesignator.attributeId
412                dataType = childMatch.attributeDesignator.dataType
413               
414                _attributeMatch = lambda requestChildAttribute: (
415                    requestChildAttribute.attributeValue == attributeValue and
416                    requestChildAttribute.attributeId == attributeId and
417                    requestChildAttribute.dataType == dataType
418                )
419               
420            elif childMatch.attributeSelector is not None:
421                # Nb. This will require that the request provide a reference to
422                # it's XML representation and an abstraction of the XML parser
423                # for executing XPath searches into that representation
424                raise NotImplementedError('This PDP implementation does not '
425                                          'support <AttributeSelector> '
426                                          'elements')
427            else:
428                _attributeMatch = lambda requestChildAttribute: (
429                    requestChildAttribute.attributeValue == attributeValue
430                )
431               
432            for attribute in requestChild.attributes:
433                if _attributeMatch(attribute):
434                    return True
435                   
436        return False
437   
438               
439class TestContextHandler(AbstractContextHandler):
440    """Test implementation of Context Handler"""
441   
442    def __init__(self):
443        super(TestContextHandler, self).__init__()
444        self.pip = None       
445       
446    def handlePEPRequest(self, myRequest):
447       
448        # Convert myRequest to XACML context request
449        request = myRequest
450       
451        if self.pdp is None:
452            raise TypeError('No "pdp" attribute set')
453       
454        response = self.pdp.evaluate(request)
455       
456        # Convert XACML context response to domain specific request
457        myResponse = response
458       
459        return myResponse
460   
461
462class XACMLContextTestCase(unittest.TestCase):
463    """Test PDP, PAP, PIP and Context handler"""
464   
465    def test01CreateRequest(self):
466        request = Request()
467       
468        subject = Subject()
469        subjectAttribute = Attribute()
470        subject.attributes.append(subjectAttribute)
471        subjectAttribute.attributeId = \
472                            "urn:oasis:names:tc:xacml:1.0:subject:subject-id"
473        subjectAttribute.dataType = \
474                            "urn:oasis:names:tc:xacml:1.0:data-type:rfc822Name"
475        subjectAttribute.attributeValue = 'bs@simpsons.com'
476       
477        request.subjects.append(subject)
478       
479        resource = Resource()
480        resourceAttribute = Attribute()
481        resource.attributes.append(resourceAttribute)
482       
483        resourceAttribute.attributeId = \
484                            "urn:oasis:names:tc:xacml:1.0:resource:resource-id"
485                           
486        resourceAttribute.dataType = "http://www.w3.org/2001/XMLSchema#anyURI"
487        resourceAttribute.attributeValue = \
488                            'file://example/med/record/patient/BartSimpson'
489
490        request.resources.append(resource)
491       
492        request.action = Action()
493        actionAttribute = Attribute()
494        request.action.append(actionAttribute)
495       
496        requestAttribute.attributeId = \
497                                "urn:oasis:names:tc:xacml:1.0:action:action-id"
498        requestAttribute.dataType = "http://www.w3.org/2001/XMLSchema#string"
499        requestAttribute.attributeValue = 'read'
500       
501    def test02CreateResponse(self):
502        response = Response()
503        result = Result()
504        response.results.append(result)
505        result.decision = Decision.value = Decision.NOT_APPLICABLE
506       
507    def test03CreateContextHandler(self):
508
509       
510
511       
512if __name__ == "__main__":
513    unittest.main()
Note: See TracBrowser for help on using the repository browser.