source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/__init__.py @ 6052

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/__init__.py@6052
Revision 6052, 11.5 KB checked in by pjkersha, 11 years ago (diff)

Updated MyProxy? Cert extension app for use with improved SAML Attribute Query interface class AttributeQuerySslSOAPBinding

Line 
1"""NDG Security unit test package
2
3NERC Data Grid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "14/05/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id: __init__.py 4840 2009-01-19 13:59:08Z pjkersha $'
11
12import unittest
13import logging
14import socket
15logging.basicConfig()
16
17import os
18from os.path import expandvars, join, dirname, abspath
19
20try:
21    from hashlib import md5
22except ImportError:
23    # Allow for < Python 2.5
24    from md5 import md5
25
26
27TEST_CONFIG_DIR = join(abspath(dirname(dirname(__file__))), 'config')
28
29mkDataDirPath = lambda file:join(TEST_CONFIG_DIR, file)
30
31from ndg.security.test.unit.wsgi import PasteDeployAppServer
32
33try:
34    from sqlalchemy import (create_engine, MetaData, Table, Column, Integer, 
35                            String)
36    from sqlalchemy.ext.declarative import declarative_base
37    from sqlalchemy.orm import sessionmaker
38   
39    sqlAlchemyInstalled = True
40except ImportError:
41    sqlAlchemyInstalled = False
42   
43   
44class BaseTestCase(unittest.TestCase):
45    '''Convenience base class from which other unit tests can extend.  Its
46    sets the generic data directory path'''
47    configDirEnvVarName = 'NDGSEC_TEST_CONFIG_DIR'
48   
49    SITEA_ATTRIBUTEAUTHORITY_PORTNUM = 5000
50    SITEB_ATTRIBUTEAUTHORITY_PORTNUM = 5100
51   
52    SITEA_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
53                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
54                                   
55    SITEB_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
56                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
57                                   
58    SITEA_ATTRIBUTEAUTHORITY_SAML_URI = \
59        'http://localhost:%s/AttributeAuthority/saml' % \
60                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
61                                   
62    SITEB_ATTRIBUTEAUTHORITY_SAML_URI = \
63        'http://localhost:%s/AttributeAuthority/saml' % \
64                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
65                                   
66    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM = 5443
67    SITEA_SSL_ATTRIBUTEAUTHORITY_SAML_URI = \
68        'https://localhost:%d/AttributeAuthority/saml' % \
69                                    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM
70    SSL_CERT_DN = "/C=UK/ST=Oxfordshire/O=BADC/OU=Security/CN=localhost"
71                                   
72    SITEA_SAML_ISSUER_NAME = "/O=Site A/CN=Attribute Authority"
73   
74    SESSIONMANAGER_PORTNUM = 5500
75   
76    NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR = \
77        'NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES'
78   
79    _disableServiceStartup = lambda self: bool(os.environ.get(
80        BaseTestCase.NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR))
81   
82    disableServiceStartup = property(fget=_disableServiceStartup,
83                                     doc="Stop automated start-up of services "
84                                         "for unit tests")
85   
86    NDGSEC_TEST_CONFIG_DIR = os.environ.get(configDirEnvVarName, 
87                                            TEST_CONFIG_DIR)
88   
89    CACERT_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'ca')
90    PKI_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'pki')
91    SSL_CERT_FILEPATH = os.path.join(PKI_DIR, 'localhost.crt')
92    SSL_PRIKEY_FILEPATH = os.path.join(PKI_DIR, 'localhost.key')
93   
94    # Test database set-up
95    DB_FILENAME = 'user.db'
96    DB_FILEPATH = join(NDGSEC_TEST_CONFIG_DIR, DB_FILENAME)
97    DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILEPATH
98   
99    USERNAME = 'pjk'
100    PASSWORD = 'testpassword'
101    MD5_PASSWORD = md5(PASSWORD).hexdigest()
102   
103    OPENID_URI_STEM = 'https://openid.localhost/'
104    OPENID_IDENTIFIER = 'philip.kershaw'
105    OPENID_URI = OPENID_URI_STEM + OPENID_IDENTIFIER
106   
107    FIRSTNAME = 'Philip'
108    LASTNAME = 'Kershaw'
109    EMAILADDRESS = 'pjk@somewhere.ac.uk'
110   
111    ATTRIBUTE_NAMES = (
112        "urn:siteA:security:authz:1.0:attr",
113    )
114
115    ATTRIBUTE_VALUES = (
116        'urn:siteA:security:authz:1.0:attr:postdoc',
117        'urn:siteA:security:authz:1.0:attr:staff', 
118        'urn:siteA:security:authz:1.0:attr:undergrad', 
119        'urn:siteA:security:authz:1.0:attr:coapec',
120        'urn:siteA:security:authz:1.0:attr:rapid'
121    )
122    N_ATTRIBUTE_VALUES = len(ATTRIBUTE_VALUES)
123   
124    VALID_REQUESTOR_IDS = ("/O=Site A/CN=Authorisation Service", 
125                           "/O=Site B/CN=Authorisation Service")
126   
127    SSL_PEM_FILENAME = 'localhost.pem'
128    SSL_PEM_FILEPATH = mkDataDirPath(os.path.join('pki', SSL_PEM_FILENAME))
129   
130    def __init__(self, *arg, **kw):
131        if BaseTestCase.configDirEnvVarName not in os.environ:
132            os.environ[BaseTestCase.configDirEnvVarName] = TEST_CONFIG_DIR
133               
134        unittest.TestCase.__init__(self, *arg, **kw)
135        self.services = []
136       
137    def addService(self, *arg, **kw):
138        """Utility for setting up threads to run Paste HTTP based services with
139        unit tests
140       
141        @param cfgFilePath: ini file containing configuration for the service
142        @type cfgFilePath: basestring
143        @param port: port number to run the service from
144        @type port: int
145        """
146        if self.disableServiceStartup:
147            return
148       
149        withSSL = kw.pop('withSSL', False)
150        if withSSL:
151            from OpenSSL import SSL
152           
153            certFilePath = mkDataDirPath(os.path.join('pki', 'localhost.crt'))
154            priKeyFilePath = mkDataDirPath(os.path.join('pki', 'localhost.key'))
155           
156            kw['ssl_context'] = SSL.Context(SSL.SSLv23_METHOD)
157            kw['ssl_context'].set_options(SSL.OP_NO_SSLv2)
158       
159            kw['ssl_context'].use_privatekey_file(priKeyFilePath)
160            kw['ssl_context'].use_certificate_file(certFilePath)
161           
162        try:
163            self.services.append(PasteDeployAppServer(*arg, **kw))
164            self.services[-1].startThread()
165           
166        except socket.error:
167            pass
168
169    def startAttributeAuthorities(self, withSSL=False, port=None):
170        """Serve test Attribute Authorities to test against"""
171        self.startSiteAAttributeAuthority(withSSL=withSSL, port=port)
172        self.startSiteBAttributeAuthority(withSSL=withSSL, port=port)
173       
174    def startSiteAAttributeAuthority(self, withSSL=False, port=None):
175        siteACfgFilePath = mkDataDirPath(join('attributeauthority', 
176                                              'sitea', 
177                                              'site-a.ini'))
178        self.addService(cfgFilePath=siteACfgFilePath, 
179                        port=port or BaseTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM,
180                        withSSL=withSSL)
181       
182    def startSiteBAttributeAuthority(self, withSSL=False, port=None):
183        siteBCfgFilePath = mkDataDirPath(join('attributeauthority',
184                                              'siteb', 
185                                              'site-b.ini'))
186        self.addService(cfgFilePath=siteBCfgFilePath, 
187                        port=port or BaseTestCase.SITEB_ATTRIBUTEAUTHORITY_PORTNUM,
188                        withSSL=withSSL)       
189
190    def startSessionManager(self):
191        """Serve test Session Manager service"""
192        cfgFilePath = mkDataDirPath(join('sessionmanager', 
193                                         'session-manager.ini'))
194        self.addService(cfgFilePath=cfgFilePath, 
195                        port=BaseTestCase.SESSIONMANAGER_PORTNUM)
196       
197
198    def __del__(self):
199        """Stop any services started with the addService method"""
200        if hasattr(self, 'services'):
201            for service in self.services:
202                service.terminateThread()
203 
204    def initDb(self):
205        """Wrapper to _createDb - Create database only if it doesn't already
206        exist"""
207        if not os.path.isfile(BaseTestCase.DB_FILEPATH):
208            self._createDb()
209           
210    def _createDb(self):
211        """Create a test SQLite database with SQLAlchemy for use with unit
212        tests
213        """
214        if not sqlAlchemyInstalled:
215            raise NotImplementedError("SQLAlchemy must be installed in order "
216                                      "for this method to be implemented")
217           
218        db = create_engine(BaseTestCase.DB_CONNECTION_STR)
219       
220        metadata = MetaData()
221        usersTable = Table('users', metadata,
222                           Column('id', Integer, primary_key=True),
223                           Column('username', String),
224                           Column('md5password', String),
225                           Column('openid', String),
226                           Column('openid_identifier', String),
227                           Column('firstname', String),
228                           Column('lastname', String),
229                           Column('emailaddress', String))
230       
231        attributesTable = Table('attributes', metadata,
232                                Column('id', Integer, primary_key=True),
233                                Column('username', String),
234                                Column('attributename', String))
235        metadata.create_all(db)
236       
237        class User(declarative_base()):
238            __tablename__ = 'users'
239       
240            id = Column(Integer, primary_key=True)
241            username = Column('username', String(40))
242            md5password = Column('md5password', String(64))
243            openid = Column('openid', String(128))
244            openid_identifier = Column('openid_identifier', String(40))
245            firstname = Column('firstname', String(40))
246            lastname = Column('lastname', String(40))
247            emailAddress = Column('emailaddress', String(40))
248       
249            def __init__(self, username, md5password, openid, openid_identifier, 
250                         firstname, lastname, emailaddress):
251                self.username = username
252                self.md5password = md5password
253                self.openid = openid
254                self.openid_identifier = openid_identifier
255                self.firstname = firstname
256                self.lastname = lastname
257                self.emailAddress = emailaddress
258       
259        class Attribute(declarative_base()):
260            __tablename__ = 'attributes'
261       
262            id = Column(Integer, primary_key=True)
263            username = Column('username', String(40))
264            attributename = Column('attributename', String(40))
265       
266            def __init__(self, username, attributename):
267                self.username = username
268                self.attributename = attributename
269               
270        Session = sessionmaker(bind=db)
271        session = Session()
272       
273        attributes = [Attribute(BaseTestCase.USERNAME, attrVal)
274                      for attrVal in BaseTestCase.ATTRIBUTE_VALUES]
275        session.add_all(attributes)
276           
277        user = User(BaseTestCase.USERNAME, 
278                    BaseTestCase.MD5_PASSWORD,
279                    BaseTestCase.OPENID_URI,
280                    BaseTestCase.OPENID_IDENTIFIER,
281                    BaseTestCase.FIRSTNAME,
282                    BaseTestCase.LASTNAME,
283                    BaseTestCase.EMAILADDRESS)
284       
285        session.add(user)
286        session.commit() 
287
288
289def _getParentDir(depth=0, path=dirname(__file__)):
290    """
291    @type path: basestring
292    @param path: directory path from which to get parent directory, defaults
293    to dir of this module
294    @rtype: basestring
295    @return: parent directory at depth levels up from the current path
296    """
297    for i in range(depth):
298        path = dirname(path)
299    return path
300
301
Note: See TracBrowser for help on using the repository browser.