source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py @ 7077

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py@7077
Revision 7077, 11.3 KB checked in by pjkersha, 9 years ago (diff)
  • Property svn:keywords set to Id
RevLine 
[5290]1"""NDG Security unit test package
2
[6276]3NERC DataGrid Project
[5290]4"""
5__author__ = "P J Kershaw"
6__date__ = "14/05/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
[7077]10__revision__ = '$Id$'
[5290]11
12import unittest
13import logging
[5774]14import socket
[5290]15logging.basicConfig()
[6067]16log = logging.getLogger(__name__)
[5290]17
18import os
19from os.path import expandvars, join, dirname, abspath
20
[6009]21try:
22    from hashlib import md5
23except ImportError:
24    # Allow for < Python 2.5
25    from md5 import md5
26
27
[5499]28TEST_CONFIG_DIR = join(abspath(dirname(dirname(__file__))), 'config')
[5290]29
[5774]30mkDataDirPath = lambda file:join(TEST_CONFIG_DIR, file)
31
[6067]32from ndg.security.common.X509 import X500DN
[5774]33from ndg.security.test.unit.wsgi import PasteDeployAppServer
34
[5971]35try:
36    from sqlalchemy import (create_engine, MetaData, Table, Column, Integer, 
37                            String)
38    from sqlalchemy.ext.declarative import declarative_base
39    from sqlalchemy.orm import sessionmaker
40   
41    sqlAlchemyInstalled = True
42except ImportError:
43    sqlAlchemyInstalled = False
44   
[6033]45   
[5290]46class BaseTestCase(unittest.TestCase):
47    '''Convenience base class from which other unit tests can extend.  Its
48    sets the generic data directory path'''
49    configDirEnvVarName = 'NDGSEC_TEST_CONFIG_DIR'
[6033]50   
[5779]51    SITEA_ATTRIBUTEAUTHORITY_PORTNUM = 5000
52    SITEB_ATTRIBUTEAUTHORITY_PORTNUM = 5100
[6033]53   
54    SITEA_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
55                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
56                                   
57    SITEB_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
58                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
59                                   
60    SITEA_ATTRIBUTEAUTHORITY_SAML_URI = \
61        'http://localhost:%s/AttributeAuthority/saml' % \
62                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
63                                   
64    SITEB_ATTRIBUTEAUTHORITY_SAML_URI = \
65        'http://localhost:%s/AttributeAuthority/saml' % \
66                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
[6039]67                                   
[6052]68    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM = 5443
69    SITEA_SSL_ATTRIBUTEAUTHORITY_SAML_URI = \
[6788]70        'https://localhost:%d/AttributeAuthority' % \
[6052]71                                    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM
72    SSL_CERT_DN = "/C=UK/ST=Oxfordshire/O=BADC/OU=Security/CN=localhost"
73                                   
[6039]74    SITEA_SAML_ISSUER_NAME = "/O=Site A/CN=Attribute Authority"
[6033]75   
[5779]76    NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR = \
77        'NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES'
78   
79    _disableServiceStartup = lambda self: bool(os.environ.get(
80        BaseTestCase.NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR))
81   
82    disableServiceStartup = property(fget=_disableServiceStartup,
83                                     doc="Stop automated start-up of services "
84                                         "for unit tests")
85   
[6009]86    NDGSEC_TEST_CONFIG_DIR = os.environ.get(configDirEnvVarName, 
87                                            TEST_CONFIG_DIR)
88   
[6052]89    CACERT_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'ca')
90    PKI_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'pki')
91    SSL_CERT_FILEPATH = os.path.join(PKI_DIR, 'localhost.crt')
92    SSL_PRIKEY_FILEPATH = os.path.join(PKI_DIR, 'localhost.key')
93   
[5971]94    # Test database set-up
95    DB_FILENAME = 'user.db'
[6009]96    DB_FILEPATH = join(NDGSEC_TEST_CONFIG_DIR, DB_FILENAME)
97    DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILEPATH
98   
[5971]99    USERNAME = 'pjk'
[6009]100    PASSWORD = 'testpassword'
101    MD5_PASSWORD = md5(PASSWORD).hexdigest()
102   
[6067]103    OPENID_URI_STEM = 'https://localhost:7443/openid/'
[5971]104    OPENID_IDENTIFIER = 'philip.kershaw'
[6009]105    OPENID_URI = OPENID_URI_STEM + OPENID_IDENTIFIER
106   
[5971]107    FIRSTNAME = 'Philip'
108    LASTNAME = 'Kershaw'
109    EMAILADDRESS = 'pjk@somewhere.ac.uk'
[5977]110   
111    ATTRIBUTE_NAMES = (
112        "urn:siteA:security:authz:1.0:attr",
113    )
114
[5971]115    ATTRIBUTE_VALUES = (
116        'urn:siteA:security:authz:1.0:attr:postdoc',
117        'urn:siteA:security:authz:1.0:attr:staff', 
118        'urn:siteA:security:authz:1.0:attr:undergrad', 
119        'urn:siteA:security:authz:1.0:attr:coapec',
120        'urn:siteA:security:authz:1.0:attr:rapid'
121    )
[5982]122    N_ATTRIBUTE_VALUES = len(ATTRIBUTE_VALUES)
[5971]123   
[6067]124    VALID_REQUESTOR_IDS = (
125        X500DN.fromString("/O=Site A/CN=Authorisation Service"), 
126        X500DN.fromString("/O=Site B/CN=Authorisation Service"),
127        X500DN.fromString('/CN=test/O=NDG/OU=BADC')
128    )
[6039]129   
[6040]130    SSL_PEM_FILENAME = 'localhost.pem'
131    SSL_PEM_FILEPATH = mkDataDirPath(os.path.join('pki', SSL_PEM_FILENAME))
132   
[5499]133    def __init__(self, *arg, **kw):
[5290]134        if BaseTestCase.configDirEnvVarName not in os.environ:
[5499]135            os.environ[BaseTestCase.configDirEnvVarName] = TEST_CONFIG_DIR
136               
137        unittest.TestCase.__init__(self, *arg, **kw)
[5774]138        self.services = []
139       
[5779]140    def addService(self, *arg, **kw):
[5774]141        """Utility for setting up threads to run Paste HTTP based services with
142        unit tests
143       
144        @param cfgFilePath: ini file containing configuration for the service
145        @type cfgFilePath: basestring
146        @param port: port number to run the service from
147        @type port: int
148        """
[5779]149        if self.disableServiceStartup:
150            return
151       
[6040]152        withSSL = kw.pop('withSSL', False)
153        if withSSL:
154            from OpenSSL import SSL
155           
156            certFilePath = mkDataDirPath(os.path.join('pki', 'localhost.crt'))
157            priKeyFilePath = mkDataDirPath(os.path.join('pki', 'localhost.key'))
158           
159            kw['ssl_context'] = SSL.Context(SSL.SSLv23_METHOD)
160            kw['ssl_context'].set_options(SSL.OP_NO_SSLv2)
161       
162            kw['ssl_context'].use_privatekey_file(priKeyFilePath)
163            kw['ssl_context'].use_certificate_file(certFilePath)
164           
[5774]165        try:
[5779]166            self.services.append(PasteDeployAppServer(*arg, **kw))
[5774]167            self.services[-1].startThread()
168           
169        except socket.error:
170            pass
[5290]171
[6040]172    def startAttributeAuthorities(self, withSSL=False, port=None):
[5774]173        """Serve test Attribute Authorities to test against"""
[6040]174        self.startSiteAAttributeAuthority(withSSL=withSSL, port=port)
175        self.startSiteBAttributeAuthority(withSSL=withSSL, port=port)
[5907]176       
[6040]177    def startSiteAAttributeAuthority(self, withSSL=False, port=None):
[5774]178        siteACfgFilePath = mkDataDirPath(join('attributeauthority', 
179                                              'sitea', 
180                                              'site-a.ini'))
[5779]181        self.addService(cfgFilePath=siteACfgFilePath, 
[6721]182                        port=(port or 
183                              BaseTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM),
[6040]184                        withSSL=withSSL)
[5774]185       
[6040]186    def startSiteBAttributeAuthority(self, withSSL=False, port=None):
[5774]187        siteBCfgFilePath = mkDataDirPath(join('attributeauthority',
188                                              'siteb', 
189                                              'site-b.ini'))
[5779]190        self.addService(cfgFilePath=siteBCfgFilePath, 
[6721]191                        port=(port or 
192                              BaseTestCase.SITEB_ATTRIBUTEAUTHORITY_PORTNUM),
193                        withSSL=withSSL)
[5779]194
[5774]195    def __del__(self):
196        """Stop any services started with the addService method"""
197        if hasattr(self, 'services'):
198            for service in self.services:
199                service.terminateThread()
[6009]200 
[6067]201    @classmethod
202    def initDb(cls):
[6009]203        """Wrapper to _createDb - Create database only if it doesn't already
204        exist"""
[6067]205        if not os.path.isfile(cls.DB_FILEPATH):
206            cls._createDb()
207       
208    @classmethod 
209    def _createDb(cls):
[5971]210        """Create a test SQLite database with SQLAlchemy for use with unit
211        tests
212        """
[6067]213        log.debug("Creating database for %r ..." % cls.__name__)
214       
[5971]215        if not sqlAlchemyInstalled:
216            raise NotImplementedError("SQLAlchemy must be installed in order "
217                                      "for this method to be implemented")
218           
[6067]219        db = create_engine(cls.DB_CONNECTION_STR)
[5971]220       
221        metadata = MetaData()
222        usersTable = Table('users', metadata,
223                           Column('id', Integer, primary_key=True),
224                           Column('username', String),
[6009]225                           Column('md5password', String),
[6051]226                           Column('openid', String),
[5971]227                           Column('openid_identifier', String),
228                           Column('firstname', String),
229                           Column('lastname', String),
230                           Column('emailaddress', String))
231       
232        attributesTable = Table('attributes', metadata,
233                                Column('id', Integer, primary_key=True),
[6067]234                                Column('openid', String),
[5971]235                                Column('attributename', String))
236        metadata.create_all(db)
237       
238        class User(declarative_base()):
239            __tablename__ = 'users'
240       
241            id = Column(Integer, primary_key=True)
242            username = Column('username', String(40))
[6051]243            md5password = Column('md5password', String(64))
244            openid = Column('openid', String(128))
[5971]245            openid_identifier = Column('openid_identifier', String(40))
246            firstname = Column('firstname', String(40))
247            lastname = Column('lastname', String(40))
248            emailAddress = Column('emailaddress', String(40))
249       
[6051]250            def __init__(self, username, md5password, openid, openid_identifier, 
[6009]251                         firstname, lastname, emailaddress):
[5971]252                self.username = username
[6009]253                self.md5password = md5password
[6051]254                self.openid = openid
[5971]255                self.openid_identifier = openid_identifier
256                self.firstname = firstname
257                self.lastname = lastname
258                self.emailAddress = emailaddress
259       
260        class Attribute(declarative_base()):
261            __tablename__ = 'attributes'
262       
263            id = Column(Integer, primary_key=True)
[6067]264            openid = Column('openid', String(128))
[5971]265            attributename = Column('attributename', String(40))
266       
[6067]267            def __init__(self, openid, attributename):
268                self.openid = openid
[5971]269                self.attributename = attributename
[6067]270
[5971]271        Session = sessionmaker(bind=db)
272        session = Session()
273       
[6067]274        attributes = [Attribute(cls.OPENID_URI, attrVal)
275                      for attrVal in cls.ATTRIBUTE_VALUES]
[5982]276        session.add_all(attributes)
[6067]277           
278        user = User(cls.USERNAME, 
279                    cls.MD5_PASSWORD,
280                    cls.OPENID_URI,
281                    cls.OPENID_IDENTIFIER,
282                    cls.FIRSTNAME,
283                    cls.LASTNAME,
284                    cls.EMAILADDRESS)
[5971]285       
286        session.add(user)
287        session.commit() 
288
289
[5291]290def _getParentDir(depth=0, path=dirname(__file__)):
291    """
292    @type path: basestring
[5343]293    @param path: directory path from which to get parent directory, defaults
[5291]294    to dir of this module
295    @rtype: basestring
296    @return: parent directory at depth levels up from the current path
297    """
298    for i in range(depth):
299        path = dirname(path)
300    return path
301
302
Note: See TracBrowser for help on using the repository browser.