source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py @ 7335

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py@7335
Revision 7335, 12.1 KB checked in by pjkersha, 9 years ago (diff)

Incomplete - task 2: XACML-Security Integration

  • first working e2e test with PEP calling a SAML Authorisation service configured with PIP to make callouts to an Attribute Authority to pull user attributes. This meets the ESG requirements. Next steps:
    • integrate with ndg.security.test.integration.authz_lite browser based integration tests
    • optimise by adding caching of authz decisions to PEP and possibly caching attribute assertions in the PEP.
  • Property svn:keywords set to Id
Line 
1"""NDG Security unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "14/05/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id$'
11
12import unittest
13import logging
14import socket
15logging.basicConfig()
16log = logging.getLogger(__name__)
17
18import os
19from os.path import expandvars, join, dirname, abspath
20
21try:
22    from hashlib import md5
23except ImportError:
24    # Allow for < Python 2.5
25    from md5 import md5
26
27
28TEST_CONFIG_DIR = join(abspath(dirname(dirname(__file__))), 'config')
29
30mkDataDirPath = lambda file:join(TEST_CONFIG_DIR, file)
31
32from ndg.security.common.X509 import X500DN
33from ndg.security.test.unit.wsgi import PasteDeployAppServer
34
35try:
36    from sqlalchemy import (create_engine, MetaData, Table, Column, Integer, 
37                            String)
38    from sqlalchemy.ext.declarative import declarative_base
39    from sqlalchemy.orm import sessionmaker
40   
41    sqlAlchemyInstalled = True
42except ImportError:
43    sqlAlchemyInstalled = False
44   
45   
46class BaseTestCase(unittest.TestCase):
47    '''Convenience base class from which other unit tests can extend.  Its
48    sets the generic data directory path'''
49    configDirEnvVarName = 'NDGSEC_TEST_CONFIG_DIR'
50   
51    AUTHORISATION_SERVICE_PORTNUM = 9443
52    AUTHORISATION_SERVICE_URI = 'https://localhost:%s/authorisation-service' % \
53                                AUTHORISATION_SERVICE_PORTNUM
54    AUTHORISATION_SERVICE_INI_FILEPATH = mkDataDirPath(
55            os.path.join('authorisationservice', 'authorisation-service.ini'))
56                         
57    SITEA_ATTRIBUTEAUTHORITY_PORTNUM = 5000
58    SITEB_ATTRIBUTEAUTHORITY_PORTNUM = 5100
59   
60    SITEA_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
61                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
62                                   
63    SITEB_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
64                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
65                                   
66    SITEA_ATTRIBUTEAUTHORITY_SAML_URI = \
67        'http://localhost:%s/AttributeAuthority/saml' % \
68                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
69                                   
70    SITEB_ATTRIBUTEAUTHORITY_SAML_URI = \
71        'http://localhost:%s/AttributeAuthority/saml' % \
72                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
73                                   
74    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM = 5443
75    SITEA_SSL_ATTRIBUTEAUTHORITY_SAML_URI = \
76        'https://localhost:%d/AttributeAuthority' % \
77                                    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM
78    SSL_CERT_DN = "/C=UK/ST=Oxfordshire/O=BADC/OU=Security/CN=localhost"
79                                   
80    SITEA_SAML_ISSUER_NAME = "/O=Site A/CN=Attribute Authority"
81   
82    NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR = \
83        'NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES'
84   
85    _disableServiceStartup = lambda self: bool(os.environ.get(
86        BaseTestCase.NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR))
87   
88    disableServiceStartup = property(fget=_disableServiceStartup,
89                                     doc="Stop automated start-up of services "
90                                         "for unit tests")
91   
92    NDGSEC_TEST_CONFIG_DIR = os.environ.get(configDirEnvVarName, 
93                                            TEST_CONFIG_DIR)
94   
95    CACERT_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'ca')
96    PKI_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'pki')
97    SSL_CERT_FILEPATH = os.path.join(PKI_DIR, 'localhost.crt')
98    SSL_PRIKEY_FILEPATH = os.path.join(PKI_DIR, 'localhost.key')
99   
100    # Test database set-up
101    DB_FILENAME = 'user.db'
102    DB_FILEPATH = join(NDGSEC_TEST_CONFIG_DIR, DB_FILENAME)
103    DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILEPATH
104   
105    USERNAME = 'pjk'
106    PASSWORD = 'testpassword'
107    MD5_PASSWORD = md5(PASSWORD).hexdigest()
108   
109    OPENID_URI_STEM = 'https://localhost:7443/openid/'
110    OPENID_IDENTIFIER = 'philip.kershaw'
111    OPENID_URI = OPENID_URI_STEM + OPENID_IDENTIFIER
112   
113    FIRSTNAME = 'Philip'
114    LASTNAME = 'Kershaw'
115    EMAILADDRESS = 'pjk@somewhere.ac.uk'
116   
117    ATTRIBUTE_NAMES = (
118        "urn:siteA:security:authz:1.0:attr",
119    )
120
121    ATTRIBUTE_VALUES = (
122        'urn:siteA:security:authz:1.0:attr:postdoc',
123        'urn:siteA:security:authz:1.0:attr:staff', 
124        'urn:siteA:security:authz:1.0:attr:undergrad', 
125        'urn:siteA:security:authz:1.0:attr:coapec',
126        'urn:siteA:security:authz:1.0:attr:rapid',
127        'urn:siteA:security:authz:1.0:attr:admin'
128    )
129    N_ATTRIBUTE_VALUES = len(ATTRIBUTE_VALUES)
130   
131    VALID_REQUESTOR_IDS = (
132        X500DN.fromString("/O=Site A/CN=Authorisation Service"), 
133        X500DN.fromString("/O=Site B/CN=Authorisation Service"),
134        X500DN.fromString('/CN=test/O=NDG/OU=BADC'),
135        X500DN.fromString('/O=NDG/OU=Security/CN=localhost')
136    )
137   
138    SSL_PEM_FILENAME = 'localhost.pem'
139    SSL_PEM_FILEPATH = mkDataDirPath(os.path.join('pki', SSL_PEM_FILENAME))
140   
141    def __init__(self, *arg, **kw):
142        if BaseTestCase.configDirEnvVarName not in os.environ:
143            os.environ[BaseTestCase.configDirEnvVarName] = TEST_CONFIG_DIR
144               
145        unittest.TestCase.__init__(self, *arg, **kw)
146        self.services = []
147       
148    def addService(self, *arg, **kw):
149        """Utility for setting up threads to run Paste HTTP based services with
150        unit tests
151       
152        @param cfgFilePath: ini file containing configuration for the service
153        @type cfgFilePath: basestring
154        @param port: port number to run the service from
155        @type port: int
156        """
157        if self.disableServiceStartup:
158            return
159       
160        withSSL = kw.pop('withSSL', False)
161        if withSSL:
162            from OpenSSL import SSL
163           
164            certFilePath = mkDataDirPath(os.path.join('pki', 'localhost.crt'))
165            priKeyFilePath = mkDataDirPath(os.path.join('pki', 'localhost.key'))
166           
167            kw['ssl_context'] = SSL.Context(SSL.SSLv23_METHOD)
168            kw['ssl_context'].set_options(SSL.OP_NO_SSLv2)
169       
170            kw['ssl_context'].use_privatekey_file(priKeyFilePath)
171            kw['ssl_context'].use_certificate_file(certFilePath)
172           
173        try:
174            self.services.append(PasteDeployAppServer(*arg, **kw))
175            self.services[-1].startThread()
176           
177        except socket.error:
178            pass
179
180    def startAttributeAuthorities(self, withSSL=False, port=None):
181        """Serve test Attribute Authorities to test against"""
182        self.startSiteAAttributeAuthority(withSSL=withSSL, port=port)
183        self.startSiteBAttributeAuthority(withSSL=withSSL, port=port)
184       
185    def startSiteAAttributeAuthority(self, withSSL=False, port=None):
186        siteACfgFilePath = mkDataDirPath(join('attributeauthority', 
187                                              'sitea', 
188                                              'site-a.ini'))
189        self.addService(cfgFilePath=siteACfgFilePath, 
190                        port=(port or 
191                              BaseTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM),
192                        withSSL=withSSL)
193       
194    def startSiteBAttributeAuthority(self, withSSL=False, port=None):
195        siteBCfgFilePath = mkDataDirPath(join('attributeauthority',
196                                              'siteb', 
197                                              'site-b.ini'))
198        self.addService(cfgFilePath=siteBCfgFilePath, 
199                        port=(port or 
200                              BaseTestCase.SITEB_ATTRIBUTEAUTHORITY_PORTNUM),
201                        withSSL=withSSL)
202       
203    def startAuthorisationService(self, 
204                                  withSSL=True, 
205                                  port=AUTHORISATION_SERVICE_PORTNUM):
206        self.addService(
207            cfgFilePath=self.__class__.AUTHORISATION_SERVICE_INI_FILEPATH, 
208            port=port,
209            withSSL=withSSL)
210       
211    def __del__(self):
212        """Stop any services started with the addService method"""
213        if hasattr(self, 'services'):
214            for service in self.services:
215                service.terminateThread()
216 
217    @classmethod
218    def initDb(cls):
219        """Wrapper to _createDb - Create database only if it doesn't already
220        exist"""
221        if not os.path.isfile(cls.DB_FILEPATH):
222            cls._createDb()
223       
224    @classmethod 
225    def _createDb(cls):
226        """Create a test SQLite database with SQLAlchemy for use with unit
227        tests
228        """
229        log.debug("Creating database for %r ..." % cls.__name__)
230       
231        if not sqlAlchemyInstalled:
232            raise NotImplementedError("SQLAlchemy must be installed in order "
233                                      "for this method to be implemented")
234           
235        db = create_engine(cls.DB_CONNECTION_STR)
236       
237        metadata = MetaData()
238        usersTable = Table('users', metadata,
239                           Column('id', Integer, primary_key=True),
240                           Column('username', String),
241                           Column('md5password', String),
242                           Column('openid', String),
243                           Column('openid_identifier', String),
244                           Column('firstname', String),
245                           Column('lastname', String),
246                           Column('emailaddress', String))
247       
248        attributesTable = Table('attributes', metadata,
249                                Column('id', Integer, primary_key=True),
250                                Column('openid', String),
251                                Column('attributename', String))
252        metadata.create_all(db)
253       
254        class User(declarative_base()):
255            __tablename__ = 'users'
256       
257            id = Column(Integer, primary_key=True)
258            username = Column('username', String(40))
259            md5password = Column('md5password', String(64))
260            openid = Column('openid', String(128))
261            openid_identifier = Column('openid_identifier', String(40))
262            firstname = Column('firstname', String(40))
263            lastname = Column('lastname', String(40))
264            emailAddress = Column('emailaddress', String(40))
265       
266            def __init__(self, username, md5password, openid, openid_identifier, 
267                         firstname, lastname, emailaddress):
268                self.username = username
269                self.md5password = md5password
270                self.openid = openid
271                self.openid_identifier = openid_identifier
272                self.firstname = firstname
273                self.lastname = lastname
274                self.emailAddress = emailaddress
275       
276        class Attribute(declarative_base()):
277            __tablename__ = 'attributes'
278       
279            id = Column(Integer, primary_key=True)
280            openid = Column('openid', String(128))
281            attributename = Column('attributename', String(40))
282       
283            def __init__(self, openid, attributename):
284                self.openid = openid
285                self.attributename = attributename
286
287        Session = sessionmaker(bind=db)
288        session = Session()
289       
290        attributes = [Attribute(cls.OPENID_URI, attrVal)
291                      for attrVal in cls.ATTRIBUTE_VALUES]
292        session.add_all(attributes)
293           
294        user = User(cls.USERNAME, 
295                    cls.MD5_PASSWORD,
296                    cls.OPENID_URI,
297                    cls.OPENID_IDENTIFIER,
298                    cls.FIRSTNAME,
299                    cls.LASTNAME,
300                    cls.EMAILADDRESS)
301       
302        session.add(user)
303        session.commit() 
304
305
306def _getParentDir(depth=0, path=dirname(__file__)):
307    """
308    @type path: basestring
309    @param path: directory path from which to get parent directory, defaults
310    to dir of this module
311    @rtype: basestring
312    @return: parent directory at depth levels up from the current path
313    """
314    for i in range(depth):
315        path = dirname(path)
316    return path
317
318
Note: See TracBrowser for help on using the repository browser.