source: mauRepo/MolesManager/trunk/src/libs/migration/processor/dataEntity.py @ 8460

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/mauRepo/MolesManager/trunk/src/libs/migration/processor/dataEntity.py@8460
Revision 8460, 14.5 KB checked in by mnagni, 7 years ago (diff)

Complete - # 22489: CEDA Observation Collection - phenomenonTime
 http://team.ceda.ac.uk/trac/ceda/ticket/22489
Complete - # 22518: The description is broken
 http://team.ceda.ac.uk/trac/ceda/ticket/22518
Complete - # 22488: CEDA Observation Collection - Geographical Extent
 http://team.ceda.ac.uk/trac/ceda/ticket/22488

Now the Moles3EPB explicitly calls the "synchronise" method on the SQLAlchemy mapped classes to ensure that the data is persisted.
Uses CedaMolesModel v0.1.5, which corrects a major problem in synchronising the instances with the database.

RevLine 
[8014]1'''
[8358]2BSD Licence
3Copyright (c) 2012, Science & Technology Facilities Council (STFC)
4All rights reserved.
5
6Redistribution and use in source and binary forms, with or without modification,
7are permitted provided that the following conditions are met:
8
9    * Redistributions of source code must retain the above copyright notice,
10        this list of conditions and the following disclaimer.
11    * Redistributions in binary form must reproduce the above copyright notice,
12        this list of conditions and the following disclaimer in the documentation
13        and/or other materials provided with the distribution.
14    * Neither the name of the Science & Technology Facilities Council (STFC)
15        nor the names of its contributors may be used to endorse or promote
16        products derived from this software without specific prior written permission.
17
18THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
20THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
22BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
23OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
[8014]29Created on 15 Nov 2011
30
[8358]31@author: Maurizio Nagni
[8014]32'''
[8158]33from libs.migration.exception.exceptions import MigrationObjectException, NoAssociatedAuthor,\
[8433]34    migrationObjectDescription, NoAssociatedDeployments,\
35    NoGeographicalExtensionException, NoCitationException
[8144]36from libs.migration.processor.commons import findDeploymentsInDE,\
[8323]37    createMD_Identifier, extractContent,\
[8358]38    hasMOSameHash, createCI_Citation, createCI_Date, findPublishedDate,\
[8236]39    isoDateTimeStringToTimeDate, findUpdatedDate, createDate,\
[8358]40    calculateHash, findDOIInMigrationDocument,\
[8416]41    hasMOBeenProcessed, getAtomDocumentHashByMO, extractTitle, extractSummary,\
[8445]42    createEX_GeographicBoundingBox, fromDateStringToPhenomenonTime,\
43    comparePhenomenonTimes, compareGeographicBoundingBoxes
[8014]44from libs.migration.processor.deployment import DeploymentProcessor
[8158]45from logging import StreamHandler
46import logging
[8202]47from ea_model.iso_19115_2006_metadata_corrigendum.citation_and_responsible_party_information.ci_datetypecode import CI_DateTypeCode
[8236]48from MolesManager.ceda_guid import CedaGUID
[8408]49from MolesManager.codelist import MM_ObservationPublicationStateValue,\
50    getCLValue
[8424]51from ea_model.ceda_metadatamodel.ceda_observationcollection.ceda_observationcollection import CEDA_ObservationCollection
[8445]52from MolesManager.moles3epb import getUnifyObservationCollectionPhenomenonTime
# Title given to the CI_Citation used as the authority of the MD_Identifier
# that stores the collection title (see DataEntityProcessor._processTitle).
CEDA_TITLE = 'ceda_title'
54
class DataEntityProcessor(object):
    '''
        Processes a DataEntityMigration item. Note that each DataEntity is associated to a "dataent_xxxx" file in Moles2
    '''
    log = logging.getLogger('DataEntityProcessor')
    log.addHandler(StreamHandler())
    log.setLevel(logging.INFO)

    def __init__(self, dataEntityMigration, epbRepo):
        '''
            Initializes the class
            @param dataEntityMigration: the DataEntityMigration instance
            @param epbRepo: an instance of EPBRepo
        '''
        self._dataEntityMigration = dataEntityMigration
        # Both flags are evaluated once, at construction time
        self._dataEntityHasSameHash = hasMOSameHash(self._dataEntityMigration)
        self._dataEntityHasBeenProcessed = hasMOBeenProcessed(self._dataEntityMigration)
        self.epbRepo = epbRepo
        # Collects the exceptions/messages produced while processing; returned by process()
        self._report = []

    @staticmethod
    def _parseBBox(bbox):
        '''
            Splits a bounding box string into its four numeric components.
            The first four characters and the last one are discarded (a wrapper
            like "BOX(" ... ")" is presumed - TODO confirm against
            Moles3EPB.getUnifyObservationCollectionGEAsBBox), then the remainder
            is split on ',' into two whitespace separated pairs.
            @param bbox: the bounding box string, for example 'BOX(1 2,3 4)'
            @return: a tuple of four floats: the two values of the first pair
                     followed by the two values of the second pair
        '''
        firstPair, secondPair = bbox[4:-1].split(',')
        first_a, first_b = firstPair.split()
        second_a, second_b = secondPair.split()
        return (float(first_a), float(first_b), float(second_a), float(second_b))

    def _assignGeographicExtent(self, ceda_observationCollection):
        '''
            Updates the collection geographicExtent with the bounding box returned by
            Moles3EPB.getUnifyObservationCollectionGEAsBBox. When no bounding box is
            available a NoGeographicalExtensionException is appended to the report.
            @param ceda_observationCollection: the CEDA_ObservationCollection to update
        '''
        bbox = self.epbRepo.moles3EPB.getUnifyObservationCollectionGEAsBBox(ceda_observationCollection)
        if bbox is None:
            self._report.append(NoGeographicalExtensionException(self._dataEntityMigration))
            return

        # NOTE(review): the first pair is passed as (east, north) and the second as
        # (west, south); verify this matches the corner order of the bbox string.
        east, north, west, south = self._parseBBox(bbox)
        geographicExtent = createEX_GeographicBoundingBox(east, north, west, south)

        # Update only if nothing is stored yet or the stored box differs from the new one
        existing = ceda_observationCollection.geographicExtent
        if len(existing) == 0 or not compareGeographicBoundingBoxes(geographicExtent, existing[0]):
            self.epbRepo.moles3EPB.updateCedaObject(ceda_observationCollection, \
                                                    {'geographicExtent': geographicExtent})

    def _assignPhenomenonTime(self, ceda_observationCollection):
        '''
            Updates the collection phenomenonTime using the (start, end) pair returned by
            getUnifyObservationCollectionPhenomenonTime.
            @param ceda_observationCollection: the CEDA_ObservationCollection to update
        '''
        start, end = getUnifyObservationCollectionPhenomenonTime(ceda_observationCollection)
        dateString = start
        if end is not None:
            dateString = '%s/%s' % (start, end)
        pt = fromDateStringToPhenomenonTime(dateString)
        if pt is None:
            return

        existing = ceda_observationCollection.phenomenonTime
        if existing is None:
            return

        # Update when nothing is stored, or when the single stored value differs;
        # collections holding more than one phenomenonTime are left untouched.
        if len(existing) == 0 \
            or (len(existing) == 1 and not comparePhenomenonTimes(existing[0], pt)):
            self.epbRepo.moles3EPB.updateCedaObject(ceda_observationCollection, {'phenomenonTime': pt})

    def _assignDescription(self, ceda_observationCollection):
        '''
            Copies the summary extracted from the migration object into the
            collection description; an empty/None summary leaves it unchanged.
            @param ceda_observationCollection: the CEDA_ObservationCollection to update
        '''
        description = extractSummary(self._dataEntityMigration)
        if description:
            ceda_observationCollection.description = description

    def _processTitle(self, ceda_observationCollection):
        '''
            Stores the title extracted from the migration object as an MD_Identifier
            whose authority is a CI_Citation titled CEDA_TITLE.
            @param ceda_observationCollection: the CEDA_ObservationCollection to update
        '''
        ititle = extractTitle(self._dataEntityMigration)
        if ceda_observationCollection.identifier:
            for ident in ceda_observationCollection.identifier:
                if ident.authority.title == CEDA_TITLE:
                    if ident.code != ititle:
                        # NOTE(review): after this in-place update the method still falls
                        # through to the "create new" section below; confirm whether
                        # returning here was intended.
                        ident.code = ititle
                    else:
                        # The stored title is already up to date
                        return

        #Else create new
        i_citation = createCI_Citation(title = CEDA_TITLE)
        newIdentifier = createMD_Identifier(code = ititle, authority=i_citation)

        if self._dataEntityHasBeenProcessed:
            message = 'The _processTitle update is skipped because not implemented'
            self._report.append(message)
            DataEntityProcessor.log.warning(message)
            return

        self.epbRepo.moles3EPB.updateCedaObject(ceda_observationCollection, {'identifier': newIdentifier})

    def _processCitation(self, ceda_observationCollection):
        '''
            Stores the 'citation' entry of the migration object content as an
            MD_Identifier whose authority is a CI_Citation titled 'ceda_moles2_citation',
            dated with the publication/revision dates found in the migration document.
            When no citation is found a NoCitationException is appended to the report.
            @param ceda_observationCollection: the CEDA_ObservationCollection to update
        '''
        contentDict = extractContent(self._dataEntityMigration)
        if 'citation' not in contentDict:
            self._report.append(NoCitationException(self._dataEntityMigration))
            DataEntityProcessor.log.info("The migration object %s has not associated cedacat:citation", \
                                         migrationObjectDescription(self._dataEntityMigration))
            return

        ci_dates = []
        doc_date = findPublishedDate(self._dataEntityMigration)
        if doc_date:
            i_date = createDate(isoDateTimeStringToTimeDate(doc_date))
            ci_dates.append(createCI_Date(getCLValue(CI_DateTypeCode.cl_publication), date = i_date))

        doc_date = findUpdatedDate(self._dataEntityMigration)
        if doc_date:
            i_date = createDate(isoDateTimeStringToTimeDate(doc_date))
            ci_dates.append(createCI_Date(getCLValue(CI_DateTypeCode.cl_revision), date = i_date))

        i_citation = createCI_Citation(title = 'ceda_moles2_citation', date=ci_dates)
        newIdentifier = createMD_Identifier(code = contentDict['citation'], authority=i_citation)

        if self._dataEntityHasBeenProcessed:
            DataEntityProcessor.log.warning('The _processCitation update is skipped because not implemented')
            return

        self.epbRepo.moles3EPB.updateCedaObject(ceda_observationCollection, {'identifier': newIdentifier})

    def _execute(self, ceda_observationCollection):
        """
            Fills the given CEDA_ObservationCollection from self._dataEntityMigration,
            persists it on a first time process, keeps the stored document hash up to date
            and makes sure the collection has an associated CedaGUID.
            @param ceda_observationCollection: the CEDA_ObservationCollection to process
        """
        if not self._dataEntityHasSameHash:
            self._processTitle(ceda_observationCollection)
            self._assignDescription(ceda_observationCollection)
            self._processCitation(ceda_observationCollection)

        #Is a first time process?
        if not hasMOBeenProcessed(self._dataEntityMigration):
            docHash = getAtomDocumentHashByMO(self._dataEntityMigration)
            ceda_observationCollection.publicationState = getCLValue(MM_ObservationPublicationStateValue.cl_working)
            self.epbRepo.moles3EPB.persistInstance(ceda_observationCollection)
            self.epbRepo.migrationEPB.updateMigrationObject(self._dataEntityMigration, \
                {'ceda_observation_coll_id': ceda_observationCollection.id, \
                 'doc_hash': docHash})

        #Has to updated the hash?
        # hasMOBeenProcessed is deliberately evaluated again (not the value cached in
        # __init__) because the migration object may just have been updated above.
        if not self._dataEntityHasSameHash and hasMOBeenProcessed(self._dataEntityMigration):
            docHash = getAtomDocumentHashByMO(self._dataEntityMigration)
            self.epbRepo.migrationEPB.updateMigrationObject(self._dataEntityMigration, \
                {'doc_hash': docHash})

        #Has a proper CEDAGUID?
        if self.epbRepo.moles3EPB.retrieveGUIDFromInstance(ceda_observationCollection) is None:
            #Adds the CedaGUID
            ceda_guid = CedaGUID()
            ceda_guid.id = calculateHash(self._dataEntityMigration.data_ent_id)
            ceda_guid.ceda_observationcollection = ceda_observationCollection.id
            self.epbRepo.moles3EPB.persistInstance(ceda_guid)
            DataEntityProcessor.log.info("GUID for this ObservationCollection: %s", ceda_guid.id)

    def _processDOI(self, deploymentMigration, ceda_observation, deProcessor, single_deployment):
        '''
            Looks for a DOI in the deployment migration document and assigns it to the
            observation. If the data entity has a single deployment and that deployment
            has no DOI, the data entity migration document is searched instead.
            @param deploymentMigration: the DeploymentMigration being processed
            @param ceda_observation: the observation receiving the DOI
            @param deProcessor: the DeploymentProcessor used to assign the DOI
            @param single_deployment: True if the data entity has exactly one deployment
        '''
        doi = findDOIInMigrationDocument(deploymentMigration)
        if single_deployment and doi is None:
            doi = findDOIInMigrationDocument(self._dataEntityMigration)
        deProcessor.assignDOI(ceda_observation, doi)

    def _processDeploymentMigration(self, deploymentMigration, single_deployment):
        '''
            Processes one deployment through a DeploymentProcessor, merges its report
            into this processor's report and then tries to assign the DOI.
            @param deploymentMigration: the DeploymentMigration to process
            @param single_deployment: True if the data entity has exactly one deployment
            @return: the observation returned by DeploymentProcessor.process()
            @raise NoAssociatedAuthor: propagated unchanged
            @raise MigrationObjectException: wraps any other exception
        '''
        deProcessor = DeploymentProcessor(self._dataEntityMigration, deploymentMigration, self.epbRepo)
        try:
            description = migrationObjectDescription(deploymentMigration)
            DataEntityProcessor.log.info("Processing deployment: %s", description)
            obs_ex_report, ceda_observation = deProcessor.process()
            self._report.extend(obs_ex_report)
            try:
                self._processDOI(deploymentMigration, ceda_observation, deProcessor, single_deployment)
            except Exception as ex:
                # The DOI assignment is best effort: a failure must not stop the
                # deployment processing, but it should not disappear silently either.
                DataEntityProcessor.log.warning("Cannot assign the DOI for deployment %s: %s", description, ex)
        except NoAssociatedAuthor:
            # Bare raise keeps the original traceback
            raise
        except Exception as ex:
            raise MigrationObjectException(ex)

        return ceda_observation

    def process(self):
        '''
            Processes the data entity: loads (or creates) the associated
            CEDA_ObservationCollection, executes the migration on it, processes all the
            associated deployments adding the resulting observations as collection
            members and finally updates the collection geographic extent and phenomenon time.
            @return: the list of exceptions/messages collected during the processing
        '''
        obsColl = None
        DataEntityProcessor.log.info("Processing dataEntity: %s", migrationObjectDescription(self._dataEntityMigration))
        try:
            if self._dataEntityMigration.ceda_observation_coll_id:
                obsColl = self.epbRepo.moles3EPB.searchSelectiveLoad(CEDA_ObservationCollection, \
                                                                     self._dataEntityMigration.ceda_observation_coll_id, \
                                                                     ['identifier'])
            else:
                obsColl = CEDA_ObservationCollection()
            self._execute(obsColl)
        except Exception as ex:
            # Without a processed collection nothing else can be done
            self._report.append(ex)
            return self._report

        #retrieves the associated deployment links from the data_entity
        deploymentsLinks = findDeploymentsInDE(self._dataEntityMigration)
        #retrieves the DataEntityMigration sorted by creation date
        deploymentMigrations = self.epbRepo.migrationEPB.getAllDeploymentsMigrationByDataEntitySortedByDate( \
                                                self._dataEntityMigration, deploymentsLinks)

        if deploymentMigrations is None or len(deploymentMigrations) == 0:
            self._report.append(NoAssociatedDeployments(self._dataEntityMigration))
            return self._report

        isSingleDeployment = len(deploymentMigrations) == 1
        for deploymentMigration in deploymentMigrations:
            try:
                ceda_observation = self._processDeploymentMigration(deploymentMigration, isSingleDeployment)
                #Is a first time process?
                if not self.epbRepo.moles3EPB.observationCollectionHasObservation(obsColl.id, ceda_observation.id):
                    self.epbRepo.moles3EPB.updateCedaObject(obsColl, {'member': ceda_observation})
            except Exception as ex:
                # A failing deployment is reported and the remaining ones are still processed
                self._report.append(ex)

        self._assignGeographicExtent(obsColl)
        self._assignPhenomenonTime(obsColl)

        return self._report
Note: See TracBrowser for help on using the repository browser.