Ignore:
Timestamp:
25/01/10 14:17:49 (11 years ago)
Author:
pjkersha
Message:
 
File:
1 edited

Legend:

Unmodified
Added
Removed
  • TI12-security/trunk/WSSecurity/ndg/wssecurity/test/unit/__init__.py

    r6276 r6396  
    4747    '''Convenience base class from which other unit tests can extend.  Its 
    4848    sets the generic data directory path''' 
    49     configDirEnvVarName = 'NDGSEC_TEST_CONFIG_DIR' 
    50      
    51     SITEA_ATTRIBUTEAUTHORITY_PORTNUM = 5000 
    52     SITEB_ATTRIBUTEAUTHORITY_PORTNUM = 5100 
    53      
    54     SITEA_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \ 
    55                                     SITEA_ATTRIBUTEAUTHORITY_PORTNUM 
    56                                      
    57     SITEB_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \ 
    58                                     SITEB_ATTRIBUTEAUTHORITY_PORTNUM 
    59                                      
    60     SITEA_ATTRIBUTEAUTHORITY_SAML_URI = \ 
    61         'http://localhost:%s/AttributeAuthority/saml' % \ 
    62                                     SITEA_ATTRIBUTEAUTHORITY_PORTNUM 
    63                                      
    64     SITEB_ATTRIBUTEAUTHORITY_SAML_URI = \ 
    65         'http://localhost:%s/AttributeAuthority/saml' % \ 
    66                                     SITEB_ATTRIBUTEAUTHORITY_PORTNUM 
    67                                      
    68     SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM = 5443 
    69     SITEA_SSL_ATTRIBUTEAUTHORITY_SAML_URI = \ 
    70         'https://localhost:%d/AttributeAuthority/saml' % \ 
    71                                     SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM 
    72     SSL_CERT_DN = "/C=UK/ST=Oxfordshire/O=BADC/OU=Security/CN=localhost" 
    73                                      
    74     SITEA_SAML_ISSUER_NAME = "/O=Site A/CN=Attribute Authority" 
    75      
    76     SESSIONMANAGER_PORTNUM = 5500 
     49    TEST_CONFIG_DIR_VARNAME = 'NDGSEC_TEST_CONFIG_DIR' 
     50 
    7751     
    7852    NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR = \ 
     
    8660                                         "for unit tests") 
    8761     
    88     NDGSEC_TEST_CONFIG_DIR = os.environ.get(configDirEnvVarName,  
     62    NDGSEC_TEST_CONFIG_DIR = os.environ.get(TEST_CONFIG_DIR_VARNAME,  
    8963                                            TEST_CONFIG_DIR) 
    9064     
    9165    CACERT_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'ca') 
    9266    PKI_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'pki') 
    93     SSL_CERT_FILEPATH = os.path.join(PKI_DIR, 'localhost.crt') 
    94     SSL_PRIKEY_FILEPATH = os.path.join(PKI_DIR, 'localhost.key') 
    95      
    96     # Test database set-up 
    97     DB_FILENAME = 'user.db' 
    98     DB_FILEPATH = join(NDGSEC_TEST_CONFIG_DIR, DB_FILENAME) 
    99     DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILEPATH 
    100      
    101     USERNAME = 'pjk' 
    102     PASSWORD = 'testpassword' 
    103     MD5_PASSWORD = md5(PASSWORD).hexdigest() 
    104      
    105     OPENID_URI_STEM = 'https://localhost:7443/openid/' 
    106     OPENID_IDENTIFIER = 'philip.kershaw' 
    107     OPENID_URI = OPENID_URI_STEM + OPENID_IDENTIFIER 
    108      
    109     FIRSTNAME = 'Philip' 
    110     LASTNAME = 'Kershaw' 
    111     EMAILADDRESS = 'pjk@somewhere.ac.uk' 
    112      
    113     ATTRIBUTE_NAMES = ( 
    114         "urn:siteA:security:authz:1.0:attr", 
    115     ) 
    116  
    117     ATTRIBUTE_VALUES = ( 
    118         'urn:siteA:security:authz:1.0:attr:postdoc', 
    119         'urn:siteA:security:authz:1.0:attr:staff',  
    120         'urn:siteA:security:authz:1.0:attr:undergrad',  
    121         'urn:siteA:security:authz:1.0:attr:coapec', 
    122         'urn:siteA:security:authz:1.0:attr:rapid' 
    123     ) 
    124     N_ATTRIBUTE_VALUES = len(ATTRIBUTE_VALUES) 
    125      
    126     VALID_REQUESTOR_IDS = ( 
    127         X500DN.fromString("/O=Site A/CN=Authorisation Service"),  
    128         X500DN.fromString("/O=Site B/CN=Authorisation Service"), 
    129         X500DN.fromString('/CN=test/O=NDG/OU=BADC') 
    130     ) 
    131      
    132     SSL_PEM_FILENAME = 'localhost.pem' 
    133     SSL_PEM_FILEPATH = mkDataDirPath(os.path.join('pki', SSL_PEM_FILENAME)) 
    13467     
    13568    def __init__(self, *arg, **kw): 
    136         if BaseTestCase.configDirEnvVarName not in os.environ: 
    137             os.environ[BaseTestCase.configDirEnvVarName] = TEST_CONFIG_DIR 
     69        if BaseTestCase.TEST_CONFIG_DIR_VARNAME not in os.environ: 
     70            os.environ[BaseTestCase.TEST_CONFIG_DIR_VARNAME] = TEST_CONFIG_DIR 
    13871                 
    13972        unittest.TestCase.__init__(self, *arg, **kw) 
     
    171104        except socket.error: 
    172105            pass 
    173  
    174     def startAttributeAuthorities(self, withSSL=False, port=None): 
    175         """Serve test Attribute Authorities to test against""" 
    176         self.startSiteAAttributeAuthority(withSSL=withSSL, port=port) 
    177         self.startSiteBAttributeAuthority(withSSL=withSSL, port=port) 
    178106         
    179     def startSiteAAttributeAuthority(self, withSSL=False, port=None): 
    180         siteACfgFilePath = mkDataDirPath(join('attributeauthority',  
    181                                               'sitea',  
    182                                               'site-a.ini')) 
    183         self.addService(cfgFilePath=siteACfgFilePath,  
    184                         port=port or BaseTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM, 
    185                         withSSL=withSSL) 
    186          
    187     def startSiteBAttributeAuthority(self, withSSL=False, port=None): 
    188         siteBCfgFilePath = mkDataDirPath(join('attributeauthority', 
    189                                               'siteb',  
    190                                               'site-b.ini')) 
    191         self.addService(cfgFilePath=siteBCfgFilePath,  
    192                         port=port or BaseTestCase.SITEB_ATTRIBUTEAUTHORITY_PORTNUM, 
    193                         withSSL=withSSL)         
    194  
    195     def startSessionManager(self): 
    196         """Serve test Session Manager service""" 
    197         cfgFilePath = mkDataDirPath(join('sessionmanager',  
    198                                          'session-manager.ini')) 
    199         self.addService(cfgFilePath=cfgFilePath,  
    200                         port=BaseTestCase.SESSIONMANAGER_PORTNUM) 
    201          
    202  
    203107    def __del__(self): 
    204108        """Stop any services started with the addService method""" 
     
    206110            for service in self.services: 
    207111                service.terminateThread() 
    208   
    209     @classmethod 
    210     def initDb(cls): 
    211         """Wrapper to _createDb - Create database only if it doesn't already  
    212         exist""" 
    213         if not os.path.isfile(cls.DB_FILEPATH): 
    214             cls._createDb() 
    215          
    216     @classmethod   
    217     def _createDb(cls): 
    218         """Create a test SQLite database with SQLAlchemy for use with unit  
    219         tests 
    220         """ 
    221         log.debug("Creating database for %r ..." % cls.__name__) 
    222          
    223         if not sqlAlchemyInstalled: 
    224             raise NotImplementedError("SQLAlchemy must be installed in order " 
    225                                       "for this method to be implemented") 
    226              
    227         db = create_engine(cls.DB_CONNECTION_STR) 
    228          
    229         metadata = MetaData() 
    230         usersTable = Table('users', metadata, 
    231                            Column('id', Integer, primary_key=True), 
    232                            Column('username', String), 
    233                            Column('md5password', String), 
    234                            Column('openid', String), 
    235                            Column('openid_identifier', String), 
    236                            Column('firstname', String), 
    237                            Column('lastname', String), 
    238                            Column('emailaddress', String)) 
    239          
    240         attributesTable = Table('attributes', metadata, 
    241                                 Column('id', Integer, primary_key=True), 
    242                                 Column('openid', String), 
    243                                 Column('attributename', String)) 
    244         metadata.create_all(db) 
    245          
    246         class User(declarative_base()): 
    247             __tablename__ = 'users' 
    248          
    249             id = Column(Integer, primary_key=True) 
    250             username = Column('username', String(40)) 
    251             md5password = Column('md5password', String(64)) 
    252             openid = Column('openid', String(128)) 
    253             openid_identifier = Column('openid_identifier', String(40)) 
    254             firstname = Column('firstname', String(40)) 
    255             lastname = Column('lastname', String(40)) 
    256             emailAddress = Column('emailaddress', String(40)) 
    257          
    258             def __init__(self, username, md5password, openid, openid_identifier,  
    259                          firstname, lastname, emailaddress): 
    260                 self.username = username 
    261                 self.md5password = md5password 
    262                 self.openid = openid 
    263                 self.openid_identifier = openid_identifier 
    264                 self.firstname = firstname 
    265                 self.lastname = lastname 
    266                 self.emailAddress = emailaddress 
    267          
    268         class Attribute(declarative_base()): 
    269             __tablename__ = 'attributes' 
    270          
    271             id = Column(Integer, primary_key=True) 
    272             openid = Column('openid', String(128)) 
    273             attributename = Column('attributename', String(40)) 
    274          
    275             def __init__(self, openid, attributename): 
    276                 self.openid = openid 
    277                 self.attributename = attributename 
    278  
    279         Session = sessionmaker(bind=db) 
    280         session = Session() 
    281          
    282         attributes = [Attribute(cls.OPENID_URI, attrVal) 
    283                       for attrVal in cls.ATTRIBUTE_VALUES] 
    284         session.add_all(attributes) 
    285             
    286         user = User(cls.USERNAME,  
    287                     cls.MD5_PASSWORD, 
    288                     cls.OPENID_URI, 
    289                     cls.OPENID_IDENTIFIER, 
    290                     cls.FIRSTNAME, 
    291                     cls.LASTNAME, 
    292                     cls.EMAILADDRESS) 
    293          
    294         session.add(user) 
    295         session.commit()  
    296  
    297112 
    298113def _getParentDir(depth=0, path=dirname(__file__)): 
Note: See TracChangeset for help on using the changeset viewer.