source: TI12-security/trunk/NDG_XACML/ndg/xacml/test/test_xacml.py @ 6774

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDG_XACML/ndg/xacml/test/test_xacml.py@6774
Revision 6774, 19.6 KB checked in by pjkersha, 11 years ago (diff)

Work on test PDP currently in unittests but will be moved into context package.

Line 
1"""NDG XACML unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "16/03/10"
7__copyright__ = "(C) 2010 Science and Technology Facilities Council"
8__contact__ = "Philip.Kershaw@stfc.ac.uk"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = "$Id: $"
12import unittest
13from os import path
14import logging
15logging.basicConfig(level=logging.DEBUG)
16
17from ndg.xacml.core.policy import Policy
18from ndg.xacml.core.attributedesignator import SubjectAttributeDesignator
19from ndg.xacml.core.attributeselector import AttributeSelector
20from ndg.xacml.parsers.etree.factory import ReaderFactory
21
22from ndg.xacml.core.attribute import Attribute
23from ndg.xacml.core.context.request import Request
24from ndg.xacml.core.context.response import Response
25from ndg.xacml.core.context.result import Result, Decision
26from ndg.xacml.core.context.subject import Subject
27from ndg.xacml.core.context.resource import Resource
28from ndg.xacml.core.context.action import Action
29
30THIS_DIR = path.dirname(__file__)
31
32
33class XACMLPolicyTestCase(unittest.TestCase):
34    XACML_TEST1_FILENAME = "rule1.xml"
35    XACML_TEST1_FILEPATH = path.join(THIS_DIR, XACML_TEST1_FILENAME)
36    XACML_TEST2_FILENAME = "rule2.xml"
37    XACML_TEST2_FILEPATH = path.join(THIS_DIR, XACML_TEST2_FILENAME)
38    XACML_TEST3_FILENAME = "rule3.xml"
39    XACML_TEST3_FILEPATH = path.join(THIS_DIR, XACML_TEST3_FILENAME)
40    XACML_TEST4_FILENAME = "rule4.xml"
41    XACML_TEST4_FILEPATH = path.join(THIS_DIR, XACML_TEST4_FILENAME)
42    XACML_NDGTEST1_FILENAME = "ndg1.xml"
43    XACML_NDGTEST1_FILEPATH = path.join(THIS_DIR, XACML_NDGTEST1_FILENAME)
44   
45    def test01ETreeParseRule1Policy(self):
46        PolicyReader = ReaderFactory.getReader(Policy)
47        policy = PolicyReader.parse(XACMLPolicyTestCase.XACML_TEST1_FILEPATH)
48        self.assert_(policy)
49       
50        self.assert_(
51            policy.policyId == "urn:oasis:names:tc:example:SimplePolicy1")
52       
53        self.assert_(policy.ruleCombiningAlgId == \
54        "urn:oasis:names:tc:xacml:1.0:rule-combining-algorithm:deny-overrides")
55       
56        self.assert_(
57            "Med Example Corp access control policy" in policy.description)
58       
59        self.assert_(len(policy.target.subjects) == 0)
60       
61        self.assert_(policy.rules[0].id == \
62                     "urn:oasis:names:tc:xacml:2.0:example:SimpleRule1")
63       
64        self.assert_(policy.rules[0].effect == 'Permit')
65       
66        self.assert_(
67            'Any subject with an e-mail name in the med.example.com domain' in \
68            policy.rules[0].description)
69       
70        self.assert_(len(policy.rules[0].target.subjects) == 1)
71        self.assert_(len(policy.rules[0].target.actions) == 0)
72        self.assert_(len(policy.rules[0].target.resources) == 0)
73        self.assert_(len(policy.rules[0].target.environments) == 0)
74       
75        self.assert_(len(policy.rules[0].target.subjects[0].subjectMatches) == 1)
76       
77        self.assert_(policy.rules[0].target.subjects[0].subjectMatches[0
78            ].matchId == \
79            "urn:oasis:names:tc:xacml:1.0:function:rfc822Name-match")
80       
81        self.assert_(policy.rules[0].target.subjects[0].subjectMatches[0
82            ].attributeValue.dataType == \
83            "urn:oasis:names:tc:xacml:1.0:data-type:rfc822Name")
84       
85        self.assert_(policy.rules[0].target.subjects[0].subjectMatches[0
86            ].attributeDesignator.dataType == \
87            "urn:oasis:names:tc:xacml:1.0:data-type:rfc822Name")
88       
89        # Attribute ID
90        self.assert_(policy.rules[0].target.subjects[0].subjectMatches[0
91            ].attributeDesignator.attributeId == \
92            "urn:oasis:names:tc:xacml:1.0:subject:subject-id")
93         
94    def test02ETreeParseRule2Policy(self):
95        PolicyReader = ReaderFactory.getReader(Policy)
96        policy = PolicyReader.parse(XACMLPolicyTestCase.XACML_TEST2_FILEPATH)
97        self.assert_(policy)
98       
99        self.assert_(
100        policy.policyId == "urn:oasis:names:tc:xacml:2.0:example:policyid:2")
101       
102        self.assert_(policy.ruleCombiningAlgId == \
103        "urn:oasis:names:tc:xacml:1.0:rule-combining-algorithm:deny-overrides")
104       
105        self.assert_(policy.description is None)
106       
107        self.assert_(len(policy.target.actions) == 0)
108       
109        self.assert_(policy.rules[0].id == \
110                     "urn:oasis:names:tc:xacml:2.0:example:ruleid:2")
111       
112        self.assert_(policy.rules[0].effect == 'Permit')
113       
114        self.assert_(policy.rules[0].description == """\
115A person may read any medical record in the
116            http://www.med.example.com/records.xsd namespace
117            for which he or she is the designated parent or guardian,
118            and for which the patient is under 16 years of age""")
119       
120        self.assert_(len(policy.rules[0].target.subjects) == 0)
121        self.assert_(len(policy.rules[0].target.actions) == 1)
122        self.assert_(len(policy.rules[0].target.resources) == 1)
123        self.assert_(len(policy.rules[0].target.environments) == 0)
124       
125        self.assert_(len(policy.rules[0].target.resources[0
126                                                    ].resourceMatches) == 2)
127       
128        self.assert_(policy.rules[0].target.resources[0].resourceMatches[0
129            ].matchId == "urn:oasis:names:tc:xacml:1.0:function:string-equal")
130       
131        self.assert_(policy.rules[0].target.resources[0].resourceMatches[0
132            ].attributeValue.dataType == \
133                                    "http://www.w3.org/2001/XMLSchema#string")
134       
135        self.assert_(policy.rules[0].target.resources[0].resourceMatches[0
136            ].attributeValue.value == 'urn:med:example:schemas:record')
137       
138        self.assert_(policy.rules[0].target.resources[0].resourceMatches[0
139            ].attributeDesignator.dataType == \
140                                    "http://www.w3.org/2001/XMLSchema#string")
141       
142        self.assert_(policy.rules[0].target.resources[0].resourceMatches[1
143            ].attributeDesignator.attributeId == \
144                            "urn:oasis:names:tc:xacml:1.0:resource:xpath")
145        self.assert_(policy.rules[0].target.resources[0].resourceMatches[1
146            ].matchId == \
147                "urn:oasis:names:tc:xacml:1.0:function:xpath-node-match")
148       
149        self.assert_(policy.rules[0].target.resources[0].resourceMatches[1
150            ].attributeValue.dataType == \
151                                    "http://www.w3.org/2001/XMLSchema#string")
152       
153        self.assert_(policy.rules[0].target.resources[0].resourceMatches[1
154            ].attributeValue.value == '/md:record')
155       
156        self.assert_(policy.rules[0].target.resources[0].resourceMatches[1
157            ].attributeDesignator.dataType == \
158                                    "http://www.w3.org/2001/XMLSchema#string")
159       
160        self.assert_(policy.rules[0].target.resources[0].resourceMatches[1
161            ].attributeDesignator.attributeId == \
162                                "urn:oasis:names:tc:xacml:1.0:resource:xpath")
163       
164        # Verify Action
165        self.assert_(len(policy.rules[0].target.actions[0
166                                                    ].actionMatches) == 1)
167       
168        self.assert_(policy.rules[0].target.actions[0].actionMatches[0
169            ].matchId == "urn:oasis:names:tc:xacml:1.0:function:string-equal")
170       
171        self.assert_(policy.rules[0].target.actions[0].actionMatches[0
172            ].attributeValue.dataType == \
173                                    "http://www.w3.org/2001/XMLSchema#string")
174       
175        self.assert_(policy.rules[0].target.actions[0].actionMatches[0
176            ].attributeValue.value == "read")
177       
178        self.assert_(policy.rules[0].target.actions[0].actionMatches[0
179            ].attributeDesignator.dataType == \
180                                    "http://www.w3.org/2001/XMLSchema#string")
181       
182        self.assert_(policy.rules[0].target.actions[0].actionMatches[0
183            ].attributeDesignator.attributeId == \
184                            "urn:oasis:names:tc:xacml:1.0:action:action-id")
185
186        self.assert_(policy.rules[0].condition)       
187        self.assert_(policy.rules[0].condition.expression.functionId == \
188                     "urn:oasis:names:tc:xacml:1.0:function:and")
189       
190        self.assert_(len(policy.rules[0].condition.expression.expressions) == 1)
191       
192        self.assert_(policy.rules[0].condition.expression.expressions[0
193            ].functionId == \
194                'urn:oasis:names:tc:xacml:1.0:function:string-equal')
195       
196        self.assert_(len(policy.rules[0].condition.expression.expressions) == 1)
197       
198        self.assert_(len(policy.rules[0].condition.expression.expressions[0
199                     ].expressions) == 2)
200       
201        self.assert_(policy.rules[0].condition.expression.expressions[0
202            ].expressions[0].functionId == \
203                "urn:oasis:names:tc:xacml:1.0:function:string-one-and-only")
204       
205        self.assert_(isinstance(
206                        policy.rules[0].condition.expression.expressions[0
207                            ].expressions[0
208                            ].expressions[0], SubjectAttributeDesignator))
209       
210        self.assert_(policy.rules[0].condition.expression.expressions[0
211                            ].expressions[0
212                            ].expressions[0].attributeId == \
213                            "urn:oasis:names:tc:xacml:2.0:example:attribute:"
214                            "parent-guardian-id")
215
216        self.assert_(policy.rules[0].condition.expression.expressions[0
217                            ].expressions[0
218                            ].expressions[0].dataType == \
219                            "http://www.w3.org/2001/XMLSchema#string")
220       
221        self.assert_(policy.rules[0].condition.expression.expressions[0
222                            ].expressions[0
223                            ].expressions[0].attributeId == \
224                            "urn:oasis:names:tc:xacml:2.0:example:attribute:"
225                            "parent-guardian-id")
226       
227        self.assert_(isinstance(policy.rules[0
228                            ].condition.expression.expressions[0
229                            ].expressions[1
230                            ].expressions[0], AttributeSelector))
231       
232        self.assert_(policy.rules[0
233                            ].condition.expression.expressions[0
234                            ].expressions[1
235                            ].expressions[0].requestContextPath == \
236                            "//md:record/md:parentGuardian/md:parentGuardianId/"
237                            "text()")
238       
239        self.assert_(policy.rules[0
240                            ].condition.expression.expressions[0
241                            ].expressions[1
242                            ].expressions[0].dataType == \
243                            "http://www.w3.org/2001/XMLSchema#string")
244
245    def test03ETreeParseRule3Policy(self):
246        PolicyReader = ReaderFactory.getReader(Policy)
247       
248        try:
249            policy = PolicyReader.parse(XACMLPolicyTestCase.XACML_TEST3_FILEPATH)
250            self.assert_(policy)
251        except NotImplementedError, e:
252            print("Expecting Obligations not implemented exception: %s" %e)
253                   
254    def test04ETreeParseRule4Policy(self):
255        PolicyReader = ReaderFactory.getReader(Policy)
256        policy = PolicyReader.parse(XACMLPolicyTestCase.XACML_TEST4_FILEPATH)
257        self.assert_(policy)
258                   
259    def test05ETreeParseNdg1Policy(self):
260        # Example policy for URI Regular expression based matching of
261        # resources for NDG
262        PolicyReader = ReaderFactory.getReader(Policy)
263        policy = PolicyReader.parse(XACMLPolicyTestCase.XACML_NDGTEST1_FILEPATH)
264        self.assert_(policy)
265       
266       
267class MyPDP(PDPInterface):
268   
269    def __init__(self):
270        self.policy = None
271       
272    @classmethod
273    def fromPolicy(cls, source):
274        pdp = cls()
275        self.policy = ReaderFactory.getReader(Policy).parse(source)
276       
277    def evaluate(self, request):
278        """Make an access control decision for the given request based on the
279        policy set
280       
281        @param request: XACML request context
282        @type request: ndg.xacml.core.context.request.Request
283        @return: XACML response instance
284        @rtype: ndg.xacml.core.context.response.Response
285        """
286        response = Response
287        result = Result()
288        response.results.append(result)
289        result.decision = Decision.NOT_APPLICABLE
290           
291        # Exception block around all rule processing in order to set
292        # INDETERMINATE response from any exceptions raised
293        try: 
294            # Check policy target for match
295            log.debug('Checking policy target for match...')
296           
297            if not self.matchTarget(self.policy.target, request):
298                log.debug('No match for policy target setting Decision=%r',
299                          Decision.NOT_APPLICABLE_STR)
300               
301                result.decision = Decision.NOT_APPLICABLE
302                return response
303           
304            # Check rules
305            for rule in self.policy.rules:
306                log.debug('Checking policy rule %r for match...', rule.id)
307                if not self.matchTarget(rule.target, request):
308                    log.debug('No match to request context for target in rule '
309                              '%r', rule.id)
310                    continue         
311        except:
312            log.error('Exception raised evaluating request context, returning '
313                      'Decision=%r:%s', 
314                      Decision.INDETERMINATE_STR, 
315                      traceback.format_exc())
316            result.decision = Decision.INDETERMINATE
317           
318        return response
319           
320   
321    def matchTarget(self, target, request):
322        if target is None:
323            log.debug('No target set so no match with request context')
324            return False
325       
326        # From section 5.5 of the XACML 2.0 Core Spec:
327        #
328        # For the parent of the <Target> element to be applicable to the
329        # decision request, there MUST be at least one positive match between
330        # each section of the <Target> element and the corresponding section of
331        # the <xacml-context:Request> element.       
332        for i in ('subjects', 'resources', 'actions', 'environments'):
333            for targetChild in getattr(target, i):
334                for requestChild in getattr(request, i):
335                    if self.matchTargetChild(targetChild, requestChild):
336                        return True
337               
338       
339               
340        return False
341   
342    @classmethod
343    def matchTargetChild(cls, targetChild, requestChild):
344        """Match a child (Subject, Resource, Action or Environment) from the
345        request context with a given target's child
346       
347        @param targetChild: Target Subject, Resource, Action or Environment
348        object
349        @type targetChild: ndg.xacml.core.TargetChildBase
350        @param requestChild: Request Subject, Resource, Action or Environment
351        object
352        @type requestChild: ndg.xacml.core.context.RequestChildBase
353        @return: True if request context matches something in the target
354        @rtype: bool
355        @raise NotImplementedError: AttributeSelector processing is not
356        currently supported.  If an AttributeSelector is found in the policy,
357        this exception will be raised.
358        """
359        if targetChild is None:
360            # Default if target child is not set is to match all children
361            return True
362       
363        for childMatch in targetChild.matches:
364            attributeValue = childMatch.attributeValue
365           
366            # Create a match function based on the presence or absence of an
367            # AttributeDesignator or AttributeSelector
368            if childMatch.attributeDesignator is not None:
369                attributeId = childMatch.attributeDesignator.attributeId
370                dataType = childMatch.attributeDesignator.dataType
371               
372                _attributeMatch = lambda requestChildAttribute: (
373                    requestChildAttribute.attributeValue == attributeValue and
374                    requestChildAttribute.attributeId == attributeId and
375                    requestChildAttribute.dataType == dataType
376                )
377               
378            elif childMatch.attributeSelector is not None:
379                # Nb. This will require that the request provide a reference to
380                # it's XML representation and an abstraction of the XML parser
381                # for executing XPath searches into that representation
382                raise NotImplementedError('This PDP implementation does not '
383                                          'support <AttributeSelector> '
384                                          'elements')
385            else:
386                _attributeMatch = lambda requestChildAttribute: (
387                    requestChildAttribute.attributeValue == attributeValue
388                )
389               
390            for attribute in requestChild.attributes:
391                if _attributeMatch(attribute):
392                    return True
393                   
394        return False
395   
396               
397class TestContextHandler(AbstractContextHandler):
398    """Test implementation of Context Handler"""
399   
400    def __init__(self):
401        super(TestContextHandler, self).__init__()
402        self.pip = None       
403       
404    def handlePEPRequest(self, myRequest):
405       
406        # Convert myRequest to XACML context request
407        request = myRequest
408       
409        if self.pdp is None:
410            raise TypeError('No "pdp" attribute set')
411       
412        response = self.pdp.evaluate(request)
413       
414        # Convert XACML context response to domain specific request
415        myResponse = response
416       
417        return myResponse
418   
419
420class XACMLContextTestCase(unittest.TestCase):
421    """Test PDP, PAP, PIP and Context handler"""
422   
423    def test01CreateRequest(self):
424        request = Request()
425       
426        subject = Subject()
427        subjectAttribute = Attribute()
428        subject.attributes.append(subjectAttribute)
429        subjectAttribute.attributeId = \
430                            "urn:oasis:names:tc:xacml:1.0:subject:subject-id"
431        subjectAttribute.dataType = \
432                            "urn:oasis:names:tc:xacml:1.0:data-type:rfc822Name"
433        subjectAttribute.attributeValue = 'bs@simpsons.com'
434       
435        request.subjects.append(subject)
436       
437        resource = Resource()
438        resourceAttribute = Attribute()
439        resource.attributes.append(resourceAttribute)
440       
441        resourceAttribute.attributeId = \
442                            "urn:oasis:names:tc:xacml:1.0:resource:resource-id"
443                           
444        resourceAttribute.dataType = "http://www.w3.org/2001/XMLSchema#anyURI"
445        resourceAttribute.attributeValue = \
446                            'file://example/med/record/patient/BartSimpson'
447
448        request.resources.append(resource)
449       
450        request.action = Action()
451        actionAttribute = Attribute()
452        request.action.append(actionAttribute)
453       
454        requestAttribute.attributeId = \
455                                "urn:oasis:names:tc:xacml:1.0:action:action-id"
456        requestAttribute.dataType = "http://www.w3.org/2001/XMLSchema#string"
457        requestAttribute.attributeValue = 'read'
458       
459    def test02CreateResponse(self):
460        response = Response()
461        result = Result()
462        response.results.append(result)
463        result.decision = Decision.value = Decision.NOT_APPLICABLE
464       
465    def test03CreateContextHandler(self):
466
467       
468
469       
470if __name__ == "__main__":
471    unittest.main()
Note: See TracBrowser for help on using the repository browser.