source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py @ 7358

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py@7358
Revision 7358, 11.9 KB checked in by pjkersha, 9 years ago (diff)

Incomplete - task 2: XACML-Security Integration

  • added caching capability to Policy Information Point. This enables the PIP to retrieve previously cached assertions from an Attribute Authority optimising performance. Caching is done with beaker.session but instead of indexing based on a cookie, it's based on the subject Id i.e. for ESG, a user's OpenID.
  • Property svn:keywords set to Id
Line 
1"""NDG Security unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "14/05/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id$'
11
12import unittest
13import logging
14import socket
15logging.basicConfig()
16log = logging.getLogger(__name__)
17
18import os
19from os.path import expandvars, join, dirname, abspath
20
21try:
22    from hashlib import md5
23except ImportError:
24    # Allow for < Python 2.5
25    from md5 import md5
26
27
28TEST_CONFIG_DIR = join(abspath(dirname(dirname(__file__))), 'config')
29
30mkDataDirPath = lambda file:join(TEST_CONFIG_DIR, file)
31
32from ndg.security.common.X509 import X500DN
33from ndg.security.test.unit.wsgi import PasteDeployAppServer
34
35try:
36    from sqlalchemy import (create_engine, MetaData, Table, Column, Integer, 
37                            String)
38    from sqlalchemy.ext.declarative import declarative_base
39    from sqlalchemy.orm import sessionmaker
40   
41    sqlAlchemyInstalled = True
42except ImportError:
43    sqlAlchemyInstalled = False
44   
45   
46class BaseTestCase(unittest.TestCase):
47    '''Convenience base class from which other unit tests can extend.  Its
48    sets the generic data directory path'''
49    configDirEnvVarName = 'NDGSEC_TEST_CONFIG_DIR'
50   
51    AUTHORISATION_SERVICE_PORTNUM = 9443
52    AUTHORISATION_SERVICE_URI = 'https://localhost:%s/authorisation-service' % \
53                                AUTHORISATION_SERVICE_PORTNUM
54    AUTHORISATION_SERVICE_INI_FILEPATH = mkDataDirPath(
55            os.path.join('authorisationservice', 'authorisation-service.ini'))
56                         
57    SITEA_ATTRIBUTEAUTHORITY_PORTNUM = 5000
58    SITEB_ATTRIBUTEAUTHORITY_PORTNUM = 5100
59   
60    SITEA_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
61                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
62                                   
63    SITEB_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
64                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
65                                   
66    SITEA_ATTRIBUTEAUTHORITY_SAML_URI = \
67        'http://localhost:%s/AttributeAuthority/saml' % \
68                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
69                                   
70    SITEB_ATTRIBUTEAUTHORITY_SAML_URI = \
71        'http://localhost:%s/AttributeAuthority/saml' % \
72                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
73                                   
74    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM = 5443
75    SITEA_SSL_ATTRIBUTEAUTHORITY_SAML_URI = \
76        'https://localhost:%d/AttributeAuthority' % \
77                                    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM
78    SSL_CERT_DN = "/C=UK/ST=Oxfordshire/O=BADC/OU=Security/CN=localhost"
79                                   
80    SITEA_SAML_ISSUER_NAME = "/O=Site A/CN=Attribute Authority"
81   
82    NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR = \
83        'NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES'
84   
85    _disableServiceStartup = lambda self: bool(os.environ.get(
86        BaseTestCase.NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR))
87   
88    disableServiceStartup = property(fget=_disableServiceStartup,
89                                     doc="Stop automated start-up of services "
90                                         "for unit tests")
91   
92    NDGSEC_TEST_CONFIG_DIR = os.environ.get(configDirEnvVarName, 
93                                            TEST_CONFIG_DIR)
94   
95    CACERT_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'ca')
96    PKI_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'pki')
97    SSL_CERT_FILEPATH = os.path.join(PKI_DIR, 'localhost.crt')
98    SSL_PRIKEY_FILEPATH = os.path.join(PKI_DIR, 'localhost.key')
99   
100    # Test database set-up
101    DB_FILENAME = 'user.db'
102    DB_FILEPATH = join(NDGSEC_TEST_CONFIG_DIR, DB_FILENAME)
103    DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILEPATH
104   
105    USERNAME = 'pjk'
106    PASSWORD = 'testpassword'
107    MD5_PASSWORD = md5(PASSWORD).hexdigest()
108   
109    OPENID_URI_STEM = 'https://localhost:7443/openid/'
110    OPENID_IDENTIFIER = 'philip.kershaw'
111    OPENID_URI = OPENID_URI_STEM + OPENID_IDENTIFIER
112   
113    FIRSTNAME = 'Philip'
114    LASTNAME = 'Kershaw'
115    EMAILADDRESS = 'pjk@somewhere.ac.uk'
116   
117    ATTRIBUTE_NAMES = (
118        "urn:siteA:security:authz:1.0:attr",
119    )
120
121    ATTRIBUTE_VALUES = (
122        'postdoc',
123        'staff', 
124        'undergrad', 
125        'coapec',
126        'rapid',
127        'admin'
128    )
129    N_ATTRIBUTE_VALUES = len(ATTRIBUTE_VALUES)
130   
131    VALID_REQUESTOR_IDS = (
132        X500DN.fromString("/O=Site A/CN=Authorisation Service"), 
133        X500DN.fromString("/O=Site B/CN=Authorisation Service"),
134        X500DN.fromString('/CN=test/O=NDG/OU=BADC'),
135        X500DN.fromString('/O=NDG/OU=Security/CN=localhost')
136    )
137   
138    SSL_PEM_FILENAME = 'localhost.pem'
139    SSL_PEM_FILEPATH = mkDataDirPath(os.path.join('pki', SSL_PEM_FILENAME))
140   
141    def __init__(self, *arg, **kw):
142        if BaseTestCase.configDirEnvVarName not in os.environ:
143            os.environ[BaseTestCase.configDirEnvVarName] = TEST_CONFIG_DIR
144               
145        unittest.TestCase.__init__(self, *arg, **kw)
146        self.services = []
147       
148    def addService(self, *arg, **kw):
149        """Utility for setting up threads to run Paste HTTP based services with
150        unit tests
151       
152        @param cfgFilePath: ini file containing configuration for the service
153        @type cfgFilePath: basestring
154        @param port: port number to run the service from
155        @type port: int
156        """
157        if self.disableServiceStartup:
158            return
159       
160        withSSL = kw.pop('withSSL', False)
161        if withSSL:
162            from OpenSSL import SSL
163           
164            certFilePath = mkDataDirPath(os.path.join('pki', 'localhost.crt'))
165            priKeyFilePath = mkDataDirPath(os.path.join('pki', 'localhost.key'))
166           
167            kw['ssl_context'] = SSL.Context(SSL.SSLv23_METHOD)
168            kw['ssl_context'].set_options(SSL.OP_NO_SSLv2)
169       
170            kw['ssl_context'].use_privatekey_file(priKeyFilePath)
171            kw['ssl_context'].use_certificate_file(certFilePath)
172           
173        try:
174            self.services.append(PasteDeployAppServer(*arg, **kw))
175            self.services[-1].startThread()
176           
177        except socket.error:
178            pass
179
180    def startAttributeAuthorities(self, withSSL=False, port=None):
181        """Serve test Attribute Authorities to test against"""
182        self.startSiteAAttributeAuthority(withSSL=withSSL, port=port)
183        self.startSiteBAttributeAuthority(withSSL=withSSL, port=port)
184       
185    def startSiteAAttributeAuthority(self, withSSL=False, port=None):
186        siteACfgFilePath = mkDataDirPath(join('attributeauthority', 
187                                              'sitea', 
188                                              'site-a.ini'))
189        self.addService(cfgFilePath=siteACfgFilePath, 
190                        port=(port or 
191                              BaseTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM),
192                        withSSL=withSSL)
193       
194    def startSiteBAttributeAuthority(self, withSSL=False, port=None):
195        siteBCfgFilePath = mkDataDirPath(join('attributeauthority',
196                                              'siteb', 
197                                              'site-b.ini'))
198        self.addService(cfgFilePath=siteBCfgFilePath, 
199                        port=(port or 
200                              BaseTestCase.SITEB_ATTRIBUTEAUTHORITY_PORTNUM),
201                        withSSL=withSSL)
202       
203    def startAuthorisationService(self, 
204                                  withSSL=True, 
205                                  port=AUTHORISATION_SERVICE_PORTNUM):
206        self.addService(
207            cfgFilePath=self.__class__.AUTHORISATION_SERVICE_INI_FILEPATH, 
208            port=port,
209            withSSL=withSSL)
210       
211    def __del__(self):
212        self.stopAllServices()
213       
214    def stopAllServices(self):
215        """Stop any services started with the addService method"""
216        if hasattr(self, 'services'):
217            for service in self.services:
218                service.terminateThread()
219 
220    @classmethod
221    def initDb(cls):
222        """Wrapper to _createDb - Create database only if it doesn't already
223        exist"""
224        if not os.path.isfile(cls.DB_FILEPATH):
225            cls._createDb()
226       
227    @classmethod 
228    def _createDb(cls):
229        """Create a test SQLite database with SQLAlchemy for use with unit
230        tests
231        """
232        log.debug("Creating database for %r ..." % cls.__name__)
233       
234        if not sqlAlchemyInstalled:
235            raise NotImplementedError("SQLAlchemy must be installed in order "
236                                      "for this method to be implemented")
237           
238        db = create_engine(cls.DB_CONNECTION_STR)
239       
240        metadata = MetaData()
241        usersTable = Table('users', metadata,
242                           Column('id', Integer, primary_key=True),
243                           Column('username', String),
244                           Column('md5password', String),
245                           Column('openid', String),
246                           Column('openid_identifier', String),
247                           Column('firstname', String),
248                           Column('lastname', String),
249                           Column('emailaddress', String))
250       
251        attributesTable = Table('attributes', metadata,
252                                Column('id', Integer, primary_key=True),
253                                Column('openid', String),
254                                Column('attributename', String))
255        metadata.create_all(db)
256       
257        class User(declarative_base()):
258            __tablename__ = 'users'
259       
260            id = Column(Integer, primary_key=True)
261            username = Column('username', String(40))
262            md5password = Column('md5password', String(64))
263            openid = Column('openid', String(128))
264            openid_identifier = Column('openid_identifier', String(40))
265            firstname = Column('firstname', String(40))
266            lastname = Column('lastname', String(40))
267            emailAddress = Column('emailaddress', String(40))
268       
269            def __init__(self, username, md5password, openid, openid_identifier, 
270                         firstname, lastname, emailaddress):
271                self.username = username
272                self.md5password = md5password
273                self.openid = openid
274                self.openid_identifier = openid_identifier
275                self.firstname = firstname
276                self.lastname = lastname
277                self.emailAddress = emailaddress
278       
279        class Attribute(declarative_base()):
280            __tablename__ = 'attributes'
281       
282            id = Column(Integer, primary_key=True)
283            openid = Column('openid', String(128))
284            attributename = Column('attributename', String(40))
285       
286            def __init__(self, openid, attributename):
287                self.openid = openid
288                self.attributename = attributename
289
290        Session = sessionmaker(bind=db)
291        session = Session()
292       
293        attributes = [Attribute(cls.OPENID_URI, attrVal)
294                      for attrVal in cls.ATTRIBUTE_VALUES]
295        session.add_all(attributes)
296           
297        user = User(cls.USERNAME, 
298                    cls.MD5_PASSWORD,
299                    cls.OPENID_URI,
300                    cls.OPENID_IDENTIFIER,
301                    cls.FIRSTNAME,
302                    cls.LASTNAME,
303                    cls.EMAILADDRESS)
304       
305        session.add(user)
306        session.commit() 
307
308
309def _getParentDir(depth=0, path=dirname(__file__)):
310    """
311    @type path: basestring
312    @param path: directory path from which to get parent directory, defaults
313    to dir of this module
314    @rtype: basestring
315    @return: parent directory at depth levels up from the current path
316    """
317    for i in range(depth):
318        path = dirname(path)
319    return path
320
321
Note: See TracBrowser for help on using the repository browser.