Changeset 8077


Ignore:
Timestamp:
27/01/12 17:33:18 (8 years ago)
Author:
mnagni
Message:

Refactored the migration process
Now its a periodic task managed by libs.migration.MigrationThread? and is started by the molesSessionMiddleware
Refactored the database connections. Now all rae subclasses of libs.EPB

Location:
mauRepo/MolesManager/trunk/src
Files:
2 added
12 edited
1 moved

Legend:

Unmodified
Added
Removed
  • mauRepo/MolesManager/trunk/src/MolesManager/dataaccess.py

    r8054 r8077  
    55''' 
    66from ea_model.iso_19103_2005_schema_language.basic_types.primitive.date_and_time.date import Date 
    7 from ea_model.ceda_moles.ceda_observation.ceda_observation import CEDA_Observation 
    8 from sqlalchemy.orm.util import class_mapper 
     7from ea_model.ceda_metadatamodel.ceda_observation.ceda_observation import CEDA_Observation 
    98 
    109def _buildFilter(key, keyValue): 
  • mauRepo/MolesManager/trunk/src/MolesManager/forms/date.py

    r7976 r8077  
    3434        for attrName in self._clazz.__dict__: 
    3535            if not attrName.startswith('_'): 
     36                if attrName == 'dataLineage': 
     37                    pass #print 'ciao' 
    3638                attrValue = getattr(self._clazz, attrName) 
    3739                if isinstance(attrValue, InstrumentedAttribute): 
  • mauRepo/MolesManager/trunk/src/MolesManager/molesSessionMiddleware.py

    r8054 r8077  
    44@author: mnagni 
    55''' 
    6 from libs.migration.pjb import PJB 
     6from MolesManager.moles3epb import Moles3EPB 
     7from libs.migration.client import MigrationThread 
     8from MolesManager.settings import RUN_MIGRATION 
    79 
    810class MolesSessionMiddleware(object): 
     11     
     12    #This attribute should be not here. 
     13    #unfortunately I cannot find any start/stop signals from Django 
     14    _migrationThread = MigrationThread(interval = 86400) 
     15     
     16    def _migration(self, runMigration = RUN_MIGRATION): 
     17        if runMigration and not MolesSessionMiddleware._migrationThread.isAlive(): 
     18            #t.setDaemon(False) 
     19            MolesSessionMiddleware._migrationThread.start() 
     20             
    921    """ 
    10         Represents the access to the Moles database 
     22        Represents the access to the Moles database. 
     23        Creates from the existing db connections pools a new session 
     24        to be used by all the DB operations involved in the actual HTTPRequest 
    1125    """   
    1226     
    13     def process_request(self, request): 
    14         request.moles_session = PJB.getNewMolesSession() 
     27    def process_request(self, request):         
     28        self._migration() #see the note on  MolesSessionMiddleware._migration 
     29         
     30        request.moles_session = Moles3EPB.getNewMolesSession() 
    1531        ''' 
    1632        request.db_sessions = {} 
     
    4056            return 
    4157        session.rollback() 
    42         session.close() 
    43  
    44     @classmethod 
    45     def search(self, request, clazz, inst_id): 
    46         return PJB.search(clazz, inst_id, request.moles_session) 
    47          
     58        session.close()         
  • mauRepo/MolesManager/trunk/src/MolesManager/settings.py

    r8074 r8077  
    162162#moles3 database connection 
    163163MOLES3 = 'moles3' 
    164 #MOLES3_DB_CONNECTION = 'postgresql://user:pws@host:port/dbName' 
    165 MOLES3_DB_CONNECTION = 'PG_MOLES3_DB_CONNECTION' 
     164MOLES3_DB_CONNECTION = 'postgresql://user:pws@host:port/dbName' 
     165 
     166#MOLES3_DB_CONNECTION = 'PG_MOLES3_DB_CONNECTION' 
    166167MOLES3_DB_SCRIPT = SRC + '/sqlTables.py' 
    167168 
     
    169170#moles2 to moles3 migration database connection 
    170171MIGRATION = 'migration' 
    171 #MIGRATION_DB_CONNECTION = 'postgresql://user:pws@host:port/dbName' 
    172 MIGRATION_DB_CONNECTION = 'PG_MIGRATION_DB_CONNECTION' 
     172MIGRATION_DB_CONNECTION = 'postgresql://user:pws@host:port/dbName' 
     173 
     174#MIGRATION_DB_CONNECTION = 'PG_MIGRATION_DB_CONNECTION' 
    173175MIGRATION_DB_SCRIPT = SRC + '/libs/migration/db/migrationTables.py' 
    174   
     176 
     177RUN_MIGRATION = True 
  • mauRepo/MolesManager/trunk/src/MolesManager/views/cedaObservationView.py

    r8054 r8077  
    99from MolesManager.forms.commons import ObjectById 
    1010from ea_model.ceda_metadatamodel.ceda_observation.ceda_observation import CEDA_Observation 
    11 from MolesManager.molesSessionMiddleware import MolesSessionMiddleware 
     11from MolesManager.moles3epb import Moles3EPB 
    1212 
    1313 
     
    3030    if objectId.is_valid(): 
    3131        id = objectId.cleaned_data['obj_id'] 
    32         dp = MolesSessionMiddleware.search(request, CEDA_Observation, objectId.cleaned_data['obj_id']) 
     32        dp = Moles3EPB.search(CEDA_Observation, objectId.cleaned_data['obj_id'], request.moles_session) 
    3333        return createForm(dp) 
    3434    return None 
  • mauRepo/MolesManager/trunk/src/libs/commons_db.py

    r8054 r8077  
    1919 
    2020class DbManager(object): 
     21     
     22    _metadata = MetaData() 
    2123    """ 
    2224        Represents a database instance managed by an SQLAlchemy engine.         
     
    3032        self._script = script 
    3133        self._engine = create_engine(self._connection, echo=True) 
    32         if (self._script): 
    33             self._metadata = MetaData()  
    34             execfile(self._script, {'metadata': self._metadata, 'engine': self._engine}) 
     34        if (self._script):  
     35            execfile(self._script, {'metadata': DbManager._metadata, 'engine': self._engine}) 
     36            DbManager._metadata.create_all(bind=self._engine) 
    3537        self._session = scoped_session(sessionmaker(bind=self._engine))     
    3638  
  • mauRepo/MolesManager/trunk/src/libs/migration/MigrationEPB.py

    r8054 r8077  
    44@author: mnagni 
    55''' 
    6 from MolesManager.settings import MOLES3_DB_CONNECTION, MOLES3_DB_SCRIPT,\ 
    7     MIGRATION_DB_SCRIPT, MIGRATION_DB_CONNECTION, MIGRATION, MOLES3 
     6from MolesManager.settings import MIGRATION_DB_SCRIPT, MIGRATION_DB_CONNECTION 
    87from libs.migration.db.classes import ObservationMigration,\ 
    98    ObservationCollectionMigration 
    109from sqlalchemy.sql.expression import asc 
    11 from libs.commons_db import DbManager, DbManagerCollection 
     10from libs.commons_db import DbManager 
     11from libs.epb import EPB 
    1212 
    13 molesDB = DbManager(MOLES3_DB_CONNECTION, MOLES3_DB_SCRIPT) 
    14 migrationDB = DbManager(MIGRATION_DB_CONNECTION, MIGRATION_DB_SCRIPT) 
    15 dbManagers = {MOLES3: molesDB, MIGRATION: migrationDB}   
    16 dbManagerCollection = DbManagerCollection(dbManagers) 
    1713 
    18 class PJB(object): 
    19  
    20     @classmethod 
    21     def buildFilter(self, key, keyValue): 
    22         return '%s = \'%s\'' % (key, keyValue) 
    23  
    24     @classmethod 
    25     def search(self, clazz, inst_id, session):           
    26         res = session.query(clazz).get(inst_id) 
    27         if res is None: 
    28             return None 
    29         return res 
     14class MigrationEPB(EPB): 
     15     
     16    _migrationDB = DbManager(MIGRATION_DB_CONNECTION, MIGRATION_DB_SCRIPT) 
    3017 
    3118    @classmethod 
    3219    def addObservationCollectionMigration(self, obsCollMigration, session = None): 
    33         intSession = PJB._getSession(session, MIGRATION) 
     20        intSession = MigrationEPB._getSession(session) 
    3421        intSession.add(obsCollMigration) 
    35         PJB._closeSession(session) 
     22        EPB._closeSession(session) 
    3623 
    3724    @classmethod         
    3825    def addObservationMigration(self, obsMigration, session = None): 
    39         intSession = PJB._getSession(session, MIGRATION) 
     26        intSession = MigrationEPB._getSession(session) 
    4027        intSession.add(obsMigration) 
    41         PJB._closeSession(session)         
     28        EPB._closeSession(session)         
    4229 
    4330    @classmethod         
    4431    def getObservationCollectionMigrationOrderByDate(self, session = None): 
    45         intSession = PJB._getSession(session, MIGRATION)    
    46         data_ents = PJB.getAllObjects(ObservationCollectionMigration, intSession) 
     32        intSession = MigrationEPB._getSession(session)    
     33        data_ents = EPB.getAllObjects(ObservationCollectionMigration, intSession) 
    4734        return data_ents.order_by(asc("data_ent_creation"))      
    4835 
    4936    @classmethod 
    5037    def getObservationMigrationByName(self, deplName, session = None): 
    51         intSession = PJB._getSession(session, MIGRATION) 
    52         return intSession.query(ObservationMigration).filter(*[PJB.buildFilter('depl_name', deplName)]).first() 
    53  
    54     @classmethod         
    55     def addCEDA_Observation(self, ceda_observation, session = None): 
    56         intSession = PJB._getSession(session, MOLES3) 
    57         intSession.add(ceda_observation) 
    58         PJB._closeSession(session)  
     38        intSession = MigrationEPB._getSession(session) 
     39        return intSession.query(ObservationMigration).filter(*[EPB.buildFilter('depl_name', deplName)]).first() 
    5940 
    6041    @classmethod 
    6142    def getNewMigrationSession(self): 
    62         return dbManagerCollection.createSession(MIGRATION) 
    63          
    64     @classmethod         
    65     def getNewMolesSession(self): 
    66             return dbManagerCollection.createSession(MOLES3) 
     43        return MigrationEPB._getSession() 
    6744 
    6845    @classmethod 
    69     def getAllObjects(self, clazz, session):      
    70         res = session.query(clazz) 
    71         if res is None: 
    72             return None 
    73         return res 
    74  
    75     @classmethod 
    76     def _getSession(self, session = None, sessionName = ""): 
    77         if session: 
    78             return session         
    79         return dbManagerCollection.createSession(sessionName) 
    80  
    81     @classmethod 
    82     def _closeSession(self, session): 
    83         if session is not None: 
    84             return 
    85         session.commit() 
    86         session.close() 
     46    def _getSession(self, session = None): 
     47        return EPB._getSession(MigrationEPB._migrationDB, session) 
  • mauRepo/MolesManager/trunk/src/libs/migration/client.py

    r8074 r8077  
    55''' 
    66from libs.migration.processor.loadResources import LoadResources 
    7 from libs.migration.pjb import PJB 
    87from libs.migration.processor.migrationProcess import MigrationProcess 
    98 
    10 ''' 
    11 exist/rest/atoms/published/data_entities/neodc.nerc.ac.uk 
    12 ''' 
     9import threading 
     10import time 
     11from threading import Timer 
    1312 
    1413 
    15              
    16          
     14class MigrationThread(threading.Thread): 
     15    """ 
     16        Constructs a scheduler for the Moles2 to Moles3 migration. 
     17        @param interval: define the minimal time, in seconds, between an execution and the successive.  
     18        If the current migration run takes more than the defined interval the next will starts 
     19        as soon the previous ends.          
     20    """ 
     21    def __init__(self, interval=600): 
     22        threading.Thread.__init__(self) 
     23        self._doRun = True    
     24        self.interval = interval 
     25     
     26    def stop(self): 
     27        self._doRun = False 
     28     
     29    def run(self): 
     30        while self._doRun: 
     31            #self._migrate() 
     32            #s = sched.scheduler(time.time, time.sleep) 
     33            startTime = (int)(time.time()) 
     34            timer = Timer(5, self._migrate, ()) 
     35            timer.start() 
     36            timer.join() 
     37            diffTime = startTime + self.interval - (int)(time.time())  
     38            if diffTime > 0: 
     39                time.sleep(diffTime) 
     40     
     41    def _printTime(self): 
     42        print "From print_time", time.time()     
     43 
     44    def _migrate(self): 
     45        #mEPB = MigrationEPB() 
     46        lr = LoadResources() 
     47        lr.process() 
     48 
     49        mp = MigrationProcess() 
     50        mp.process() 
     51     
     52#t = MigrationThread(interval = 1000) 
     53#t.setDaemon(False) 
     54#t.start() 
     55#print "ciao" 
     56#time.sleep(600) 
     57#t.stop() 
     58 
     59 
    1760 
    1861 
    1962 
    2063''' 
    21 So it works: 
    22 Loads the eXist resources 
    23 Loops over a docStatus 
    24 Loops over the data_entities 
    25 For each data_entity loop over its deployments 
     64 
    2665''' 
    2766 
    28  
    29  
    30 ''' 
    31 ceda_observation = CEDA_Observation() 
    32 ceda_observation.dataLineage = 'mauData' 
    33 ceda_observation.relatedParty = [] 
    34 rp = MO_ResponsiblePartyInfo() 
    35 rp.role = MO_RoleValue.cl_author 
    36 rp.party = [] 
    37 party = CI_Party() 
    38 party.name = 'Mau' 
    39 rp.party.append(party) 
    40 ceda_observation.relatedParty.append(rp) 
    41 doInsertOrUpdate([ceda_observation], session, update = False) 
    42 session.commit() 
    43  
    44  
    45 res = session.query(CEDA_Observation).filter(*[_buildFilter('ceda_observation_id', 2)]) 
    46 rp = res[0].relatedParty[0] 
    47 print rp 
    48 ''' 
    49  
    50 pjb = PJB() 
    51  
    52 lr = LoadResources(pjb) 
    53 lr.process() 
    54  
    55  
    56 mp = MigrationProcess(pjb) 
    57 mp.process() 
    58  
    59  
  • mauRepo/MolesManager/trunk/src/libs/migration/processor/dataEntity.py

    r8063 r8077  
    99from libs.migration.processor.commons import getResource, findDeploymentsInDE,\ 
    1010    getResourceRefs, getXMLDocument 
    11 from libs.migration.pjb import PJB 
     11from libs.migration.MigrationEPB import MigrationEPB 
    1212 
    1313class DataEntityProcessor(object): 
     
    9292        for deplName in self._depls: 
    9393            #retrieve the deployment's path 
    94             deplObj = PJB.getObservationMigrationByName(deplName + '.atom', self._migrationSession)             
     94            deplObj = MigrationEPB.getObservationMigrationByName(deplName + '.atom', self._migrationSession)             
    9595            if deplObj is None: 
    9696                print "cannot find deployment %s for resource %s" % (deplName, self._de_path) 
  • mauRepo/MolesManager/trunk/src/libs/migration/processor/deployment.py

    r8074 r8077  
    1010from libs.migration.exception.exceptions import NoDataLineage 
    1111from ea_model.ceda_metadatamodel.ceda_observation.ceda_observation import CEDA_Observation 
    12 from libs.migration.pjb import PJB 
    1312import re 
     13from MolesManager.moles3epb import Moles3EPB 
     14from libs.migration.MigrationEPB import MigrationEPB 
     15from libs.epb import EPB 
    1416 
    1517class DeploymentProcessor(object): 
     
    183185        #Exists already an associated CEDA_Observation associated to this deployment? 
    184186        if self._deplObj is not None and self._deplObj.obs_id is not None: 
    185             self._ceda_observation = PJB.search(CEDA_Observation, self._deplObj.obs_id, self._migrationSession) 
     187            self._ceda_observation = EPB.search(CEDA_Observation, self._deplObj.obs_id, self._migrationSession) 
    186188 
    187189        #If does not exist creates a new one             
     
    195197         
    196198        if add: 
    197             PJB.addCEDA_Observation(self._ceda_observation, self._molesSession) 
     199            Moles3EPB.addCEDA_Observation(self._ceda_observation, self._molesSession) 
    198200        self._molesSession.commit() 
    199201        self._deplObj.obs_id = self._ceda_observation.id 
    200202         
    201203        if add: 
    202             PJB.addObservationMigration(self._deplObj, self._migrationSession) 
     204            MigrationEPB.addObservationMigration(self._deplObj, self._migrationSession) 
    203205        return self._ceda_observation 
    204206 
  • mauRepo/MolesManager/trunk/src/libs/migration/processor/loadResources.py

    r8054 r8077  
    99from libs.migration.exception.exceptions import NoCreationDate 
    1010from libs.migration.db.classes import ObservationMigration,\ 
    11     ObservationCollectionMigration 
    12 from libs.migration.pjb import PJB     
     11    ObservationCollectionMigration     
     12from libs.migration.MigrationEPB import MigrationEPB 
    1313 
    1414class LoadResources(object): 
    1515     
    16     def __init__(self, pjb): 
    17         self._migrationSession = PJB.getNewMigrationSession() 
    18         self._molesSession = PJB.getNewMolesSession() 
     16    def __init__(self): 
     17        self._migrationSession = MigrationEPB.getNewMigrationSession() 
    1918             
    2019    def _loadDataEntities(self, base): 
     
    5857 
    5958            add = False 
    60             coll = PJB.search(ObservationMigration, deID, self._migrationSession) 
     59            coll = MigrationEPB.search(ObservationMigration, deID, self._migrationSession) 
    6160            if coll is None:                 
    6261                coll = ObservationMigration() 
     
    7069             
    7170            if add: 
    72                 PJB.addObservationMigration(coll, self._migrationSession) 
     71                MigrationEPB.addObservationMigration(coll, self._migrationSession) 
    7372                 
    7473            self._migrationSession.commit() 
     
    9695                       
    9796            add = False 
    98             coll = PJB.search(ObservationCollectionMigration, deID, self._migrationSession) 
     97            coll = MigrationEPB.search(ObservationCollectionMigration, deID, self._migrationSession) 
    9998            if coll is None:                 
    10099                coll = ObservationCollectionMigration() 
     
    106105            coll.exist_path = baseOwner 
    107106            if add: 
    108                 PJB.addObservationCollectionMigration(coll, self._migrationSession) 
     107                MigrationEPB.addObservationCollectionMigration(coll, self._migrationSession) 
    109108            self._migrationSession.commit() 
    110109             
     
    117116            self._loadDataEntities(baseStatus)             
    118117            self._loadDeployments(baseStatus) 
    119         self._migrationSession.close() 
    120         self._molesSession.close()                              
     118        self._migrationSession.close()                              
  • mauRepo/MolesManager/trunk/src/libs/migration/processor/migrationProcess.py

    r8054 r8077  
    55''' 
    66from libs.migration.processor.dataEntity import DataEntityProcessor 
    7 from libs.migration.pjb import PJB 
     7from libs.migration.MigrationEPB import MigrationEPB 
     8from MolesManager.moles3epb import Moles3EPB 
    89 
    910class MigrationProcess(): 
    1011     
    11     def __init__(self, pjb): 
    12         self._pjb = pjb 
    13         self._migrationSession = PJB.getNewMigrationSession() 
    14         self._molesSession = PJB.getNewMolesSession() 
     12    def __init__(self): 
     13        self._migrationSession = MigrationEPB.getNewMigrationSession() 
     14        self._molesSession = Moles3EPB.getNewMolesSession() 
    1515     
    1616    def process(self):     
    17         sorted_data_ents = PJB.getObservationCollectionMigrationOrderByDate(session = self._migrationSession) 
     17        sorted_data_ents = MigrationEPB.getObservationCollectionMigrationOrderByDate(session = self._migrationSession) 
    1818        exs = [] 
    1919        #loops over the sorted data entities 
     
    2222            dep = DataEntityProcessor(de_path, self) 
    2323            exs.append(dep.process()) 
    24             self.commitAll() 
     24            #self.commitAll() 
    2525        print exs  
    2626         
  • mauRepo/MolesManager/trunk/src/sqlTables.py

    r8074 r8077  
    417417from ea_model.iso_19109_2005_application_schema.general_feature_model.gf_associationrole import  GF_AssociationRole 
    418418 
    419 clear_mappers() 
    420 metadata = MetaData() 
    421419def next_id(connection, seq_name): 
    422420    seq = Sequence(seq_name) 
     
    95969594class_mapper(TP_DirectedSolid).add_properties({'topo': relationship(TP_Solid, uselist=False, backref='tp_directedsolid_topo', primaryjoin=tp_solid_table.c.tp_directedsolid_topo_id==tp_directedsolid_table.c.id),'hub': relationship(TP_Face, secondary=tp_face_tp_directedsolid_table),'_tp_directedtopo': relationship(TP_DirectedTopo, uselist=False, backref='tp_directedsolid__tp_directedtopo', primaryjoin=tp_directedtopo_table.c.tp_directedsolid__tp_directedtopo_id==tp_directedsolid_table.c.id)}) 
    95979595 
    9598 metadata.create_all(engine) 
    9599  
    9600  
     9596 
Note: See TracChangeset for help on using the changeset viewer.