source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py @ 6788

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py@6788
Revision 6788, 11.3 KB checked in by pjkersha, 10 years ago (diff)

Contains important fix for OpenIDProviderMiddleware - moved OpenIDResponse object from class member to session key to preserve separation between user sessions in sign in process. This bug was manifest in users being incorrectly redirected following login.

Line 
1"""NDG Security unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "14/05/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id: __init__.py 4840 2009-01-19 13:59:08Z pjkersha $'
11
12import unittest
13import logging
14import socket
15logging.basicConfig()
16log = logging.getLogger(__name__)
17
18import os
19from os.path import expandvars, join, dirname, abspath
20
21try:
22    from hashlib import md5
23except ImportError:
24    # Allow for < Python 2.5
25    from md5 import md5
26
27
28TEST_CONFIG_DIR = join(abspath(dirname(dirname(__file__))), 'config')
29
30mkDataDirPath = lambda file:join(TEST_CONFIG_DIR, file)
31
32from ndg.security.common.X509 import X500DN
33from ndg.security.test.unit.wsgi import PasteDeployAppServer
34
35try:
36    from sqlalchemy import (create_engine, MetaData, Table, Column, Integer, 
37                            String)
38    from sqlalchemy.ext.declarative import declarative_base
39    from sqlalchemy.orm import sessionmaker
40   
41    sqlAlchemyInstalled = True
42except ImportError:
43    sqlAlchemyInstalled = False
44   
45   
46class BaseTestCase(unittest.TestCase):
47    '''Convenience base class from which other unit tests can extend.  Its
48    sets the generic data directory path'''
49    configDirEnvVarName = 'NDGSEC_TEST_CONFIG_DIR'
50   
51    SITEA_ATTRIBUTEAUTHORITY_PORTNUM = 5000
52    SITEB_ATTRIBUTEAUTHORITY_PORTNUM = 5100
53   
54    SITEA_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
55                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
56                                   
57    SITEB_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
58                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
59                                   
60    SITEA_ATTRIBUTEAUTHORITY_SAML_URI = \
61        'http://localhost:%s/AttributeAuthority/saml' % \
62                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
63                                   
64    SITEB_ATTRIBUTEAUTHORITY_SAML_URI = \
65        'http://localhost:%s/AttributeAuthority/saml' % \
66                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
67                                   
68    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM = 5443
69    SITEA_SSL_ATTRIBUTEAUTHORITY_SAML_URI = \
70        'https://localhost:%d/AttributeAuthority' % \
71                                    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM
72    SSL_CERT_DN = "/C=UK/ST=Oxfordshire/O=BADC/OU=Security/CN=localhost"
73                                   
74    SITEA_SAML_ISSUER_NAME = "/O=Site A/CN=Attribute Authority"
75   
76    NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR = \
77        'NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES'
78   
79    _disableServiceStartup = lambda self: bool(os.environ.get(
80        BaseTestCase.NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR))
81   
82    disableServiceStartup = property(fget=_disableServiceStartup,
83                                     doc="Stop automated start-up of services "
84                                         "for unit tests")
85   
86    NDGSEC_TEST_CONFIG_DIR = os.environ.get(configDirEnvVarName, 
87                                            TEST_CONFIG_DIR)
88   
89    CACERT_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'ca')
90    PKI_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'pki')
91    SSL_CERT_FILEPATH = os.path.join(PKI_DIR, 'localhost.crt')
92    SSL_PRIKEY_FILEPATH = os.path.join(PKI_DIR, 'localhost.key')
93   
94    # Test database set-up
95    DB_FILENAME = 'user.db'
96    DB_FILEPATH = join(NDGSEC_TEST_CONFIG_DIR, DB_FILENAME)
97    DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILEPATH
98   
99    USERNAME = 'pjk'
100    PASSWORD = 'testpassword'
101    MD5_PASSWORD = md5(PASSWORD).hexdigest()
102   
103    OPENID_URI_STEM = 'https://localhost:7443/openid/'
104    OPENID_IDENTIFIER = 'philip.kershaw'
105    OPENID_URI = OPENID_URI_STEM + OPENID_IDENTIFIER
106   
107    FIRSTNAME = 'Philip'
108    LASTNAME = 'Kershaw'
109    EMAILADDRESS = 'pjk@somewhere.ac.uk'
110   
111    ATTRIBUTE_NAMES = (
112        "urn:siteA:security:authz:1.0:attr",
113    )
114
115    ATTRIBUTE_VALUES = (
116        'urn:siteA:security:authz:1.0:attr:postdoc',
117        'urn:siteA:security:authz:1.0:attr:staff', 
118        'urn:siteA:security:authz:1.0:attr:undergrad', 
119        'urn:siteA:security:authz:1.0:attr:coapec',
120        'urn:siteA:security:authz:1.0:attr:rapid'
121    )
122    N_ATTRIBUTE_VALUES = len(ATTRIBUTE_VALUES)
123   
124    VALID_REQUESTOR_IDS = (
125        X500DN.fromString("/O=Site A/CN=Authorisation Service"), 
126        X500DN.fromString("/O=Site B/CN=Authorisation Service"),
127        X500DN.fromString('/CN=test/O=NDG/OU=BADC')
128    )
129   
130    SSL_PEM_FILENAME = 'localhost.pem'
131    SSL_PEM_FILEPATH = mkDataDirPath(os.path.join('pki', SSL_PEM_FILENAME))
132   
133    def __init__(self, *arg, **kw):
134        if BaseTestCase.configDirEnvVarName not in os.environ:
135            os.environ[BaseTestCase.configDirEnvVarName] = TEST_CONFIG_DIR
136               
137        unittest.TestCase.__init__(self, *arg, **kw)
138        self.services = []
139       
140    def addService(self, *arg, **kw):
141        """Utility for setting up threads to run Paste HTTP based services with
142        unit tests
143       
144        @param cfgFilePath: ini file containing configuration for the service
145        @type cfgFilePath: basestring
146        @param port: port number to run the service from
147        @type port: int
148        """
149        if self.disableServiceStartup:
150            return
151       
152        withSSL = kw.pop('withSSL', False)
153        if withSSL:
154            from OpenSSL import SSL
155           
156            certFilePath = mkDataDirPath(os.path.join('pki', 'localhost.crt'))
157            priKeyFilePath = mkDataDirPath(os.path.join('pki', 'localhost.key'))
158           
159            kw['ssl_context'] = SSL.Context(SSL.SSLv23_METHOD)
160            kw['ssl_context'].set_options(SSL.OP_NO_SSLv2)
161       
162            kw['ssl_context'].use_privatekey_file(priKeyFilePath)
163            kw['ssl_context'].use_certificate_file(certFilePath)
164           
165        try:
166            self.services.append(PasteDeployAppServer(*arg, **kw))
167            self.services[-1].startThread()
168           
169        except socket.error:
170            pass
171
172    def startAttributeAuthorities(self, withSSL=False, port=None):
173        """Serve test Attribute Authorities to test against"""
174        self.startSiteAAttributeAuthority(withSSL=withSSL, port=port)
175        self.startSiteBAttributeAuthority(withSSL=withSSL, port=port)
176       
177    def startSiteAAttributeAuthority(self, withSSL=False, port=None):
178        siteACfgFilePath = mkDataDirPath(join('attributeauthority', 
179                                              'sitea', 
180                                              'site-a.ini'))
181        self.addService(cfgFilePath=siteACfgFilePath, 
182                        port=(port or 
183                              BaseTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM),
184                        withSSL=withSSL)
185       
186    def startSiteBAttributeAuthority(self, withSSL=False, port=None):
187        siteBCfgFilePath = mkDataDirPath(join('attributeauthority',
188                                              'siteb', 
189                                              'site-b.ini'))
190        self.addService(cfgFilePath=siteBCfgFilePath, 
191                        port=(port or 
192                              BaseTestCase.SITEB_ATTRIBUTEAUTHORITY_PORTNUM),
193                        withSSL=withSSL)
194
195    def __del__(self):
196        """Stop any services started with the addService method"""
197        if hasattr(self, 'services'):
198            for service in self.services:
199                service.terminateThread()
200 
201    @classmethod
202    def initDb(cls):
203        """Wrapper to _createDb - Create database only if it doesn't already
204        exist"""
205        if not os.path.isfile(cls.DB_FILEPATH):
206            cls._createDb()
207       
208    @classmethod 
209    def _createDb(cls):
210        """Create a test SQLite database with SQLAlchemy for use with unit
211        tests
212        """
213        log.debug("Creating database for %r ..." % cls.__name__)
214       
215        if not sqlAlchemyInstalled:
216            raise NotImplementedError("SQLAlchemy must be installed in order "
217                                      "for this method to be implemented")
218           
219        db = create_engine(cls.DB_CONNECTION_STR)
220       
221        metadata = MetaData()
222        usersTable = Table('users', metadata,
223                           Column('id', Integer, primary_key=True),
224                           Column('username', String),
225                           Column('md5password', String),
226                           Column('openid', String),
227                           Column('openid_identifier', String),
228                           Column('firstname', String),
229                           Column('lastname', String),
230                           Column('emailaddress', String))
231       
232        attributesTable = Table('attributes', metadata,
233                                Column('id', Integer, primary_key=True),
234                                Column('openid', String),
235                                Column('attributename', String))
236        metadata.create_all(db)
237       
238        class User(declarative_base()):
239            __tablename__ = 'users'
240       
241            id = Column(Integer, primary_key=True)
242            username = Column('username', String(40))
243            md5password = Column('md5password', String(64))
244            openid = Column('openid', String(128))
245            openid_identifier = Column('openid_identifier', String(40))
246            firstname = Column('firstname', String(40))
247            lastname = Column('lastname', String(40))
248            emailAddress = Column('emailaddress', String(40))
249       
250            def __init__(self, username, md5password, openid, openid_identifier, 
251                         firstname, lastname, emailaddress):
252                self.username = username
253                self.md5password = md5password
254                self.openid = openid
255                self.openid_identifier = openid_identifier
256                self.firstname = firstname
257                self.lastname = lastname
258                self.emailAddress = emailaddress
259       
260        class Attribute(declarative_base()):
261            __tablename__ = 'attributes'
262       
263            id = Column(Integer, primary_key=True)
264            openid = Column('openid', String(128))
265            attributename = Column('attributename', String(40))
266       
267            def __init__(self, openid, attributename):
268                self.openid = openid
269                self.attributename = attributename
270
271        Session = sessionmaker(bind=db)
272        session = Session()
273       
274        attributes = [Attribute(cls.OPENID_URI, attrVal)
275                      for attrVal in cls.ATTRIBUTE_VALUES]
276        session.add_all(attributes)
277           
278        user = User(cls.USERNAME, 
279                    cls.MD5_PASSWORD,
280                    cls.OPENID_URI,
281                    cls.OPENID_IDENTIFIER,
282                    cls.FIRSTNAME,
283                    cls.LASTNAME,
284                    cls.EMAILADDRESS)
285       
286        session.add(user)
287        session.commit() 
288
289
290def _getParentDir(depth=0, path=dirname(__file__)):
291    """
292    @type path: basestring
293    @param path: directory path from which to get parent directory, defaults
294    to dir of this module
295    @rtype: basestring
296    @return: parent directory at depth levels up from the current path
297    """
298    for i in range(depth):
299        path = dirname(path)
300    return path
301
302
Note: See TracBrowser for help on using the repository browser.