source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py @ 7077

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py@7077
Revision 7077, 11.3 KB checked in by pjkersha, 9 years ago (diff)
  • Property svn:keywords set to Id
Line 
1"""NDG Security unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "14/05/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id$'
11
12import unittest
13import logging
14import socket
15logging.basicConfig()
16log = logging.getLogger(__name__)
17
18import os
19from os.path import expandvars, join, dirname, abspath
20
21try:
22    from hashlib import md5
23except ImportError:
24    # Allow for < Python 2.5
25    from md5 import md5
26
27
28TEST_CONFIG_DIR = join(abspath(dirname(dirname(__file__))), 'config')
29
30mkDataDirPath = lambda file:join(TEST_CONFIG_DIR, file)
31
32from ndg.security.common.X509 import X500DN
33from ndg.security.test.unit.wsgi import PasteDeployAppServer
34
35try:
36    from sqlalchemy import (create_engine, MetaData, Table, Column, Integer, 
37                            String)
38    from sqlalchemy.ext.declarative import declarative_base
39    from sqlalchemy.orm import sessionmaker
40   
41    sqlAlchemyInstalled = True
42except ImportError:
43    sqlAlchemyInstalled = False
44   
45   
46class BaseTestCase(unittest.TestCase):
47    '''Convenience base class from which other unit tests can extend.  Its
48    sets the generic data directory path'''
49    configDirEnvVarName = 'NDGSEC_TEST_CONFIG_DIR'
50   
51    SITEA_ATTRIBUTEAUTHORITY_PORTNUM = 5000
52    SITEB_ATTRIBUTEAUTHORITY_PORTNUM = 5100
53   
54    SITEA_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
55                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
56                                   
57    SITEB_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
58                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
59                                   
60    SITEA_ATTRIBUTEAUTHORITY_SAML_URI = \
61        'http://localhost:%s/AttributeAuthority/saml' % \
62                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
63                                   
64    SITEB_ATTRIBUTEAUTHORITY_SAML_URI = \
65        'http://localhost:%s/AttributeAuthority/saml' % \
66                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
67                                   
68    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM = 5443
69    SITEA_SSL_ATTRIBUTEAUTHORITY_SAML_URI = \
70        'https://localhost:%d/AttributeAuthority' % \
71                                    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM
72    SSL_CERT_DN = "/C=UK/ST=Oxfordshire/O=BADC/OU=Security/CN=localhost"
73                                   
74    SITEA_SAML_ISSUER_NAME = "/O=Site A/CN=Attribute Authority"
75   
76    NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR = \
77        'NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES'
78   
79    _disableServiceStartup = lambda self: bool(os.environ.get(
80        BaseTestCase.NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR))
81   
82    disableServiceStartup = property(fget=_disableServiceStartup,
83                                     doc="Stop automated start-up of services "
84                                         "for unit tests")
85   
86    NDGSEC_TEST_CONFIG_DIR = os.environ.get(configDirEnvVarName, 
87                                            TEST_CONFIG_DIR)
88   
89    CACERT_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'ca')
90    PKI_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'pki')
91    SSL_CERT_FILEPATH = os.path.join(PKI_DIR, 'localhost.crt')
92    SSL_PRIKEY_FILEPATH = os.path.join(PKI_DIR, 'localhost.key')
93   
94    # Test database set-up
95    DB_FILENAME = 'user.db'
96    DB_FILEPATH = join(NDGSEC_TEST_CONFIG_DIR, DB_FILENAME)
97    DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILEPATH
98   
99    USERNAME = 'pjk'
100    PASSWORD = 'testpassword'
101    MD5_PASSWORD = md5(PASSWORD).hexdigest()
102   
103    OPENID_URI_STEM = 'https://localhost:7443/openid/'
104    OPENID_IDENTIFIER = 'philip.kershaw'
105    OPENID_URI = OPENID_URI_STEM + OPENID_IDENTIFIER
106   
107    FIRSTNAME = 'Philip'
108    LASTNAME = 'Kershaw'
109    EMAILADDRESS = 'pjk@somewhere.ac.uk'
110   
111    ATTRIBUTE_NAMES = (
112        "urn:siteA:security:authz:1.0:attr",
113    )
114
115    ATTRIBUTE_VALUES = (
116        'urn:siteA:security:authz:1.0:attr:postdoc',
117        'urn:siteA:security:authz:1.0:attr:staff', 
118        'urn:siteA:security:authz:1.0:attr:undergrad', 
119        'urn:siteA:security:authz:1.0:attr:coapec',
120        'urn:siteA:security:authz:1.0:attr:rapid'
121    )
122    N_ATTRIBUTE_VALUES = len(ATTRIBUTE_VALUES)
123   
124    VALID_REQUESTOR_IDS = (
125        X500DN.fromString("/O=Site A/CN=Authorisation Service"), 
126        X500DN.fromString("/O=Site B/CN=Authorisation Service"),
127        X500DN.fromString('/CN=test/O=NDG/OU=BADC')
128    )
129   
130    SSL_PEM_FILENAME = 'localhost.pem'
131    SSL_PEM_FILEPATH = mkDataDirPath(os.path.join('pki', SSL_PEM_FILENAME))
132   
133    def __init__(self, *arg, **kw):
134        if BaseTestCase.configDirEnvVarName not in os.environ:
135            os.environ[BaseTestCase.configDirEnvVarName] = TEST_CONFIG_DIR
136               
137        unittest.TestCase.__init__(self, *arg, **kw)
138        self.services = []
139       
140    def addService(self, *arg, **kw):
141        """Utility for setting up threads to run Paste HTTP based services with
142        unit tests
143       
144        @param cfgFilePath: ini file containing configuration for the service
145        @type cfgFilePath: basestring
146        @param port: port number to run the service from
147        @type port: int
148        """
149        if self.disableServiceStartup:
150            return
151       
152        withSSL = kw.pop('withSSL', False)
153        if withSSL:
154            from OpenSSL import SSL
155           
156            certFilePath = mkDataDirPath(os.path.join('pki', 'localhost.crt'))
157            priKeyFilePath = mkDataDirPath(os.path.join('pki', 'localhost.key'))
158           
159            kw['ssl_context'] = SSL.Context(SSL.SSLv23_METHOD)
160            kw['ssl_context'].set_options(SSL.OP_NO_SSLv2)
161       
162            kw['ssl_context'].use_privatekey_file(priKeyFilePath)
163            kw['ssl_context'].use_certificate_file(certFilePath)
164           
165        try:
166            self.services.append(PasteDeployAppServer(*arg, **kw))
167            self.services[-1].startThread()
168           
169        except socket.error:
170            pass
171
172    def startAttributeAuthorities(self, withSSL=False, port=None):
173        """Serve test Attribute Authorities to test against"""
174        self.startSiteAAttributeAuthority(withSSL=withSSL, port=port)
175        self.startSiteBAttributeAuthority(withSSL=withSSL, port=port)
176       
177    def startSiteAAttributeAuthority(self, withSSL=False, port=None):
178        siteACfgFilePath = mkDataDirPath(join('attributeauthority', 
179                                              'sitea', 
180                                              'site-a.ini'))
181        self.addService(cfgFilePath=siteACfgFilePath, 
182                        port=(port or 
183                              BaseTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM),
184                        withSSL=withSSL)
185       
186    def startSiteBAttributeAuthority(self, withSSL=False, port=None):
187        siteBCfgFilePath = mkDataDirPath(join('attributeauthority',
188                                              'siteb', 
189                                              'site-b.ini'))
190        self.addService(cfgFilePath=siteBCfgFilePath, 
191                        port=(port or 
192                              BaseTestCase.SITEB_ATTRIBUTEAUTHORITY_PORTNUM),
193                        withSSL=withSSL)
194
195    def __del__(self):
196        """Stop any services started with the addService method"""
197        if hasattr(self, 'services'):
198            for service in self.services:
199                service.terminateThread()
200 
201    @classmethod
202    def initDb(cls):
203        """Wrapper to _createDb - Create database only if it doesn't already
204        exist"""
205        if not os.path.isfile(cls.DB_FILEPATH):
206            cls._createDb()
207       
208    @classmethod 
209    def _createDb(cls):
210        """Create a test SQLite database with SQLAlchemy for use with unit
211        tests
212        """
213        log.debug("Creating database for %r ..." % cls.__name__)
214       
215        if not sqlAlchemyInstalled:
216            raise NotImplementedError("SQLAlchemy must be installed in order "
217                                      "for this method to be implemented")
218           
219        db = create_engine(cls.DB_CONNECTION_STR)
220       
221        metadata = MetaData()
222        usersTable = Table('users', metadata,
223                           Column('id', Integer, primary_key=True),
224                           Column('username', String),
225                           Column('md5password', String),
226                           Column('openid', String),
227                           Column('openid_identifier', String),
228                           Column('firstname', String),
229                           Column('lastname', String),
230                           Column('emailaddress', String))
231       
232        attributesTable = Table('attributes', metadata,
233                                Column('id', Integer, primary_key=True),
234                                Column('openid', String),
235                                Column('attributename', String))
236        metadata.create_all(db)
237       
238        class User(declarative_base()):
239            __tablename__ = 'users'
240       
241            id = Column(Integer, primary_key=True)
242            username = Column('username', String(40))
243            md5password = Column('md5password', String(64))
244            openid = Column('openid', String(128))
245            openid_identifier = Column('openid_identifier', String(40))
246            firstname = Column('firstname', String(40))
247            lastname = Column('lastname', String(40))
248            emailAddress = Column('emailaddress', String(40))
249       
250            def __init__(self, username, md5password, openid, openid_identifier, 
251                         firstname, lastname, emailaddress):
252                self.username = username
253                self.md5password = md5password
254                self.openid = openid
255                self.openid_identifier = openid_identifier
256                self.firstname = firstname
257                self.lastname = lastname
258                self.emailAddress = emailaddress
259       
260        class Attribute(declarative_base()):
261            __tablename__ = 'attributes'
262       
263            id = Column(Integer, primary_key=True)
264            openid = Column('openid', String(128))
265            attributename = Column('attributename', String(40))
266       
267            def __init__(self, openid, attributename):
268                self.openid = openid
269                self.attributename = attributename
270
271        Session = sessionmaker(bind=db)
272        session = Session()
273       
274        attributes = [Attribute(cls.OPENID_URI, attrVal)
275                      for attrVal in cls.ATTRIBUTE_VALUES]
276        session.add_all(attributes)
277           
278        user = User(cls.USERNAME, 
279                    cls.MD5_PASSWORD,
280                    cls.OPENID_URI,
281                    cls.OPENID_IDENTIFIER,
282                    cls.FIRSTNAME,
283                    cls.LASTNAME,
284                    cls.EMAILADDRESS)
285       
286        session.add(user)
287        session.commit() 
288
289
290def _getParentDir(depth=0, path=dirname(__file__)):
291    """
292    @type path: basestring
293    @param path: directory path from which to get parent directory, defaults
294    to dir of this module
295    @rtype: basestring
296    @return: parent directory at depth levels up from the current path
297    """
298    for i in range(depth):
299        path = dirname(path)
300    return path
301
302
Note: See TracBrowser for help on using the repository browser.