source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py @ 7168

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py@7168
Revision 7168, 11.9 KB checked in by pjkersha, 9 years ago (diff)

Incomplete - task 2: XACML-Security Integration

  • testing ndg.security.server.wsgi.authz.pep.SamlPepMiddleware? in ndg.security.test.unit.authz.test_authz
  • Property svn:keywords set to Id
Line 
1"""NDG Security unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "14/05/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id$'
11
12import unittest
13import logging
14import socket
15logging.basicConfig()
16log = logging.getLogger(__name__)
17
18import os
19from os.path import expandvars, join, dirname, abspath
20
21try:
22    from hashlib import md5
23except ImportError:
24    # Allow for < Python 2.5
25    from md5 import md5
26
27
28TEST_CONFIG_DIR = join(abspath(dirname(dirname(__file__))), 'config')
29
30mkDataDirPath = lambda file:join(TEST_CONFIG_DIR, file)
31
32from ndg.security.common.X509 import X500DN
33from ndg.security.test.unit.wsgi import PasteDeployAppServer
34
35try:
36    from sqlalchemy import (create_engine, MetaData, Table, Column, Integer, 
37                            String)
38    from sqlalchemy.ext.declarative import declarative_base
39    from sqlalchemy.orm import sessionmaker
40   
41    sqlAlchemyInstalled = True
42except ImportError:
43    sqlAlchemyInstalled = False
44   
45   
46class BaseTestCase(unittest.TestCase):
47    '''Convenience base class from which other unit tests can extend.  Its
48    sets the generic data directory path'''
49    configDirEnvVarName = 'NDGSEC_TEST_CONFIG_DIR'
50   
51    AUTHORISATION_SERVICE_PORTNUM = 9443
52    AUTHORISATION_SERVICE_URI = 'https://localhost:%s/authorisation-service' % \
53                                AUTHORISATION_SERVICE_PORTNUM
54    AUTHORISATION_SERVICE_INI_FILEPATH = mkDataDirPath(
55            os.path.join('authorisationservice', 'authorisation-service.ini'))
56                         
57    SITEA_ATTRIBUTEAUTHORITY_PORTNUM = 5000
58    SITEB_ATTRIBUTEAUTHORITY_PORTNUM = 5100
59   
60    SITEA_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
61                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
62                                   
63    SITEB_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
64                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
65                                   
66    SITEA_ATTRIBUTEAUTHORITY_SAML_URI = \
67        'http://localhost:%s/AttributeAuthority/saml' % \
68                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
69                                   
70    SITEB_ATTRIBUTEAUTHORITY_SAML_URI = \
71        'http://localhost:%s/AttributeAuthority/saml' % \
72                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
73                                   
74    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM = 5443
75    SITEA_SSL_ATTRIBUTEAUTHORITY_SAML_URI = \
76        'https://localhost:%d/AttributeAuthority' % \
77                                    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM
78    SSL_CERT_DN = "/C=UK/ST=Oxfordshire/O=BADC/OU=Security/CN=localhost"
79                                   
80    SITEA_SAML_ISSUER_NAME = "/O=Site A/CN=Attribute Authority"
81   
82    NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR = \
83        'NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES'
84   
85    _disableServiceStartup = lambda self: bool(os.environ.get(
86        BaseTestCase.NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR))
87   
88    disableServiceStartup = property(fget=_disableServiceStartup,
89                                     doc="Stop automated start-up of services "
90                                         "for unit tests")
91   
92    NDGSEC_TEST_CONFIG_DIR = os.environ.get(configDirEnvVarName, 
93                                            TEST_CONFIG_DIR)
94   
95    CACERT_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'ca')
96    PKI_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'pki')
97    SSL_CERT_FILEPATH = os.path.join(PKI_DIR, 'localhost.crt')
98    SSL_PRIKEY_FILEPATH = os.path.join(PKI_DIR, 'localhost.key')
99   
100    # Test database set-up
101    DB_FILENAME = 'user.db'
102    DB_FILEPATH = join(NDGSEC_TEST_CONFIG_DIR, DB_FILENAME)
103    DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILEPATH
104   
105    USERNAME = 'pjk'
106    PASSWORD = 'testpassword'
107    MD5_PASSWORD = md5(PASSWORD).hexdigest()
108   
109    OPENID_URI_STEM = 'https://localhost:7443/openid/'
110    OPENID_IDENTIFIER = 'philip.kershaw'
111    OPENID_URI = OPENID_URI_STEM + OPENID_IDENTIFIER
112   
113    FIRSTNAME = 'Philip'
114    LASTNAME = 'Kershaw'
115    EMAILADDRESS = 'pjk@somewhere.ac.uk'
116   
117    ATTRIBUTE_NAMES = (
118        "urn:siteA:security:authz:1.0:attr",
119    )
120
121    ATTRIBUTE_VALUES = (
122        'urn:siteA:security:authz:1.0:attr:postdoc',
123        'urn:siteA:security:authz:1.0:attr:staff', 
124        'urn:siteA:security:authz:1.0:attr:undergrad', 
125        'urn:siteA:security:authz:1.0:attr:coapec',
126        'urn:siteA:security:authz:1.0:attr:rapid'
127    )
128    N_ATTRIBUTE_VALUES = len(ATTRIBUTE_VALUES)
129   
130    VALID_REQUESTOR_IDS = (
131        X500DN.fromString("/O=Site A/CN=Authorisation Service"), 
132        X500DN.fromString("/O=Site B/CN=Authorisation Service"),
133        X500DN.fromString('/CN=test/O=NDG/OU=BADC')
134    )
135   
136    SSL_PEM_FILENAME = 'localhost.pem'
137    SSL_PEM_FILEPATH = mkDataDirPath(os.path.join('pki', SSL_PEM_FILENAME))
138   
139    def __init__(self, *arg, **kw):
140        if BaseTestCase.configDirEnvVarName not in os.environ:
141            os.environ[BaseTestCase.configDirEnvVarName] = TEST_CONFIG_DIR
142               
143        unittest.TestCase.__init__(self, *arg, **kw)
144        self.services = []
145       
146    def addService(self, *arg, **kw):
147        """Utility for setting up threads to run Paste HTTP based services with
148        unit tests
149       
150        @param cfgFilePath: ini file containing configuration for the service
151        @type cfgFilePath: basestring
152        @param port: port number to run the service from
153        @type port: int
154        """
155        if self.disableServiceStartup:
156            return
157       
158        withSSL = kw.pop('withSSL', False)
159        if withSSL:
160            from OpenSSL import SSL
161           
162            certFilePath = mkDataDirPath(os.path.join('pki', 'localhost.crt'))
163            priKeyFilePath = mkDataDirPath(os.path.join('pki', 'localhost.key'))
164           
165            kw['ssl_context'] = SSL.Context(SSL.SSLv23_METHOD)
166            kw['ssl_context'].set_options(SSL.OP_NO_SSLv2)
167       
168            kw['ssl_context'].use_privatekey_file(priKeyFilePath)
169            kw['ssl_context'].use_certificate_file(certFilePath)
170           
171        try:
172            self.services.append(PasteDeployAppServer(*arg, **kw))
173            self.services[-1].startThread()
174           
175        except socket.error:
176            pass
177
178    def startAttributeAuthorities(self, withSSL=False, port=None):
179        """Serve test Attribute Authorities to test against"""
180        self.startSiteAAttributeAuthority(withSSL=withSSL, port=port)
181        self.startSiteBAttributeAuthority(withSSL=withSSL, port=port)
182       
183    def startSiteAAttributeAuthority(self, withSSL=False, port=None):
184        siteACfgFilePath = mkDataDirPath(join('attributeauthority', 
185                                              'sitea', 
186                                              'site-a.ini'))
187        self.addService(cfgFilePath=siteACfgFilePath, 
188                        port=(port or 
189                              BaseTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM),
190                        withSSL=withSSL)
191       
192    def startSiteBAttributeAuthority(self, withSSL=False, port=None):
193        siteBCfgFilePath = mkDataDirPath(join('attributeauthority',
194                                              'siteb', 
195                                              'site-b.ini'))
196        self.addService(cfgFilePath=siteBCfgFilePath, 
197                        port=(port or 
198                              BaseTestCase.SITEB_ATTRIBUTEAUTHORITY_PORTNUM),
199                        withSSL=withSSL)
200       
201    def startAuthorisationService(self, 
202                                  withSSL=True, 
203                                  port=AUTHORISATION_SERVICE_PORTNUM):
204        self.addService(
205            cfgFilePath=self.__class__.AUTHORISATION_SERVICE_INI_FILEPATH, 
206            port=port,
207            withSSL=withSSL)
208       
209    def __del__(self):
210        """Stop any services started with the addService method"""
211        if hasattr(self, 'services'):
212            for service in self.services:
213                service.terminateThread()
214 
215    @classmethod
216    def initDb(cls):
217        """Wrapper to _createDb - Create database only if it doesn't already
218        exist"""
219        if not os.path.isfile(cls.DB_FILEPATH):
220            cls._createDb()
221       
222    @classmethod 
223    def _createDb(cls):
224        """Create a test SQLite database with SQLAlchemy for use with unit
225        tests
226        """
227        log.debug("Creating database for %r ..." % cls.__name__)
228       
229        if not sqlAlchemyInstalled:
230            raise NotImplementedError("SQLAlchemy must be installed in order "
231                                      "for this method to be implemented")
232           
233        db = create_engine(cls.DB_CONNECTION_STR)
234       
235        metadata = MetaData()
236        usersTable = Table('users', metadata,
237                           Column('id', Integer, primary_key=True),
238                           Column('username', String),
239                           Column('md5password', String),
240                           Column('openid', String),
241                           Column('openid_identifier', String),
242                           Column('firstname', String),
243                           Column('lastname', String),
244                           Column('emailaddress', String))
245       
246        attributesTable = Table('attributes', metadata,
247                                Column('id', Integer, primary_key=True),
248                                Column('openid', String),
249                                Column('attributename', String))
250        metadata.create_all(db)
251       
252        class User(declarative_base()):
253            __tablename__ = 'users'
254       
255            id = Column(Integer, primary_key=True)
256            username = Column('username', String(40))
257            md5password = Column('md5password', String(64))
258            openid = Column('openid', String(128))
259            openid_identifier = Column('openid_identifier', String(40))
260            firstname = Column('firstname', String(40))
261            lastname = Column('lastname', String(40))
262            emailAddress = Column('emailaddress', String(40))
263       
264            def __init__(self, username, md5password, openid, openid_identifier, 
265                         firstname, lastname, emailaddress):
266                self.username = username
267                self.md5password = md5password
268                self.openid = openid
269                self.openid_identifier = openid_identifier
270                self.firstname = firstname
271                self.lastname = lastname
272                self.emailAddress = emailaddress
273       
274        class Attribute(declarative_base()):
275            __tablename__ = 'attributes'
276       
277            id = Column(Integer, primary_key=True)
278            openid = Column('openid', String(128))
279            attributename = Column('attributename', String(40))
280       
281            def __init__(self, openid, attributename):
282                self.openid = openid
283                self.attributename = attributename
284
285        Session = sessionmaker(bind=db)
286        session = Session()
287       
288        attributes = [Attribute(cls.OPENID_URI, attrVal)
289                      for attrVal in cls.ATTRIBUTE_VALUES]
290        session.add_all(attributes)
291           
292        user = User(cls.USERNAME, 
293                    cls.MD5_PASSWORD,
294                    cls.OPENID_URI,
295                    cls.OPENID_IDENTIFIER,
296                    cls.FIRSTNAME,
297                    cls.LASTNAME,
298                    cls.EMAILADDRESS)
299       
300        session.add(user)
301        session.commit() 
302
303
304def _getParentDir(depth=0, path=dirname(__file__)):
305    """
306    @type path: basestring
307    @param path: directory path from which to get parent directory, defaults
308    to dir of this module
309    @rtype: basestring
310    @return: parent directory at depth levels up from the current path
311    """
312    for i in range(depth):
313        path = dirname(path)
314    return path
315
316
Note: See TracBrowser for help on using the repository browser.