source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py @ 6276

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py@6276
Revision 6276, 11.6 KB checked in by pjkersha, 11 years ago (diff)
  • ndg.security.server.wsgi.NDGSecurityMiddlewareBase: change code to leave trailing space in path info attribute
  • ndg.security.server.wsgi.openid.provider.OpenIDProviderMiddleware:
    • added properties with type checking
    • added trustedRelyingParties attribute - a list of sites trusted by the Provider for which no decide page interface is invoked.
  • ndg.security.server.wsgi.openid.provider.axinterface.csv: fixed adding of AX attributes for CSV class. Required attributes were being set twice.
  • ndg.security.server.wsgi.openid.relyingparty.OpenIDRelyingParty:
    • changed whitelisting to use ndg.security.server.wsgi.openid.relyingparty.validation.SSLIdPValidationDriver class. FIXME: Fix required in SSL verify callback needed to check only the server certificate in the verification chain and ignore CA certificates.
  • ndg.security.server.wsgi.openid.relyingparty.validation.SSLIdPValidationDriver: changed to init to include keyword option for installing default urllib2 opener and defaulting to OMIT it.
Line 
1"""NDG Security unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "14/05/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id: __init__.py 4840 2009-01-19 13:59:08Z pjkersha $'
11
12import unittest
13import logging
14import socket
15logging.basicConfig()
16log = logging.getLogger(__name__)
17
18import os
19from os.path import expandvars, join, dirname, abspath
20
21try:
22    from hashlib import md5
23except ImportError:
24    # Allow for < Python 2.5
25    from md5 import md5
26
27
28TEST_CONFIG_DIR = join(abspath(dirname(dirname(__file__))), 'config')
29
30mkDataDirPath = lambda file:join(TEST_CONFIG_DIR, file)
31
32from ndg.security.common.X509 import X500DN
33from ndg.security.test.unit.wsgi import PasteDeployAppServer
34
35try:
36    from sqlalchemy import (create_engine, MetaData, Table, Column, Integer, 
37                            String)
38    from sqlalchemy.ext.declarative import declarative_base
39    from sqlalchemy.orm import sessionmaker
40   
41    sqlAlchemyInstalled = True
42except ImportError:
43    sqlAlchemyInstalled = False
44   
45   
46class BaseTestCase(unittest.TestCase):
47    '''Convenience base class from which other unit tests can extend.  Its
48    sets the generic data directory path'''
49    configDirEnvVarName = 'NDGSEC_TEST_CONFIG_DIR'
50   
51    SITEA_ATTRIBUTEAUTHORITY_PORTNUM = 5000
52    SITEB_ATTRIBUTEAUTHORITY_PORTNUM = 5100
53   
54    SITEA_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
55                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
56                                   
57    SITEB_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
58                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
59                                   
60    SITEA_ATTRIBUTEAUTHORITY_SAML_URI = \
61        'http://localhost:%s/AttributeAuthority/saml' % \
62                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
63                                   
64    SITEB_ATTRIBUTEAUTHORITY_SAML_URI = \
65        'http://localhost:%s/AttributeAuthority/saml' % \
66                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
67                                   
68    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM = 5443
69    SITEA_SSL_ATTRIBUTEAUTHORITY_SAML_URI = \
70        'https://localhost:%d/AttributeAuthority/saml' % \
71                                    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM
72    SSL_CERT_DN = "/C=UK/ST=Oxfordshire/O=BADC/OU=Security/CN=localhost"
73                                   
74    SITEA_SAML_ISSUER_NAME = "/O=Site A/CN=Attribute Authority"
75   
76    SESSIONMANAGER_PORTNUM = 5500
77   
78    NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR = \
79        'NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES'
80   
81    _disableServiceStartup = lambda self: bool(os.environ.get(
82        BaseTestCase.NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR))
83   
84    disableServiceStartup = property(fget=_disableServiceStartup,
85                                     doc="Stop automated start-up of services "
86                                         "for unit tests")
87   
88    NDGSEC_TEST_CONFIG_DIR = os.environ.get(configDirEnvVarName, 
89                                            TEST_CONFIG_DIR)
90   
91    CACERT_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'ca')
92    PKI_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'pki')
93    SSL_CERT_FILEPATH = os.path.join(PKI_DIR, 'localhost.crt')
94    SSL_PRIKEY_FILEPATH = os.path.join(PKI_DIR, 'localhost.key')
95   
96    # Test database set-up
97    DB_FILENAME = 'user.db'
98    DB_FILEPATH = join(NDGSEC_TEST_CONFIG_DIR, DB_FILENAME)
99    DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILEPATH
100   
101    USERNAME = 'pjk'
102    PASSWORD = 'testpassword'
103    MD5_PASSWORD = md5(PASSWORD).hexdigest()
104   
105    OPENID_URI_STEM = 'https://localhost:7443/openid/'
106    OPENID_IDENTIFIER = 'philip.kershaw'
107    OPENID_URI = OPENID_URI_STEM + OPENID_IDENTIFIER
108   
109    FIRSTNAME = 'Philip'
110    LASTNAME = 'Kershaw'
111    EMAILADDRESS = 'pjk@somewhere.ac.uk'
112   
113    ATTRIBUTE_NAMES = (
114        "urn:siteA:security:authz:1.0:attr",
115    )
116
117    ATTRIBUTE_VALUES = (
118        'urn:siteA:security:authz:1.0:attr:postdoc',
119        'urn:siteA:security:authz:1.0:attr:staff', 
120        'urn:siteA:security:authz:1.0:attr:undergrad', 
121        'urn:siteA:security:authz:1.0:attr:coapec',
122        'urn:siteA:security:authz:1.0:attr:rapid'
123    )
124    N_ATTRIBUTE_VALUES = len(ATTRIBUTE_VALUES)
125   
126    VALID_REQUESTOR_IDS = (
127        X500DN.fromString("/O=Site A/CN=Authorisation Service"), 
128        X500DN.fromString("/O=Site B/CN=Authorisation Service"),
129        X500DN.fromString('/CN=test/O=NDG/OU=BADC')
130    )
131   
132    SSL_PEM_FILENAME = 'localhost.pem'
133    SSL_PEM_FILEPATH = mkDataDirPath(os.path.join('pki', SSL_PEM_FILENAME))
134   
135    def __init__(self, *arg, **kw):
136        if BaseTestCase.configDirEnvVarName not in os.environ:
137            os.environ[BaseTestCase.configDirEnvVarName] = TEST_CONFIG_DIR
138               
139        unittest.TestCase.__init__(self, *arg, **kw)
140        self.services = []
141       
142    def addService(self, *arg, **kw):
143        """Utility for setting up threads to run Paste HTTP based services with
144        unit tests
145       
146        @param cfgFilePath: ini file containing configuration for the service
147        @type cfgFilePath: basestring
148        @param port: port number to run the service from
149        @type port: int
150        """
151        if self.disableServiceStartup:
152            return
153       
154        withSSL = kw.pop('withSSL', False)
155        if withSSL:
156            from OpenSSL import SSL
157           
158            certFilePath = mkDataDirPath(os.path.join('pki', 'localhost.crt'))
159            priKeyFilePath = mkDataDirPath(os.path.join('pki', 'localhost.key'))
160           
161            kw['ssl_context'] = SSL.Context(SSL.SSLv23_METHOD)
162            kw['ssl_context'].set_options(SSL.OP_NO_SSLv2)
163       
164            kw['ssl_context'].use_privatekey_file(priKeyFilePath)
165            kw['ssl_context'].use_certificate_file(certFilePath)
166           
167        try:
168            self.services.append(PasteDeployAppServer(*arg, **kw))
169            self.services[-1].startThread()
170           
171        except socket.error:
172            pass
173
174    def startAttributeAuthorities(self, withSSL=False, port=None):
175        """Serve test Attribute Authorities to test against"""
176        self.startSiteAAttributeAuthority(withSSL=withSSL, port=port)
177        self.startSiteBAttributeAuthority(withSSL=withSSL, port=port)
178       
179    def startSiteAAttributeAuthority(self, withSSL=False, port=None):
180        siteACfgFilePath = mkDataDirPath(join('attributeauthority', 
181                                              'sitea', 
182                                              'site-a.ini'))
183        self.addService(cfgFilePath=siteACfgFilePath, 
184                        port=port or BaseTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM,
185                        withSSL=withSSL)
186       
187    def startSiteBAttributeAuthority(self, withSSL=False, port=None):
188        siteBCfgFilePath = mkDataDirPath(join('attributeauthority',
189                                              'siteb', 
190                                              'site-b.ini'))
191        self.addService(cfgFilePath=siteBCfgFilePath, 
192                        port=port or BaseTestCase.SITEB_ATTRIBUTEAUTHORITY_PORTNUM,
193                        withSSL=withSSL)       
194
195    def startSessionManager(self):
196        """Serve test Session Manager service"""
197        cfgFilePath = mkDataDirPath(join('sessionmanager', 
198                                         'session-manager.ini'))
199        self.addService(cfgFilePath=cfgFilePath, 
200                        port=BaseTestCase.SESSIONMANAGER_PORTNUM)
201       
202
203    def __del__(self):
204        """Stop any services started with the addService method"""
205        if hasattr(self, 'services'):
206            for service in self.services:
207                service.terminateThread()
208 
209    @classmethod
210    def initDb(cls):
211        """Wrapper to _createDb - Create database only if it doesn't already
212        exist"""
213        if not os.path.isfile(cls.DB_FILEPATH):
214            cls._createDb()
215       
216    @classmethod 
217    def _createDb(cls):
218        """Create a test SQLite database with SQLAlchemy for use with unit
219        tests
220        """
221        log.debug("Creating database for %r ..." % cls.__name__)
222       
223        if not sqlAlchemyInstalled:
224            raise NotImplementedError("SQLAlchemy must be installed in order "
225                                      "for this method to be implemented")
226           
227        db = create_engine(cls.DB_CONNECTION_STR)
228       
229        metadata = MetaData()
230        usersTable = Table('users', metadata,
231                           Column('id', Integer, primary_key=True),
232                           Column('username', String),
233                           Column('md5password', String),
234                           Column('openid', String),
235                           Column('openid_identifier', String),
236                           Column('firstname', String),
237                           Column('lastname', String),
238                           Column('emailaddress', String))
239       
240        attributesTable = Table('attributes', metadata,
241                                Column('id', Integer, primary_key=True),
242                                Column('openid', String),
243                                Column('attributename', String))
244        metadata.create_all(db)
245       
246        class User(declarative_base()):
247            __tablename__ = 'users'
248       
249            id = Column(Integer, primary_key=True)
250            username = Column('username', String(40))
251            md5password = Column('md5password', String(64))
252            openid = Column('openid', String(128))
253            openid_identifier = Column('openid_identifier', String(40))
254            firstname = Column('firstname', String(40))
255            lastname = Column('lastname', String(40))
256            emailAddress = Column('emailaddress', String(40))
257       
258            def __init__(self, username, md5password, openid, openid_identifier, 
259                         firstname, lastname, emailaddress):
260                self.username = username
261                self.md5password = md5password
262                self.openid = openid
263                self.openid_identifier = openid_identifier
264                self.firstname = firstname
265                self.lastname = lastname
266                self.emailAddress = emailaddress
267       
268        class Attribute(declarative_base()):
269            __tablename__ = 'attributes'
270       
271            id = Column(Integer, primary_key=True)
272            openid = Column('openid', String(128))
273            attributename = Column('attributename', String(40))
274       
275            def __init__(self, openid, attributename):
276                self.openid = openid
277                self.attributename = attributename
278
279        Session = sessionmaker(bind=db)
280        session = Session()
281       
282        attributes = [Attribute(cls.OPENID_URI, attrVal)
283                      for attrVal in cls.ATTRIBUTE_VALUES]
284        session.add_all(attributes)
285           
286        user = User(cls.USERNAME, 
287                    cls.MD5_PASSWORD,
288                    cls.OPENID_URI,
289                    cls.OPENID_IDENTIFIER,
290                    cls.FIRSTNAME,
291                    cls.LASTNAME,
292                    cls.EMAILADDRESS)
293       
294        session.add(user)
295        session.commit() 
296
297
298def _getParentDir(depth=0, path=dirname(__file__)):
299    """
300    @type path: basestring
301    @param path: directory path from which to get parent directory, defaults
302    to dir of this module
303    @rtype: basestring
304    @return: parent directory at depth levels up from the current path
305    """
306    for i in range(depth):
307        path = dirname(path)
308    return path
309
310
Note: See TracBrowser for help on using the repository browser.