source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/__init__.py @ 6051

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/__init__.py@6051
Revision 6051, 10.9 KB checked in by pjkersha, 10 years ago (diff)

Refactoring MyProxy? Cert Extension app following updates to the SAML Attribute Query client interface

Line 
1"""NDG Security unit test package
2
3NERC Data Grid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "14/05/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id: __init__.py 4840 2009-01-19 13:59:08Z pjkersha $'
11
12import unittest
13import logging
14import socket
15logging.basicConfig()
16
17import os
18from os.path import expandvars, join, dirname, abspath
19
20try:
21    from hashlib import md5
22except ImportError:
23    # Allow for < Python 2.5
24    from md5 import md5
25
26
27TEST_CONFIG_DIR = join(abspath(dirname(dirname(__file__))), 'config')
28
29mkDataDirPath = lambda file:join(TEST_CONFIG_DIR, file)
30
31from ndg.security.test.unit.wsgi import PasteDeployAppServer
32
33try:
34    from sqlalchemy import (create_engine, MetaData, Table, Column, Integer, 
35                            String)
36    from sqlalchemy.ext.declarative import declarative_base
37    from sqlalchemy.orm import sessionmaker
38   
39    sqlAlchemyInstalled = True
40except ImportError:
41    sqlAlchemyInstalled = False
42   
43   
44class BaseTestCase(unittest.TestCase):
45    '''Convenience base class from which other unit tests can extend.  Its
46    sets the generic data directory path'''
47    configDirEnvVarName = 'NDGSEC_TEST_CONFIG_DIR'
48   
49    SITEA_ATTRIBUTEAUTHORITY_PORTNUM = 5000
50    SITEB_ATTRIBUTEAUTHORITY_PORTNUM = 5100
51   
52    SITEA_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
53                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
54                                   
55    SITEB_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
56                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
57                                   
58    SITEA_ATTRIBUTEAUTHORITY_SAML_URI = \
59        'http://localhost:%s/AttributeAuthority/saml' % \
60                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
61                                   
62    SITEB_ATTRIBUTEAUTHORITY_SAML_URI = \
63        'http://localhost:%s/AttributeAuthority/saml' % \
64                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
65                                   
66    SITEA_SAML_ISSUER_NAME = "/O=Site A/CN=Attribute Authority"
67   
68    SESSIONMANAGER_PORTNUM = 5500
69   
70    NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR = \
71        'NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES'
72   
73    _disableServiceStartup = lambda self: bool(os.environ.get(
74        BaseTestCase.NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR))
75   
76    disableServiceStartup = property(fget=_disableServiceStartup,
77                                     doc="Stop automated start-up of services "
78                                         "for unit tests")
79   
80    NDGSEC_TEST_CONFIG_DIR = os.environ.get(configDirEnvVarName, 
81                                            TEST_CONFIG_DIR)
82   
83    # Test database set-up
84    DB_FILENAME = 'user.db'
85    DB_FILEPATH = join(NDGSEC_TEST_CONFIG_DIR, DB_FILENAME)
86    DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILEPATH
87   
88    USERNAME = 'pjk'
89    PASSWORD = 'testpassword'
90    MD5_PASSWORD = md5(PASSWORD).hexdigest()
91   
92    OPENID_URI_STEM = 'https://openid.localhost/'
93    OPENID_IDENTIFIER = 'philip.kershaw'
94    OPENID_URI = OPENID_URI_STEM + OPENID_IDENTIFIER
95   
96    FIRSTNAME = 'Philip'
97    LASTNAME = 'Kershaw'
98    EMAILADDRESS = 'pjk@somewhere.ac.uk'
99   
100    ATTRIBUTE_NAMES = (
101        "urn:siteA:security:authz:1.0:attr",
102    )
103
104    ATTRIBUTE_VALUES = (
105        'urn:siteA:security:authz:1.0:attr:postdoc',
106        'urn:siteA:security:authz:1.0:attr:staff', 
107        'urn:siteA:security:authz:1.0:attr:undergrad', 
108        'urn:siteA:security:authz:1.0:attr:coapec',
109        'urn:siteA:security:authz:1.0:attr:rapid'
110    )
111    N_ATTRIBUTE_VALUES = len(ATTRIBUTE_VALUES)
112   
113    VALID_REQUESTOR_IDS = ("/O=Site A/CN=Authorisation Service", 
114                           "/O=Site B/CN=Authorisation Service")
115   
116    SSL_PEM_FILENAME = 'localhost.pem'
117    SSL_PEM_FILEPATH = mkDataDirPath(os.path.join('pki', SSL_PEM_FILENAME))
118   
119    def __init__(self, *arg, **kw):
120        if BaseTestCase.configDirEnvVarName not in os.environ:
121            os.environ[BaseTestCase.configDirEnvVarName] = TEST_CONFIG_DIR
122               
123        unittest.TestCase.__init__(self, *arg, **kw)
124        self.services = []
125       
126    def addService(self, *arg, **kw):
127        """Utility for setting up threads to run Paste HTTP based services with
128        unit tests
129       
130        @param cfgFilePath: ini file containing configuration for the service
131        @type cfgFilePath: basestring
132        @param port: port number to run the service from
133        @type port: int
134        """
135        if self.disableServiceStartup:
136            return
137       
138        withSSL = kw.pop('withSSL', False)
139        if withSSL:
140            from OpenSSL import SSL
141           
142            certFilePath = mkDataDirPath(os.path.join('pki', 'localhost.crt'))
143            priKeyFilePath = mkDataDirPath(os.path.join('pki', 'localhost.key'))
144           
145            kw['ssl_context'] = SSL.Context(SSL.SSLv23_METHOD)
146            kw['ssl_context'].set_options(SSL.OP_NO_SSLv2)
147       
148            kw['ssl_context'].use_privatekey_file(priKeyFilePath)
149            kw['ssl_context'].use_certificate_file(certFilePath)
150           
151        try:
152            self.services.append(PasteDeployAppServer(*arg, **kw))
153            self.services[-1].startThread()
154           
155        except socket.error:
156            pass
157
158    def startAttributeAuthorities(self, withSSL=False, port=None):
159        """Serve test Attribute Authorities to test against"""
160        self.startSiteAAttributeAuthority(withSSL=withSSL, port=port)
161        self.startSiteBAttributeAuthority(withSSL=withSSL, port=port)
162       
163    def startSiteAAttributeAuthority(self, withSSL=False, port=None):
164        siteACfgFilePath = mkDataDirPath(join('attributeauthority', 
165                                              'sitea', 
166                                              'site-a.ini'))
167        self.addService(cfgFilePath=siteACfgFilePath, 
168                        port=port or BaseTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM,
169                        withSSL=withSSL)
170       
171    def startSiteBAttributeAuthority(self, withSSL=False, port=None):
172        siteBCfgFilePath = mkDataDirPath(join('attributeauthority',
173                                              'siteb', 
174                                              'site-b.ini'))
175        self.addService(cfgFilePath=siteBCfgFilePath, 
176                        port=port or BaseTestCase.SITEB_ATTRIBUTEAUTHORITY_PORTNUM,
177                        withSSL=withSSL)       
178
179    def startSessionManager(self):
180        """Serve test Session Manager service"""
181        cfgFilePath = mkDataDirPath(join('sessionmanager', 
182                                         'session-manager.ini'))
183        self.addService(cfgFilePath=cfgFilePath, 
184                        port=BaseTestCase.SESSIONMANAGER_PORTNUM)
185       
186
187    def __del__(self):
188        """Stop any services started with the addService method"""
189        if hasattr(self, 'services'):
190            for service in self.services:
191                service.terminateThread()
192 
193    def initDb(self):
194        """Wrapper to _createDb - Create database only if it doesn't already
195        exist"""
196        if not os.path.isfile(BaseTestCase.DB_FILEPATH):
197            self._createDb()
198           
199    def _createDb(self):
200        """Create a test SQLite database with SQLAlchemy for use with unit
201        tests
202        """
203        if not sqlAlchemyInstalled:
204            raise NotImplementedError("SQLAlchemy must be installed in order "
205                                      "for this method to be implemented")
206           
207        db = create_engine(BaseTestCase.DB_CONNECTION_STR)
208       
209        metadata = MetaData()
210        usersTable = Table('users', metadata,
211                           Column('id', Integer, primary_key=True),
212                           Column('username', String),
213                           Column('md5password', String),
214                           Column('openid', String),
215                           Column('openid_identifier', String),
216                           Column('firstname', String),
217                           Column('lastname', String),
218                           Column('emailaddress', String))
219       
220        attributesTable = Table('attributes', metadata,
221                                Column('id', Integer, primary_key=True),
222                                Column('username', String),
223                                Column('attributename', String))
224        metadata.create_all(db)
225       
226        class User(declarative_base()):
227            __tablename__ = 'users'
228       
229            id = Column(Integer, primary_key=True)
230            username = Column('username', String(40))
231            md5password = Column('md5password', String(64))
232            openid = Column('openid', String(128))
233            openid_identifier = Column('openid_identifier', String(40))
234            firstname = Column('firstname', String(40))
235            lastname = Column('lastname', String(40))
236            emailAddress = Column('emailaddress', String(40))
237       
238            def __init__(self, username, md5password, openid, openid_identifier, 
239                         firstname, lastname, emailaddress):
240                self.username = username
241                self.md5password = md5password
242                self.openid = openid
243                self.openid_identifier = openid_identifier
244                self.firstname = firstname
245                self.lastname = lastname
246                self.emailAddress = emailaddress
247       
248        class Attribute(declarative_base()):
249            __tablename__ = 'attributes'
250       
251            id = Column(Integer, primary_key=True)
252            username = Column('username', String(40))
253            attributename = Column('attributename', String(40))
254       
255            def __init__(self, username, attributename):
256                self.username = username
257                self.attributename = attributename
258               
259        Session = sessionmaker(bind=db)
260        session = Session()
261       
262        attributes = [Attribute(BaseTestCase.USERNAME, attrVal)
263                      for attrVal in BaseTestCase.ATTRIBUTE_VALUES]
264        session.add_all(attributes)
265           
266        user = User(BaseTestCase.USERNAME, 
267                    BaseTestCase.MD5_PASSWORD,
268                    BaseTestCase.OPENID_URI,
269                    BaseTestCase.OPENID_IDENTIFIER,
270                    BaseTestCase.FIRSTNAME,
271                    BaseTestCase.LASTNAME,
272                    BaseTestCase.EMAILADDRESS)
273       
274        session.add(user)
275        session.commit() 
276
277
278def _getParentDir(depth=0, path=dirname(__file__)):
279    """
280    @type path: basestring
281    @param path: directory path from which to get parent directory, defaults
282    to dir of this module
283    @rtype: basestring
284    @return: parent directory at depth levels up from the current path
285    """
286    for i in range(depth):
287        path = dirname(path)
288    return path
289
290
Note: See TracBrowser for help on using the repository browser.