source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py @ 7843

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py
Revision 7843, 11.6 KB checked in by pjkersha, 9 years ago (diff)

Incomplete - task 16: NDG Security 2.x.x - incl. updated Paster templates

  • Moved PasteDeployAppServer? class to ndg.security.server.utils.paste_utils module so that it can be included in scripts as part of the paster templates.
  • tidied and rationalised attribute and authorisation service scripts.
  • Property svn:keywords set to Id
Line 
1"""NDG Security unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "14/05/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id$'
11
12import unittest
13import logging
14import socket
15
16logging.basicConfig()
17log = logging.getLogger(__name__)
18
19import os
20from os.path import expandvars, join, dirname, abspath
21
22try:
23    from hashlib import md5
24except ImportError:
25    # Allow for < Python 2.5
26    from md5 import md5
27
28
29TEST_CONFIG_DIR = join(abspath(dirname(dirname(__file__))), 'config')
30
31mkDataDirPath = lambda file:join(TEST_CONFIG_DIR, file)
32
33from ndg.security.common.X509 import X500DN
34from ndg.security.server.utils.paste_utils import PasteDeployAppServer
35from ndg.security.common.saml_utils.esgf import ESGFGroupRoleAttributeValue
36
37try:
38    from sqlalchemy import (create_engine, MetaData, Table, Column, Integer, 
39                            String)
40    from sqlalchemy.ext.declarative import declarative_base
41    from sqlalchemy.orm import sessionmaker
42   
43    sqlAlchemyInstalled = True
44except ImportError:
45    sqlAlchemyInstalled = False
46   
47   
48class BaseTestCase(unittest.TestCase):
49    '''Convenience base class from which other unit tests can extend.  Its
50    sets the generic data directory path'''
51    configDirEnvVarName = 'NDGSEC_TEST_CONFIG_DIR'
52   
53    AUTHORISATION_SERVICE_PORTNUM = 9443
54    AUTHORISATION_SERVICE_URI = 'https://localhost:%s/authorisation-service' % \
55                                AUTHORISATION_SERVICE_PORTNUM
56    AUTHORISATION_SERVICE_INI_FILEPATH = mkDataDirPath(
57            os.path.join('authorisationservice', 'authorisation-service.ini'))
58                         
59    SITEA_ATTRIBUTEAUTHORITY_PORTNUM = 5000
60    SITEA_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
61                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
62                                   
63    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM = 5443
64    SITEA_SSL_ATTRIBUTEAUTHORITY_URI = \
65        'https://localhost:%d/AttributeAuthority' % \
66                                    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM
67    SSL_CERT_DN = "/O=NDG/OU=Security/CN=localhost"
68                                   
69    SITEA_SAML_ISSUER_NAME = "/O=Site A/CN=Attribute Authority"
70   
71    NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR = \
72        'NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES'
73   
74    _disableServiceStartup = lambda self: bool(os.environ.get(
75        self.__class__.NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR))
76   
77    disableServiceStartup = property(fget=_disableServiceStartup,
78                                     doc="Stop automated start-up of services "
79                                         "for unit tests")
80   
81    NDGSEC_TEST_CONFIG_DIR = os.environ.get(configDirEnvVarName, 
82                                            TEST_CONFIG_DIR)
83   
84    PKI_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'pki')
85    CACERT_DIR = os.path.join(PKI_DIR, 'ca')
86    SSL_CERT_FILEPATH = os.path.join(PKI_DIR, 'localhost.crt')
87    SSL_PRIKEY_FILEPATH = os.path.join(PKI_DIR, 'localhost.key')
88   
89    # Test database set-up
90    DB_FILENAME = 'user.db'
91    DB_FILEPATH = join(NDGSEC_TEST_CONFIG_DIR, DB_FILENAME)
92    DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILEPATH
93   
94    USERNAME = 'pjk'
95    PASSWORD = 'testpassword'
96    MD5_PASSWORD = md5(PASSWORD).hexdigest()
97   
98    OPENID_URI_STEM = 'https://localhost:7443/openid/'
99    OPENID_IDENTIFIER = 'philip.kershaw'
100    OPENID_URI = OPENID_URI_STEM + OPENID_IDENTIFIER
101   
102    FIRSTNAME = 'Philip'
103    LASTNAME = 'Kershaw'
104    EMAILADDRESS = 'pjk@somewhere.ac.uk'
105   
106    ATTRIBUTE_NAMES = (
107        "urn:siteA:security:authz:1.0:attr",
108        "urn:siteA:security:authz:1.0:attr",
109        "urn:siteA:security:authz:1.0:attr",
110        "urn:siteA:security:authz:1.0:attr",
111        "urn:siteA:security:authz:1.0:attr",
112        "urn:siteA:security:authz:1.0:attr",
113        "urn:esg:sitea:grouprole",
114    )
115
116    ATTRIBUTE_VALUES = (
117        'postdoc',
118        'staff', 
119        'undergrad', 
120        'coapec',
121        'rapid',
122        'admin',
123        'siteagroup:default'
124    )
125    N_ATTRIBUTE_VALUES = len(ATTRIBUTE_VALUES)
126   
127    VALID_REQUESTOR_IDS = (
128        X500DN.fromString("/O=Site A/CN=Authorisation Service"), 
129        X500DN.fromString("/O=Site B/CN=Authorisation Service"),
130        X500DN.fromString('/CN=test/O=NDG/OU=BADC'),
131        X500DN.fromString('/O=NDG/OU=Security/CN=localhost')
132    )
133   
134    SSL_PEM_FILENAME = 'localhost.pem'
135    SSL_PEM_FILEPATH = mkDataDirPath(os.path.join('pki', SSL_PEM_FILENAME))
136   
137    def __init__(self, *arg, **kw):
138        if BaseTestCase.configDirEnvVarName not in os.environ:
139            os.environ[BaseTestCase.configDirEnvVarName] = TEST_CONFIG_DIR
140               
141        unittest.TestCase.__init__(self, *arg, **kw)
142        self.services = []
143       
144        self.__class__.initDb()
145       
146    def addService(self, *arg, **kw):
147        """Utility for setting up threads to run Paste HTTP based services with
148        unit tests
149       
150        @param cfgFilePath: ini file containing configuration for the service
151        @type cfgFilePath: basestring
152        @param port: port number to run the service from
153        @type port: int
154        """
155        if self.disableServiceStartup:
156            return
157       
158        withSSL = kw.pop('withSSL', False)
159        if withSSL:
160            from OpenSSL import SSL
161           
162            certFilePath = mkDataDirPath(os.path.join('pki', 'localhost.crt'))
163            priKeyFilePath = mkDataDirPath(os.path.join('pki', 'localhost.key'))
164           
165            kw['ssl_context'] = SSL.Context(SSL.SSLv23_METHOD)
166            kw['ssl_context'].set_options(SSL.OP_NO_SSLv2)
167       
168            kw['ssl_context'].use_privatekey_file(priKeyFilePath)
169            kw['ssl_context'].use_certificate_file(certFilePath)
170           
171        try:
172            self.services.append(PasteDeployAppServer(*arg, **kw))
173            self.services[-1].startThread()
174           
175        except socket.error:
176            pass
177
178    def startAttributeAuthorities(self, withSSL=False, port=None):
179        """Serve test Attribute Authorities to test against"""
180        self.startSiteAAttributeAuthority(withSSL=withSSL, port=port)
181        self.startSiteBAttributeAuthority(withSSL=withSSL, port=port)
182       
183    def startSiteAAttributeAuthority(self, withSSL=False, port=None):
184        siteACfgFilePath = mkDataDirPath(join('attributeauthority', 
185                                              'sitea', 
186                                              'attribute-service.ini'))
187        self.addService(cfgFilePath=siteACfgFilePath, 
188                        port=(port or 
189                              BaseTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM),
190                        withSSL=withSSL)
191       
192    def startAuthorisationService(self, 
193                                  withSSL=True, 
194                                  port=AUTHORISATION_SERVICE_PORTNUM):
195        self.addService(
196            cfgFilePath=self.__class__.AUTHORISATION_SERVICE_INI_FILEPATH, 
197            port=port,
198            withSSL=withSSL)
199       
200    def __del__(self):
201        self.stopAllServices()
202       
203    def stopAllServices(self):
204        """Stop any services started with the addService method"""
205        if hasattr(self, 'services'):
206            for service in self.services:
207                service.terminateThread()
208 
209    @classmethod
210    def initDb(cls):
211        """Wrapper to _createDb - Create database only if it doesn't already
212        exist"""
213        if not os.path.isfile(cls.DB_FILEPATH):
214            cls._createDb()
215       
216    @classmethod 
217    def _createDb(cls):
218        """Create a test SQLite database with SQLAlchemy for use with unit
219        tests
220        """
221        log.debug("Creating database for %r ..." % cls.__name__)
222       
223        if not sqlAlchemyInstalled:
224            raise NotImplementedError("SQLAlchemy must be installed in order "
225                                      "for this method to be implemented")
226           
227        db = create_engine(cls.DB_CONNECTION_STR)
228       
229        metadata = MetaData()
230        usersTable = Table('users', metadata,
231                           Column('id', Integer, primary_key=True),
232                           Column('username', String),
233                           Column('md5password', String),
234                           Column('openid', String),
235                           Column('openid_identifier', String),
236                           Column('firstname', String),
237                           Column('lastname', String),
238                           Column('emailaddress', String))
239       
240        attributesTable = Table('attributes', metadata,
241                                Column('id', Integer, primary_key=True),
242                                Column('openid', String),
243                                Column('attributename', String),
244                                Column('attributetype', String))
245        metadata.create_all(db)
246       
247        class User(declarative_base()):
248            __tablename__ = 'users'
249       
250            id = Column(Integer, primary_key=True)
251            username = Column('username', String(40))
252            md5password = Column('md5password', String(64))
253            openid = Column('openid', String(128))
254            openid_identifier = Column('openid_identifier', String(40))
255            firstname = Column('firstname', String(40))
256            lastname = Column('lastname', String(40))
257            emailAddress = Column('emailaddress', String(40))
258       
259            def __init__(self, username, md5password, openid, openid_identifier, 
260                         firstname, lastname, emailaddress):
261                self.username = username
262                self.md5password = md5password
263                self.openid = openid
264                self.openid_identifier = openid_identifier
265                self.firstname = firstname
266                self.lastname = lastname
267                self.emailAddress = emailaddress
268       
269        class Attribute(declarative_base()):
270            __tablename__ = 'attributes'
271       
272            id = Column(Integer, primary_key=True)
273            openid = Column('openid', String(128))
274            attributename = Column('attributename', String(40))
275            attributetype = Column('attributetype', String(40))
276       
277            def __init__(self, openid, attributetype, attributename):
278                self.openid = openid
279                self.attributetype = attributetype
280                self.attributename = attributename
281
282        Session = sessionmaker(bind=db)
283        session = Session()
284       
285        attributes = [Attribute(cls.OPENID_URI, attrType, attrVal)
286                      for attrType, attrVal in zip(cls.ATTRIBUTE_NAMES, 
287                                                   cls.ATTRIBUTE_VALUES)]
288        session.add_all(attributes)
289           
290        user = User(cls.USERNAME, 
291                    cls.MD5_PASSWORD,
292                    cls.OPENID_URI,
293                    cls.OPENID_IDENTIFIER,
294                    cls.FIRSTNAME,
295                    cls.LASTNAME,
296                    cls.EMAILADDRESS)
297       
298        session.add(user)
299        session.commit() 
300
301
302def dbAttr2ESGFGroupRole(attrVal):
303    """Callback for SQLAlchemyAttributeInterface class to convert attribute
304    value as stored in the SQLite Db defined here to an ESGF Group/Role
305    Attribute Value type
306    """
307    groupRoleAttrValue = ESGFGroupRoleAttributeValue()
308   
309    # The group/role is stored in a single field in the database with a colon
310    # separator
311    groupRoleAttrValue.value = attrVal.split(':')
312   
313    return groupRoleAttrValue
314
Note: See TracBrowser for help on using the repository browser.