Ignore:
Timestamp:
17/06/12 15:40:15 (7 years ago)
Author:
mnagni
Message:

Major refactoring about migration (now handles better create/update, even if the single updates have to be quite fully implemented)
Added the connection pool from SQLAlchemy

File:
1 edited

Legend:

Unmodified
Added
Removed
  • mauRepo/MolesManager/trunk/src/libs/migration/processor/dataEntity.py

    r8351 r8358  
    11''' 
     2BSD Licence 
     3Copyright (c) 2012, Science & Technology Facilities Council (STFC) 
     4All rights reserved. 
     5 
     6Redistribution and use in source and binary forms, with or without modification,  
     7are permitted provided that the following conditions are met: 
     8 
     9    * Redistributions of source code must retain the above copyright notice,  
     10        this list of conditions and the following disclaimer. 
     11    * Redistributions in binary form must reproduce the above copyright notice, 
     12        this list of conditions and the following disclaimer in the documentation 
     13        and/or other materials provided with the distribution. 
     14    * Neither the name of the Science & Technology Facilities Council (STFC)  
     15        nor the names of its contributors may be used to endorse or promote  
     16        products derived from this software without specific prior written permission. 
     17 
     18THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"  
     19AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,  
     20THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR  
     21PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS 
     22BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,  
     23OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF  
     24SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 
     25HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 
     26OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE  
     27OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
     28 
    229Created on 15 Nov 2011 
    330 
    4 @author: mnagni 
     31@author: Maurizio Nagni 
    532''' 
    6 from MolesManager.moles3epb import Moles3EPB 
    733from ea_model.ceda_metadatamodel.ceda_observationcollection.ceda_observationcollection import \ 
    834    CEDA_ObservationCollection 
    9 from libs.migration.MigrationEPB import MigrationEPB 
    1035from libs.migration.exception.exceptions import MigrationObjectException, NoAssociatedAuthor,\ 
    1136    migrationObjectDescription, NoAssociatedDeployments 
    1237from libs.migration.processor.commons import findDeploymentsInDE,\ 
    1338    createMD_Identifier, extractContent,\ 
    14     hasAtomDocumentSameHash, createCI_Citation, createCI_Date, findPublishedDate,\ 
     39    hasMOSameHash, createCI_Citation, createCI_Date, findPublishedDate,\ 
    1540    isoDateTimeStringToTimeDate, findUpdatedDate, createDate,\ 
    16     calculateHash, extractUpdateFrequency, findDOIInMigrationDocument,\ 
    17     getAtomDocumentHash 
     41    calculateHash, findDOIInMigrationDocument,\ 
     42    hasMOBeenProcessed, getAtomDocumentHashByMO 
    1843from libs.migration.processor.deployment import DeploymentProcessor 
    1944from ea_model.moles3_4.utilities.mo_publicationstatevalue import MO_PublicationStateValue 
     
    2247from ea_model.iso_19115_2006_metadata_corrigendum.citation_and_responsible_party_information.ci_datetypecode import CI_DateTypeCode 
    2348from MolesManager.ceda_guid import CedaGUID 
    24 from ea_model.iso_19115_2006_metadata_corrigendum.maintenance_information.md_maintenancefrequencycode import MD_MaintenanceFrequencyCode 
    25 from libs.migration.processor.EPBRepo import EPBRepo 
    2649 
    2750class DataEntityProcessor(object): 
     
    3255        Processes a DataEntityMigration item. Note that each DataEntity is associated to a "dataent_xxxx" file in Moles2 
    3356    '''         
    34     def __init__(self, dataEntityMigration): 
     57    def __init__(self, dataEntityMigration, epbRepo): 
    3558        ''' 
    3659            Initializes the class 
    3760            @param _dataEntityMigration: the DataEntityMigration instance 
     61            @param epbRepo: an instance of EPBRepo 
    3862        ''' 
    39         if dataEntityMigration is None: 
    40             raise MigrationObjectException("DataEntityProcessor cannot process an None item") 
    41         self._dataEntityMigration = dataEntityMigration        
     63        #if dataEntityMigration is None: 
     64        #    raise MigrationObjectException("DataEntityProcessor cannot process an None item") 
     65        self._dataEntityMigration = dataEntityMigration         
     66        self._dataEntityHasSameHash = hasMOSameHash(self._dataEntityMigration)        
     67        self.epbRepo = epbRepo 
    4268     
    4369    def _processCitation(self, ceda_observationCollection): 
    44         # TDB - Check that if is an update or not! 
    4570        contentDict = extractContent(self._dataEntityMigration) 
    4671        if not contentDict.has_key('citation'): 
     
    6287            newIdentifier = createMD_Identifier(code = contentDict['citation'], authority=i_citation) 
    6388            ceda_observationCollection.identifier.append(newIdentifier) 
    64      
    65     def _getObservationCollection(self): 
    66         return EPBRepo.moles3EPB.search(CEDA_ObservationCollection, self._dataEntityMigration.ceda_observation_coll_id) 
    67      
    68     def _processObservationCollection(self):                                    
    69         #Moles3 object exists...  
    70         if self._dataEntityMigration.ceda_observation_coll_id:  
    71             # ...and the data entity document has not changed 
    72             if hasAtomDocumentSameHash(self._dataEntityMigration): 
    73                 return self._getObservationCollection() 
    74             else: 
    75                 self.updateObservationCollection() 
    7689 
    77         #... does not exist so create it 
    78         return self.createNewObservationCollection() 
    79  
    80     def updateObservationCollection(self): 
    81         """ 
    82             Updated the existing CEDA_ObservationCollection instance binded to the self._dataEntityMigration object. 
    83             @return: the persisted and updated CEDA_ObservationCollection element  
    84         """         
    85         ceda_observationCollection = self._getObservationCollection() 
    86         # TBD 
    87         #self._processCitation(ceda_observationCollection) 
    88         return ceda_observationCollection 
    89  
    90     def _update(self):  
    91         # ...and the data entity document has not changed 
    92         if hasAtomDocumentSameHash(self._dataEntityMigration): 
    93                 return self._getObservationCollection() 
    94         else: 
    95                 return self.updateObservationCollection()  
    96  
    97     def _create(self):  
     90    def _execute(self, ceda_observationCollection):  
    9891        """ 
    9992            Creates a new CEDA_ObservationCollection instance in the Moles3DB using the self._dataEntityMigration object. 
     
    10194            @return: the persisted CEDA_ObservationCollection element  
    10295        """ 
    103         ceda_observationCollection = CEDA_ObservationCollection() 
    104         self._processCitation(ceda_observationCollection) 
    105         #self._processResult(ceda_observationCollection) 
    106         ceda_observationCollection.publicationState = MO_PublicationStateValue.cl_working 
    107                    
    108         EPBRepo.moles3EPB.persistInstance(ceda_observationCollection)         
    109         EPBRepo.migrationEPB.updateMigrationObject(self._dataEntityMigration, {'ceda_observation_coll_id': ceda_observationCollection.id}) 
     96        if not self._dataEntityHasSameHash: 
     97            self._processCitation(ceda_observationCollection) 
     98            #self._processResult(ceda_observationCollection) 
     99         
     100        #Is a first time process? 
     101        if not hasMOBeenProcessed(self._dataEntityMigration): 
     102            docHash = getAtomDocumentHashByMO(self._dataEntityMigration) 
     103            ceda_observationCollection.publicationState = MO_PublicationStateValue.cl_working 
     104            self.epbRepo.moles3EPB.persistInstance(ceda_observationCollection) 
     105            self.epbRepo.migrationEPB.updateMigrationObject(self._dataEntityMigration, \ 
     106                {'ceda_observation_coll_id': ceda_observationCollection.id, \ 
     107                 'doc_hash': docHash}) 
    110108 
     109        #Has to updated the hash? 
     110        if not self._dataEntityHasSameHash and hasMOBeenProcessed(self._dataEntityMigration): 
     111            docHash = getAtomDocumentHashByMO(self._dataEntityMigration) 
     112            self.epbRepo.migrationEPB.updateMigrationObject(self._dataEntityMigration, \ 
     113                {'doc_hash': docHash}) 
    111114         
    112         #Adds the CedaGUID 
    113         ceda_guid = CedaGUID() 
    114         ceda_guid.id = calculateHash(self._dataEntityMigration.data_ent_id) 
    115         ceda_guid.ceda_observationcollection = ceda_observationCollection.id 
    116         EPBRepo.moles3EPB.persistInstance(ceda_guid) 
    117         DataEntityProcessor.log.info("GUID for this ObservationCollection: %s" % (ceda_guid.id)) 
    118          
    119         return ceda_observationCollection 
    120  
    121     def _processResultAccumulation(self, ceda_observation):         
    122         ceda_observation = EPBRepo.moles3EPB.loadAttributes(ceda_observation, "resultAccumulation")         
    123         if ceda_observation.resultAccumulation is None: 
    124             updateFrequency = extractUpdateFrequency(self._dataEntityMigration) 
    125             if updateFrequency: 
    126                 resultAccumulation = MD_MaintenanceFrequencyCode.from_string(updateFrequency) 
    127                 EPBRepo.moles3EPB.updateCedaObject(ceda_observation, {'resultAccumulation': resultAccumulation}) 
     115        #Has a proper CEDAGUID? 
     116        if self.epbRepo.moles3EPB.retrieveGUIDFromInstance(ceda_observationCollection) is None: 
     117            #Adds the CedaGUID 
     118            ceda_guid = CedaGUID() 
     119            ceda_guid.id = calculateHash(self._dataEntityMigration.data_ent_id) 
     120            setattr(ceda_guid, 'ceda_observationcollection', ceda_observationCollection.id) 
     121            self.epbRepo.moles3EPB.persistInstance(ceda_guid) 
     122            DataEntityProcessor.log.info("GUID for this ObservationCollection: %s" % (ceda_guid.id)) 
    128123 
    129124    def _processDOI(self, deploymentMigration, ceda_observation, deProcessor, single_deployment):         
     
    134129                    #collection_identifier = Moles3EPB.extractCollectionIdentifierByTitle(MD_CODE_MOLES2_CITATION, self.migrationSessions.molesSession) 
    135130                    #if collection_identifier.count()==1: 
    136                     #    ceda_observation.identifier.append(collection_identifier.first()) 
    137                  
     131                    #    ceda_observation.identifier.append(collection_identifier.first())                 
    138132        deProcessor.assignDOI(ceda_observation, doi) 
    139133 
    140134    def _processDeploymentMigration(self, deploymentMigration, single_deployment):                                       
    141         deProcessor = DeploymentProcessor(self._dataEntityMigration, deploymentMigration) 
     135        deProcessor = DeploymentProcessor(self._dataEntityMigration, deploymentMigration, self.epbRepo) 
    142136        try: 
    143137            DataEntityProcessor.log.info("Processing deployment: %s" % (migrationObjectDescription(deploymentMigration))) 
    144             ceda_observation = deProcessor.process() 
    145             try:             
    146                 self._processResultAccumulation(ceda_observation) 
    147             except Exception as ex: 
    148                 pass     
     138            ceda_observation = deProcessor.process()     
    149139            try:                             
    150140                self._processDOI(deploymentMigration, ceda_observation, deProcessor, single_deployment) 
    151141            except Exception as ex: 
    152                 pass                 
    153  
    154                 if not hasAtomDocumentSameHash(deploymentMigration): 
    155                     doc_hash = getAtomDocumentHash(deploymentMigration.docStatus, deploymentMigration.docType, deploymentMigration.docOwner, deploymentMigration.docName) 
    156                     EPBRepo.migrationEPB.updateMigrationObject(deploymentMigration, {'doc_hash': doc_hash})                 
     142                pass                                 
    157143        except NoAssociatedAuthor as ex: 
    158144            raise ex                  
     
    165151     
    166152    def process(self): 
    167         cedaObservationCollection = None 
     153        obsColl = None 
    168154        exs = [] 
    169155        DataEntityProcessor.log.info("Processing dataEntity: %s" % (migrationObjectDescription(self._dataEntityMigration))) 
    170156        try : 
    171157            if self._dataEntityMigration.ceda_observation_coll_id: 
    172                 cedaObservationCollection = self._update() 
     158                obsColl = self.epbRepo.moles3EPB.search(CEDA_ObservationCollection, self._dataEntityMigration.ceda_observation_coll_id)  
    173159            else: 
    174                 cedaObservationCollection = self._create() 
     160                obsColl = CEDA_ObservationCollection() 
     161            self._execute(obsColl)                  
    175162        except Exception as ex: 
    176163            exs.append(ex) 
    177164            return exs         
    178  
    179165         
    180166        #retrieves the associated deployment links from the data_entity 
    181         deploymentsLinks = findDeploymentsInDE(self._dataEntityMigration) 
    182          
     167        deploymentsLinks = findDeploymentsInDE(self._dataEntityMigration)         
    183168        #retrieves the DataEntityMigration sorted by creation date 
    184         deploymentMigrations = EPBRepo.migrationEPB.getAllDeploymentsMigrationByDataEntitySortedByDate( \ 
     169        deploymentMigrations = self.epbRepo.migrationEPB.getAllDeploymentsMigrationByDataEntitySortedByDate( \ 
    185170                                                self._dataEntityMigration, deploymentsLinks) 
    186171        howManydm = 0 
     
    192177            try: 
    193178                ceda_observation = self._processDeploymentMigration(deploymentMigration, howManydm == 1) 
    194                 EPBRepo.moles3EPB.updateCedaObject(cedaObservationCollection, {'member': ceda_observation}) 
     179                #Is a first time process? 
     180                if not hasMOBeenProcessed(deploymentMigration): 
     181                    self.epbRepo.moles3EPB.updateCedaObject(obsColl, {'member': ceda_observation}) 
    195182                                       
    196183            except Exception as ex: 
    197                 exs.append(ex) 
    198             except RuntimeError as er: 
    199                 print er                 
    200         #self.migrationSessions.molesSession.commit()                         
    201                  
     184                exs.append(ex)   
    202185        return exs 
Note: See TracChangeset for help on using the changeset viewer.