source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/authz/xacml/test_saml_pip.py @ 7358

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/authz/xacml/test_saml_pip.py@7358
Revision 7358, 6.4 KB checked in by pjkersha, 9 years ago (diff)

Incomplete - task 2: XACML-Security Integration

  • added caching capability to the Policy Information Point (PIP). This enables the PIP to reuse previously cached assertions from an Attribute Authority, optimising performance. Caching is done with beaker.session but, instead of indexing on a cookie, it is indexed on the subject Id, i.e. for ESG, a user's OpenID.
Line 
1"""Unit tests for XACML Policy Information Point with SAML interface to
2Attribute Authority
3
4"""
5__author__ = "P J Kershaw"
6__date__ = "11/08/10"
7__copyright__ = "(C) 2010 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id:$'
11import logging
12logging.basicConfig(level=logging.DEBUG)
13log = logging.getLogger(__name__)
14
15from os import path
16import unittest
17
18from urllib2 import URLError
19
20from ndg.xacml.core.attributedesignator import SubjectAttributeDesignator
21from ndg.xacml.core.attribute import Attribute
22from ndg.xacml.core.attributevalue import AttributeValueClassFactory
23from ndg.xacml.core.context.request import Request
24from ndg.xacml.core.context.subject import Subject
25
26from ndg.saml.saml2.core import Issuer as SamlIssuer
27
28from ndg.security.test.unit import BaseTestCase
29from ndg.security.server.xacml.pip.saml_pip import PIP
30
31
class SamlPipTestCase(BaseTestCase):
    """Test XACML Policy Information Point.  This PIP has a SAML interface to
    query a remote attribute authority for attributes

    Several settings used below (``OPENID_URI``, ``CACERT_DIR``, ``PKI_DIR``,
    ``ATTRIBUTE_NAMES``, ``SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM``) and the
    service helpers (``startSiteAAttributeAuthority``, ``stopAllServices``)
    are not defined in this class - presumably they are inherited from
    ``BaseTestCase``; confirm against that class.
    """
    THIS_DIR = path.abspath(path.dirname(__file__))
    MAPPING_FILENAME = "pip-mapping.txt"
    MAPPING_FILEPATH = path.join(THIS_DIR, MAPPING_FILENAME)
    CONFIG_FILENAME = 'saml_pip.cfg'
    CONFIG_FILEPATH = path.join(THIS_DIR, CONFIG_FILENAME)

    # Attribute requested from the PIP in the query tests
    NDGS_ATTR_ID = BaseTestCase.ATTRIBUTE_NAMES[0]

    # Attribute identifying the subject in the XACML request context
    OPENID_ATTR_ID = 'urn:esg:openid'

    # Client credentials set on the PIP's attribute query binding
    CLNT_CERT_FILEPATH = path.join(BaseTestCase.PKI_DIR, 'localhost.crt')
    CLNT_PRIKEY_FILEPATH = path.join(BaseTestCase.PKI_DIR, 'localhost.key')

    attributeValueClassFactory = AttributeValueClassFactory()

    def test01CreateAndCheckAttributes(self):
        """Check a new PIP's defaults and that the attribute to Attribute
        Authority map cannot be reassigned
        """
        pip = PIP()
        self.assertTrue(pip)
        self.assertTrue(pip.mappingFilePath is None)

        # Keep the try body down to the one statement expected to raise
        try:
            pip.attribute2AttributeAuthorityMap = {}
        except AttributeError:
            pass
        else:
            self.fail("pip.attribute2AttributeAuthorityMap should be read-only")

        setattr(pip, 'sessionCacheDataDir', 'My data dir')

    def test02ReadMappingFile(self):
        """Read the test mapping file and check the resulting map is
        non-empty and contains the NDG Security test attribute
        """
        pip = PIP()
        pip.mappingFilePath = self.__class__.MAPPING_FILEPATH
        pip.readMappingFile()
        self.assertTrue(len(pip.attribute2AttributeAuthorityMap.keys()) > 0)
        self.assertTrue(self.__class__.NDGS_ATTR_ID in
                        pip.attribute2AttributeAuthorityMap)
        print(pip.attribute2AttributeAuthorityMap)

    def _createXacmlRequestCtx(self):
        """Create a XACML request context containing a single subject which
        has one OpenID attribute of type anyURI

        @return: new request context
        """
        ctx = Request()

        ctx.subjects.append(Subject())
        openidAttr = Attribute()
        ctx.subjects[-1].attributes.append(openidAttr)
        openidAttr.attributeId = self.__class__.OPENID_ATTR_ID
        openidAttr.dataType = 'http://www.w3.org/2001/XMLSchema#anyURI'

        # Look up the attribute value class matching the anyURI data type
        anyUriAttrValue = self.__class__.attributeValueClassFactory(
                                                            openidAttr.dataType)

        openidAttrVal = anyUriAttrValue(self.__class__.OPENID_URI)
        openidAttr.attributeValues.append(openidAttrVal)

        return ctx

    def _createPIP(self):
        """Create PIP from test attribute settings

        @return: PIP with the mapping file read in and the attribute query
        binding's issuer and SSL settings populated
        """
        pip = PIP()
        pip.mappingFilePath = self.__class__.MAPPING_FILEPATH
        pip.readMappingFile()
        pip.subjectAttributeId = self.__class__.OPENID_ATTR_ID

        pip.attributeQueryBinding.issuerName = \
            'O=NDG, OU=Security, CN=localhost'
        pip.attributeQueryBinding.issuerFormat = SamlIssuer.X509_SUBJECT
        pip.attributeQueryBinding.sslCertFilePath = \
            self.__class__.CLNT_CERT_FILEPATH
        pip.attributeQueryBinding.sslPriKeyFilePath = \
            self.__class__.CLNT_PRIKEY_FILEPATH

        pip.attributeQueryBinding.sslCACertDir = self.__class__.CACERT_DIR

        return pip

    def _createSubjectAttributeDesignator(self):
        '''Make attribute designator - in practice this would be passed back
        from the PDP via the context handler

        @return: string type subject attribute designator for the NDG
        Security test attribute
        '''
        designator = SubjectAttributeDesignator()
        designator.attributeId = self.__class__.NDGS_ATTR_ID
        designator.dataType = 'http://www.w3.org/2001/XMLSchema#string'

        return designator

    def _initQuery(self):
        '''Convenience method to set-up the parameters needed for a query

        @return: tuple of PIP, subject attribute designator and XACML request
        context
        '''
        pip = self._createPIP()
        designator = self._createSubjectAttributeDesignator()
        ctx = self._createXacmlRequestCtx()
        return pip, designator, ctx

    def test03Query(self):
        """Query the Site A Attribute Authority over SSL and check that
        attribute values are returned
        """
        self.startSiteAAttributeAuthority(withSSL=True,
                    port=self.__class__.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM)

        # Stop the service even if the query or an assertion fails so that it
        # is not left running for subsequent tests
        try:
            pip, designator, ctx = self._initQuery()

            # Avoid caching to avoid impacting other tests in this class
            pip.cacheSessions = False

            attributeValues = pip.attributeQuery(ctx, designator)
            self.assertTrue(len(attributeValues) > 0)
            print("PIP retrieved attribute values %r" % attributeValues)
        finally:
            self.stopAllServices()

    def test04InitFromConfigFile(self):
        """Initialise a PIP from settings in a config file"""
        pip = PIP.fromConfig(self.__class__.CONFIG_FILEPATH)
        self.assertTrue(pip.mappingFilePath)

    def test05SessionCaching(self):
        """Check session caching: once the Attribute Authority has been
        stopped, a PIP left with its default caching setting is expected to
        still return attribute values, whereas a PIP with caching disabled is
        expected to fail with a URLError
        """
        self.startSiteAAttributeAuthority(withSSL=True,
                    port=self.__class__.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM)

        # Make the initial queries while the service is up.  The service is
        # stopped afterwards whether or not these calls succeed
        try:
            pipA, designator, ctx = self._initQuery()
            pipA.attributeQuery(ctx, designator)

            pipB = self._createPIP()
            pipB.cacheSessions = False

            pipB.attributeQuery(ctx, designator)
        finally:
            self.stopAllServices()

        # Service is now down: pipA must still be able to return values
        attributeValuesA2 = pipA.attributeQuery(ctx, designator)
        self.assertTrue(len(attributeValuesA2) > 0)

        # ... but pipB has caching switched off so has to contact the service
        try:
            pipB.attributeQuery(ctx, designator)
        except URLError as e:
            print("Pass: expected %r error for call with no-caching set" % e)
        else:
            self.fail("Expected URLError exception for call with no-caching set")
169       
170       
171       
# Run this module's tests when it is executed directly as a script
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.