source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/__init__.py @ 6033

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/__init__.py@6033
Revision 6033, 9.7 KB checked in by pjkersha, 10 years ago (diff)

Refactoring Credential Wallet to enable caching of SAML assertions.

Line 
1"""NDG Security unit test package
2
3NERC Data Grid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "14/05/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id: __init__.py 4840 2009-01-19 13:59:08Z pjkersha $'
11
12import unittest
13import logging
14import socket
15logging.basicConfig()
16
17import os
18from os.path import expandvars, join, dirname, abspath
19
20try:
21    from hashlib import md5
22except ImportError:
23    # Allow for < Python 2.5
24    from md5 import md5
25
26
27TEST_CONFIG_DIR = join(abspath(dirname(dirname(__file__))), 'config')
28
29mkDataDirPath = lambda file:join(TEST_CONFIG_DIR, file)
30
31from ndg.security.test.unit.wsgi import PasteDeployAppServer
32
33try:
34    from sqlalchemy import (create_engine, MetaData, Table, Column, Integer, 
35                            String)
36    from sqlalchemy.ext.declarative import declarative_base
37    from sqlalchemy.orm import sessionmaker
38   
39    sqlAlchemyInstalled = True
40except ImportError:
41    sqlAlchemyInstalled = False
42   
43   
44class BaseTestCase(unittest.TestCase):
45    '''Convenience base class from which other unit tests can extend.  Its
46    sets the generic data directory path'''
47    configDirEnvVarName = 'NDGSEC_TEST_CONFIG_DIR'
48   
49    SITEA_ATTRIBUTEAUTHORITY_PORTNUM = 5000
50    SITEB_ATTRIBUTEAUTHORITY_PORTNUM = 5100
51   
52    SITEA_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
53                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
54                                   
55    SITEB_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
56                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
57                                   
58    SITEA_ATTRIBUTEAUTHORITY_SAML_URI = \
59        'http://localhost:%s/AttributeAuthority/saml' % \
60                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
61                                   
62    SITEB_ATTRIBUTEAUTHORITY_SAML_URI = \
63        'http://localhost:%s/AttributeAuthority/saml' % \
64                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
65   
66    SESSIONMANAGER_PORTNUM = 5500
67   
68    mkDataDirPath = staticmethod(mkDataDirPath)
69   
70    NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR = \
71        'NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES'
72   
73    _disableServiceStartup = lambda self: bool(os.environ.get(
74        BaseTestCase.NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR))
75   
76    disableServiceStartup = property(fget=_disableServiceStartup,
77                                     doc="Stop automated start-up of services "
78                                         "for unit tests")
79   
80    NDGSEC_TEST_CONFIG_DIR = os.environ.get(configDirEnvVarName, 
81                                            TEST_CONFIG_DIR)
82   
83    # Test database set-up
84    DB_FILENAME = 'user.db'
85    DB_FILEPATH = join(NDGSEC_TEST_CONFIG_DIR, DB_FILENAME)
86    DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILEPATH
87   
88    USERNAME = 'pjk'
89    PASSWORD = 'testpassword'
90    MD5_PASSWORD = md5(PASSWORD).hexdigest()
91   
92    OPENID_URI_STEM = 'https://openid.localhost/'
93    OPENID_IDENTIFIER = 'philip.kershaw'
94    OPENID_URI = OPENID_URI_STEM + OPENID_IDENTIFIER
95   
96    FIRSTNAME = 'Philip'
97    LASTNAME = 'Kershaw'
98    EMAILADDRESS = 'pjk@somewhere.ac.uk'
99   
100    ATTRIBUTE_NAMES = (
101        "urn:siteA:security:authz:1.0:attr",
102    )
103
104    ATTRIBUTE_VALUES = (
105        'urn:siteA:security:authz:1.0:attr:postdoc',
106        'urn:siteA:security:authz:1.0:attr:staff', 
107        'urn:siteA:security:authz:1.0:attr:undergrad', 
108        'urn:siteA:security:authz:1.0:attr:coapec',
109        'urn:siteA:security:authz:1.0:attr:rapid'
110    )
111    N_ATTRIBUTE_VALUES = len(ATTRIBUTE_VALUES)
112   
113    def __init__(self, *arg, **kw):
114        if BaseTestCase.configDirEnvVarName not in os.environ:
115            os.environ[BaseTestCase.configDirEnvVarName] = TEST_CONFIG_DIR
116               
117        unittest.TestCase.__init__(self, *arg, **kw)
118        self.services = []
119       
120    def addService(self, *arg, **kw):
121        """Utility for setting up threads to run Paste HTTP based services with
122        unit tests
123       
124        @param cfgFilePath: ini file containing configuration for the service
125        @type cfgFilePath: basestring
126        @param port: port number to run the service from
127        @type port: int
128        """
129        if self.disableServiceStartup:
130            return
131       
132        try:
133            self.services.append(PasteDeployAppServer(*arg, **kw))
134            self.services[-1].startThread()
135           
136        except socket.error:
137            pass
138
139    def startAttributeAuthorities(self):
140        """Serve test Attribute Authorities to test against"""
141        self.startSiteAAttributeAuthority()
142        self.startSiteBAttributeAuthority()
143       
144    def startSiteAAttributeAuthority(self):
145        siteACfgFilePath = mkDataDirPath(join('attributeauthority', 
146                                              'sitea', 
147                                              'site-a.ini'))
148        self.addService(cfgFilePath=siteACfgFilePath, 
149                        port=BaseTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM)
150       
151    def startSiteBAttributeAuthority(self):
152        siteBCfgFilePath = mkDataDirPath(join('attributeauthority',
153                                              'siteb', 
154                                              'site-b.ini'))
155        self.addService(cfgFilePath=siteBCfgFilePath, 
156                        port=BaseTestCase.SITEB_ATTRIBUTEAUTHORITY_PORTNUM)       
157
158    def startSessionManager(self):
159        """Serve test Session Manager service"""
160        cfgFilePath = mkDataDirPath(join('sessionmanager', 
161                                         'session-manager.ini'))
162        self.addService(cfgFilePath=cfgFilePath, 
163                        port=BaseTestCase.SESSIONMANAGER_PORTNUM)
164       
165
166    def __del__(self):
167        """Stop any services started with the addService method"""
168        if hasattr(self, 'services'):
169            for service in self.services:
170                service.terminateThread()
171 
172    def initDb(self):
173        """Wrapper to _createDb - Create database only if it doesn't already
174        exist"""
175        if not os.path.isfile(BaseTestCase.DB_FILEPATH):
176            self._createDb()
177           
178    def _createDb(self):
179        """Create a test SQLite database with SQLAlchemy for use with unit
180        tests
181        """
182        if not sqlAlchemyInstalled:
183            raise NotImplementedError("SQLAlchemy must be installed in order "
184                                      "for this method to be implemented")
185           
186        db = create_engine(BaseTestCase.DB_CONNECTION_STR)
187       
188        metadata = MetaData()
189        usersTable = Table('users', metadata,
190                           Column('id', Integer, primary_key=True),
191                           Column('username', String),
192                           Column('md5password', String),
193                           Column('openid_identifier', String),
194                           Column('firstname', String),
195                           Column('lastname', String),
196                           Column('emailaddress', String))
197       
198        attributesTable = Table('attributes', metadata,
199                                Column('id', Integer, primary_key=True),
200                                Column('username', String),
201                                Column('attributename', String))
202        metadata.create_all(db)
203       
204        class User(declarative_base()):
205            __tablename__ = 'users'
206       
207            id = Column(Integer, primary_key=True)
208            username = Column('username', String(40))
209            md5password = Column('md5password', String(40))
210            openid_identifier = Column('openid_identifier', String(40))
211            firstname = Column('firstname', String(40))
212            lastname = Column('lastname', String(40))
213            emailAddress = Column('emailaddress', String(40))
214       
215            def __init__(self, username, md5password, openid_identifier, 
216                         firstname, lastname, emailaddress):
217                self.username = username
218                self.md5password = md5password
219                self.openid_identifier = openid_identifier
220                self.firstname = firstname
221                self.lastname = lastname
222                self.emailAddress = emailaddress
223       
224        class Attribute(declarative_base()):
225            __tablename__ = 'attributes'
226       
227            id = Column(Integer, primary_key=True)
228            username = Column('username', String(40))
229            attributename = Column('attributename', String(40))
230       
231            def __init__(self, username, attributename):
232                self.username = username
233                self.attributename = attributename
234               
235        Session = sessionmaker(bind=db)
236        session = Session()
237       
238        attributes = [Attribute(BaseTestCase.USERNAME, attrVal)
239                      for attrVal in BaseTestCase.ATTRIBUTE_VALUES]
240        session.add_all(attributes)
241           
242        user = User(BaseTestCase.USERNAME, 
243                    BaseTestCase.MD5_PASSWORD,
244                    BaseTestCase.OPENID_IDENTIFIER,
245                    BaseTestCase.FIRSTNAME,
246                    BaseTestCase.LASTNAME,
247                    BaseTestCase.EMAILADDRESS)
248       
249        session.add(user)
250        session.commit() 
251
252
253def _getParentDir(depth=0, path=dirname(__file__)):
254    """
255    @type path: basestring
256    @param path: directory path from which to get parent directory, defaults
257    to dir of this module
258    @rtype: basestring
259    @return: parent directory at depth levels up from the current path
260    """
261    for i in range(depth):
262        path = dirname(path)
263    return path
264
265
Note: See TracBrowser for help on using the repository browser.