Changeset 8323 for mauRepo/MolesManager
- Timestamp:
- 01/06/12 13:23:25 (9 years ago)
- Location:
- mauRepo/MolesManager/trunk/src
- Files:
-
- 24 edited
Legend:
- Unmodified
- Added
- Removed
-
mauRepo/MolesManager/trunk/src/MolesManager/djencoder.py
r8248 r8323 33 33 return d 34 34 35 if d['__class__'] == 'CEDA_Project':36 print d37 38 35 if d['__class__'] == 'Decimal': 39 36 d.update({'value': str(obj)}) -
mauRepo/MolesManager/trunk/src/MolesManager/moles3epb.py
r8295 r8323 18 18 from ea_model.moles3_4.utilities.mo_responsiblepartyinfo import MO_ResponsiblePartyInfo 19 19 from ea_model.moles3_4.utilities.mo_rolevalue import MO_RoleValue 20 from sqlalchemy.orm.collections import InstrumentedList 21 from sqlalchemy.orm.util import identity_key 20 22 21 23 … … 64 66 65 67 @classmethod 66 def search(self, clazz, inst_id, session = None): 67 intSession = Moles3EPB._getSession(session) 68 return EPB.search(clazz, inst_id, intSession) 68 def searchEager(self, clazz, inst_id, session = None): 69 intSession = Moles3EPB._getSession(session) 70 ret = EPB.searchEager(clazz, inst_id, intSession) 71 if session is None: 72 intSession.close() 73 return ret 69 74 70 75 @classmethod 71 def searchEager(self, clazz, inst_id, session = None): 72 intSession = Moles3EPB._getSession(session) 73 return EPB.searchEager(clazz, inst_id, intSession) 76 def persistInstance(self, instance, session = None): 77 """ 78 Adds a new migration object. 79 @param migrationObj: the migration object to add 80 @param session: an SQLAlchemy Session object. If not None the session IS NOT closed at the exit, 81 If None (default) a new Session is created from the underlying EPB and closed at the exit. 82 @return an updated, session independent, object instance reflecting the new persisted object 83 """ 84 intSession = Moles3EPB._getSession(session) 85 EPB.persistInstance(instance, intSession) 86 if session is None: 87 intSession.close() 88 #return ret 89 74 90 75 91 @classmethod 76 def addCedaObject(self, ceda_observation, session = None, commit = False): 77 """ 78 Adds and eventually commit a CEDA Object in MOLES3 db 79 @param ceda_observation: the CEDA object to persist 92 def updateCedaObject(self, ceda_object, cols_to_update, session = None): 93 """ 94 Update and eventually commit a CEDA Object in MOLES3 db. 95 NOTE: only the returned instance will reflect the update! 96 @param ceda_object: the CEDA object to persist 97 @param dict: a dictionary containing the columns to update for the given ceda_object 80 98 @param session: the external session to use. If None a new session will be open to add and commit the object and then closed at the exit. The object is committed 81 @param commit: defines if the object has to be committed immediately or not. 82 """ 83 intSession = Moles3EPB._getSession(session) 84 intSession.add(ceda_observation) 85 if commit: 86 intSession.commit() 87 #Moles3EPB._closeSession(session) 99 @return: the given instance with the updated attributes. 100 """ 101 intSession = Moles3EPB._getSession(session) 102 coll = None 103 try: 104 coll = intSession.merge(ceda_object) 105 except Exception as e: 106 print e 107 if coll != None: 108 for k,v in cols_to_update.items(): 109 if hasattr(coll, k): 110 val = None 111 try: 112 val = intSession.merge(v) 113 except Exception: 114 val = v 115 coll_k = getattr(coll, k) 116 if type(coll_k) == list or type(coll_k) == InstrumentedList: 117 if type(val) == list or type(val) == InstrumentedList: 118 coll_k.extend(val) 119 else: 120 if val in coll_k: 121 break 122 coll_k.append(val) 123 else: 124 setattr(coll, k, val) 125 EPB.persistInstance(coll, intSession) 126 if session is None: 127 intSession.close() 88 128 89 129 @classmethod … … 100 140 elif type(instance) == CEDA_Observation: 101 141 return intSession.query(CedaGUID).filter(CedaGUID.ceda_observation==instance.id).first() 102 142 if session is None: 143 intSession.close() 103 144 104 145 @classmethod … … 111 152 """ 112 153 intSession = Moles3EPB._getSession(session) 113 return intSession.query(CEDA_ObservationCollection, CEDA_Observation).filter(CEDA_ObservationCollection.id==obs_coll_id).filter(CEDA_Observation.id==obs_id).count() > 0 154 ret = intSession.query(CEDA_ObservationCollection, CEDA_Observation).filter(CEDA_ObservationCollection.id==obs_coll_id).filter(CEDA_Observation.id==obs_id).count() > 0 155 if session is None: 156 intSession.close() 157 return ret 114 158 115 159 @classmethod … … 124 168 filter(MO_ResponsiblePartyInfo.role == MO_RoleValue.cl_author). \ 125 169 filter(MO_Observation.id == obs_id) 170 if session is None: 171 intSession.close() 126 172 return ret 127 173 … … 184 230 cos = intSession.query(CEDA_ObservationCollection).all() 185 231 co = intSession.query(MO_ObservationCollection).join(MO_ObservationCollection.member).filter(MO_ObservationCollection.member.contains(obsers)) 186 print co187 print co.all()188 232 189 233 observations = intSession.query(MO_ObservationCollection).join(CEDA_Observation). \ … … 192 236 return observations 193 237 194 @classmethod195 def addObservationToObservationCollection(self, observationCollection, observation, session = None, commit = False):196 """197 Adds an Observation instance to an ObservationCollection if still not part of the collection.198 @param observationCollection: the collection to update199 @param observation: the observation to add200 @param session: the session to use for the operation201 @param commit: if True commits at the end (defaul False)202 """203 intSession = Moles3EPB._getSession(session)204 if not self.observationCollectionHasObservation(observationCollection.id, observation.id, intSession):205 observationCollection.member.append(observation)206 if commit:207 intSession.commit()208 209 @classmethod210 def addDataLineageToObservation(self, data_lineage, observation, session = None, commit = False):211 """212 Adds an data_lineage element to an Observation if still not assigned or not equal.213 @param data_lineage: the quality string to persist214 @param observation: the observation to update215 @param session: the session to use for the operation216 @param commit: if True commits at the end (defaul False)217 """218 intSession = Moles3EPB._getSession(session)219 if observation.dataLineage != data_lineage:220 observation.dataLineage = data_lineage221 if commit:222 intSession.commit()223 224 @classmethod225 def addDescriptionToObservation(self, description, observation, session = None, commit = False):226 """227 Adds an description element to an Observation if still not assigned or not equal.228 @param description: the description string to persist229 @param observation: the observation to update230 @param session: the session to use for the operation231 @param commit: if True commits at the end (defaul False)232 """233 intSession = Moles3EPB._getSession(session)234 if observation.description != description:235 observation.description = description236 if commit:237 intSession.commit()238 239 @classmethod240 def addRelatedPartyInfoToObservation(self, responsibleParty, observation, session = None, commit = False):241 """242 Adds a relatedParty element to an Observation if still not assigned or not equal.243 @param responsibleParty: the responsibleParty element to persist244 @param observation: the observation to update245 @param session: the session to use for the operation246 @param commit: if True commits at the end (defaul False)247 """248 intSession = Moles3EPB._getSession(session)249 if not responsibleParty in observation.relatedParty:250 observation.relatedParty.append(responsibleParty)251 #moles3Append(observation, 'relatedParty', responsibleParty)252 if commit:253 intSession.commit()254 255 256 238 @classmethod 257 239 def getNewMolesSession(self): 258 240 return Moles3EPB._getSession() 241 242 @classmethod 243 def search(self, clazz, inst_id, session = None): 244 intSession = Moles3EPB._getSession(session) 245 ret = EPB.search(clazz, inst_id, intSession) 246 if session is None: 247 intSession.close() 248 return ret 249 250 @classmethod 251 def searchSelectiveLoad(self, clazz, inst_id, attributes, session = None): 252 """ 253 Searches a required instance by id loading selectively \ 254 the specified fields. The parameter "attributes" is a single string or a list of attributes 255 owned by the instance of "clazz". Furthermore such list may contain 256 also the children of the main attributes. For example "attrs" may look 257 like 258 ['resultAccumulation', 'identifier.authority', 'resultTime.position.dateTime8601.month', \ 259 'relatedParty.party', 'result.source.function', 'permission', \ 260 'geographicExtent', 'phenomenonTime', 'keywords', 'description', \ 261 'inSupportOf.abstract', 'dataLineage'] 262 the first parameter refers to the main class so is equivalent to 263 clazz.resultAccumulation 264 the second parameter is equivalent to invoke 265 clazz.identifier.authority 266 As single string "attributes" could be as well just 'identifier.authority' 267 @param clazz: the class type to search for 268 @param inst_id: the instance id for which the search is done 269 @param attributes: a single string or a list of attributes to load 270 @param session: a session to use for the query. By default a new one is created automatically at start and closed at the end 271 @return the required instance 272 """ 273 intSession = Moles3EPB._getSession(session) 274 ret = EPB.searchSelectiveLoad(clazz, inst_id, attributes, intSession) 275 if session is None: 276 intSession.close() 277 return ret 278 279 @classmethod 280 def loadAttributes(self, instance, attributes, session = None): 281 """ 282 Returns the attribute of an instance. The parameter "attributes" is a single string or a list of attributes 283 owned by the instance of "clazz". Furthermore such list may contain 284 also the children of the main attributes. For example "attrs" may look 285 like 286 ['resultAccumulation', 'identifier.authority', 'resultTime.position.dateTime8601.month', \ 287 'relatedParty.party', 'result.source.function', 'permission', \ 288 'geographicExtent', 'phenomenonTime', 'keywords', 'description', \ 289 'inSupportOf.abstract', 'dataLineage'] 290 the first parameter refers to the main class so is equivalent to 291 clazz.resultAccumulation 292 the second parameter is equivalent to invoke 293 clazz.identifier.authority 294 As single string "attributes" could be as well just 'identifier.authority' 295 @param instance: an instance containing the appropriate id 296 @param attributes: the attribute value required 297 @param session: the session to use for the operation 298 @return: the given instance filled with the required attributes. 299 """ 300 intSession = Moles3EPB._getSession(session) 301 instance = intSession.merge(instance) 302 EPB.loadAttributes(instance, attributes, intSession) 303 if session is None: 304 intSession.close() 305 return instance 259 306 260 307 @classmethod -
mauRepo/MolesManager/trunk/src/MolesManager/molesSessionMiddleware.py
r8245 r8323 23 23 _epbInitialized = False 24 24 25 26 27 28 25 def _getNewMolesSession(self): 29 session = Moles3EPB.getNewMolesSession() 26 session = Moles3EPB.getNewMolesSession() 30 27 return session 28 31 29 32 30 def _doInitialization(self): … … 39 37 molesDB = DbManager(MOLES3_DB_CONNECTION, MOLES3_DB_SCRIPT, session_event_manager=EVENTS_DB) 40 38 Moles3EPB.overrrideDBManager(molesDB) 41 MolesSessionMiddleware._epbInitialized = True 39 MolesSessionMiddleware._epbInitialized = True 40 41 self._migration() #see the note on MolesSessionMiddleware._migration 42 42 43 """ 43 44 def _getMolesSession(self): 45 ''' 46 @deprecated: db session is going to be removed from all GUI-related instances 47 ''' 44 48 if not MolesSessionMiddleware._epbInitialized: 45 49 self._doInitialization() 46 50 47 51 return self._getNewMolesSession() 52 """ 48 53 49 54 def _migration(self, runMigration = RUN_MIGRATION): … … 58 63 """ 59 64 60 def process_request(self, request): 61 self._migration() #see the note on MolesSessionMiddleware._migration 62 63 request.moles_session = self._getMolesSession() 65 def process_request(self, request): 66 if not MolesSessionMiddleware._epbInitialized: 67 self._doInitialization() 68 69 ''' 70 @deprecated: request.moles_session is going to be removed from all GUI-related instances 71 ''' 72 #request.moles_session = self._getMolesSession() 64 73 65 74 66 75 def process_response(self, request, response): 76 """ 67 77 if hasattr(request, 'moles_session'): 68 78 request.moles_session.close() 69 79 """ 70 80 return response 71 81 72 82 73 83 def process_exception(self, request, exception): 84 pass 85 """ 74 86 try: 75 87 session = request.moles_session … … 77 89 return 78 90 session.rollback() 79 session.close() 91 session.close() 92 """ -
mauRepo/MolesManager/trunk/src/MolesManager/views/cedaInstrument.py
r8120 r8323 18 18 ''' 19 19 if request.POST.has_key('obs_id'): 20 dp = Moles3EPB.search(CEDA_Observation, request.POST['obs_id'] , request.moles_session)20 dp = Moles3EPB.search(CEDA_Observation, request.POST['obs_id']) 21 21 c['records'] = dp 22 22 -
mauRepo/MolesManager/trunk/src/MolesManager/views/cedaObservationView.py
r8264 r8323 32 32 if record: 33 33 c['coObs'] = mark_safe(DJEncoder().escapeForJSON(DJEncoder().encode(record))) 34 guid = Moles3EPB.retrieveGUIDFromInstance(record , request.moles_session)34 guid = Moles3EPB.retrieveGUIDFromInstance(record) 35 35 if guid: 36 36 c['guid'] = guid.id … … 41 41 return render_to_response('cedaObservation.html', c) 42 42 43 def _getCedaObservation(request, obs_id): 44 return Moles3EPB.searchEager(CEDA_Observation, obs_id, request.moles_session) 43 def _getCedaObservation(request, obs_id): 44 eagerloadthese = ['identifier.authority', 'resultTime.position.dateTime8601.month', \ 45 'resultAccumulation', 'relatedParty.party', 'result.source.function', 'permission', \ 46 'geographicExtent', 'phenomenonTime', 'keywords', 'description', \ 47 'inSupportOf.abstract', 'dataLineage'] 48 return Moles3EPB.searchSelectiveLoad(CEDA_Observation, obs_id, eagerloadthese) 49 #return Moles3EPB.searchEager(CEDA_Observation, obs_id) -
mauRepo/MolesManager/trunk/src/MolesManager/views/cedaPlatform.py
r8120 r8323 18 18 ''' 19 19 if request.POST.has_key('obs_id'): 20 dp = Moles3EPB.search(CEDA_Observation, request.POST['obs_id'] , request.moles_session)20 dp = Moles3EPB.search(CEDA_Observation, request.POST['obs_id']) 21 21 c['records'] = dp 22 22 -
mauRepo/MolesManager/trunk/src/MolesManager/views/cedaProjectView.py
r8120 r8323 18 18 ''' 19 19 if request.POST.has_key('obs_id'): 20 dp = Moles3EPB.search(CEDA_Observation, request.POST['obs_id'] , request.moles_session)20 dp = Moles3EPB.search(CEDA_Observation, request.POST['obs_id']) 21 21 c['records'] = dp 22 22 -
mauRepo/MolesManager/trunk/src/MolesManager/views/cedaSearch.py
r8296 r8323 27 27 if not keywords.endswith("'"): 28 28 keywords = keywords + "'" 29 obsevations = Moles3EPB.extractObservationByTitleKeywords(keywords , request.moles_session)29 obsevations = Moles3EPB.extractObservationByTitleKeywords(keywords) 30 30 titles = [] 31 31 ids = [] … … 36 36 [baseCov, 37 37 str(o.id)]) 38 auth_query = Moles3EPB.observationAuthor(o.id , request.moles_session)38 auth_query = Moles3EPB.observationAuthor(o.id) 39 39 ath = auth_query.all() 40 40 ath0 = ath[0] -
mauRepo/MolesManager/trunk/src/MolesManager/views/cedacat.py
r8245 r8323 12 12 13 13 def cedacat(request, guid): 14 ceda_guid = Moles3EPB.search(CedaGUID, guid , request.moles_session)14 ceda_guid = Moles3EPB.search(CedaGUID, guid) 15 15 if ceda_guid and ceda_guid.ceda_observation: 16 16 return redirect(cedaObservationView.coView, ceda_guid.ceda_observation) -
mauRepo/MolesManager/trunk/src/libs/commons_db.py
r8245 r8323 42 42 self._script(self.metadata) 43 43 self.metadata.create_all(bind=self.engine) 44 self._session = scoped_session(sessionmaker()) 44 #self._session = scoped_session(sessionmaker()) 45 if self.engine: 46 self.engine.connect() 45 47 46 48 def createDbSession(self): … … 49 51 @return: a sqlalchemy.orm.session.Session instance if the ORM is fully configured, otherwise a sqlalchemy.engine.base.Connection instance 50 52 """ 51 if self._session: 52 ret = self._session() 53 if self._session_event_manager: 54 self._session_event_manager(ret) 55 return ret 56 if self.engine: 57 return self.engine.connect() 53 #session = scoped_session(sessionmaker(bind=self.engine))() 54 session = sessionmaker(bind=self.engine)() 55 if self._session_event_manager: 56 self._session_event_manager(session) 57 return session 58 #if self._session: 59 # ret = self._session() 60 # if self._session_event_manager: 61 # self._session_event_manager(ret) 62 # return ret 63 #if self.engine: 64 # return self.engine.connect() 58 65 59 66 def closeDbSession(self, dbSession): -
mauRepo/MolesManager/trunk/src/libs/epb.py
r8180 r8323 6 6 from sqlalchemy.orm import subqueryload 7 7 from sqlalchemy.sql.expression import text 8 from sqlalchemy.orm.util import identity_key 8 9 9 10 class EPB(object): … … 17 18 18 19 @classmethod 19 def search(self, clazz, inst_id, session): 20 res = session.query(clazz).get(inst_id) 20 def search(self, clazz, inst_key, session): 21 """ 22 Searches a required instance by id 23 @param clazz: the class type to search for 24 @param inst_key: the instance id for which the search is done 25 @param session: a session to use for the query 26 @return the required instance 27 """ 28 res = session.query(clazz).get(inst_key) 21 29 if res is None: 22 30 return None … … 24 32 25 33 @classmethod 26 def searchEager(self, clazz, inst_id, session): 34 def searchEager(self, clazz, inst_id, session): 35 """ 36 Searches a required instance by id loading eagerly ALL its field. Please use carefully because \ 37 it could impact the performance 38 @param clazz: the class type to search for 39 @param inst_id: the instance id for which the search is done 40 @param session: a session to use for the query 41 @return the required instance 42 """ 27 43 res = session.query(clazz).options(subqueryload('*')).get(inst_id) 28 44 if res is None: 29 45 return None 46 30 47 return res 48 49 @classmethod 50 def searchSelectiveLoad(self, clazz, inst_id, attrs, session): 51 """ 52 Searches a required instance by id loading \ 53 the specified fields. The parameter "attrs" is a single string or a list of attributes 54 owned by the instance of "clazz". Furthermore such list may contain 55 also the children of the main attributes. For example "attrs" may look 56 like 57 ['resultAccumulation', 'identifier.authority', 'resultTime.position.dateTime8601.month', \ 58 'relatedParty.party', 'result.source.function', 'permission', \ 59 'geographicExtent', 'phenomenonTime', 'keywords', 'description', \ 60 'inSupportOf.abstract', 'dataLineage'] 61 the first parameter refers to the main class so is equivalent to 62 clazz.resultAccumulation 63 the second parameter is equivalent to invoke 64 clazz.identifier.authority 65 As single string "attrs" could be as well just 'identifier.authority' 66 @param clazz: the class type to search for 67 @param inst_id: the instance id for which the search is done 68 @param attrs: a single string or a list of attributes to load 69 @param session: a session to use for the query 70 @return the required instance 71 """ 72 if session is None: 73 raise Exception("Session is None!") 74 res = EPB.search(clazz, inst_id, session) 75 if res is None: 76 return None 77 self._drillData(res, attrs) 78 return res 79 80 @classmethod 81 def loadAttributes(self, instance, attributes, session): 82 """ 83 Loads the given instance with the required attributes. 84 The parameter "attributes" is a single string or a list of attributes 85 owned by the instance of "clazz". Furthermore such list may contain 86 also the children of the main attributes. For example "attrs" may look 87 like 88 ['resultAccumulation', 'identifier.authority', 'resultTime.position.dateTime8601.month', \ 89 'relatedParty.party', 'result.source.function', 'permission', \ 90 'geographicExtent', 'phenomenonTime', 'keywords', 'description', \ 91 'inSupportOf.abstract', 'dataLineage'] 92 the first parameter refers to the main class so is equivalent to 93 clazz.resultAccumulation 94 the second parameter is equivalent to invoke 95 clazz.identifier.authority 96 As single string "attributes" could be as well just 'identifier.authority' 97 It does not return anything because it does not close the session 98 @param instance: an instance containing the appropriate id 99 @param attributes: a single string or a list of attributes to load 100 @param session: the session to use for the operation 101 """ 102 if instance is None: 103 raise Exception("Instance is None!") 104 if session is None: 105 raise Exception("Session is None!") 106 session.merge(instance) 107 self._drillData(instance, attributes) 108 31 109 32 110 @classmethod … … 46 124 47 125 @classmethod 126 def persistInstance(self, instance, session): 127 """ 128 Adds a new migration object. 129 @param migrationObj: the migration object to add 130 @param session: an sqlalchemy Session object. If None (default) the method creates 131 @return an updated, session independant, object instance reflecting the new persisted object 132 """ 133 session.add(instance) 134 session.commit() 135 id_key = identity_key(instance=instance) 136 instance = EPB.search(id_key[0], id_key[1], session) 137 #instance = ret 138 139 @classmethod 48 140 def executeNative(self, sqlNative, session): 49 141 return session.execute(text(sqlNative)) … … 56 148 57 149 @classmethod 58 def _closeSession(self, session): 59 if session is not None: 60 return 61 session.commit() 62 session.close() 150 def _drillData(self, obj, attrs): 151 """ 152 @param obj: its an instance already living inside an SQLAlchemy session 153 @param attrs: a list of attributes owned by the obj parameter. It accepts dot separated attributes as childern of obj attributes. 154 """ 155 #if is a single field wrap it in a list and recalls itself 156 if not isinstance(attrs, list): 157 self._drillData(obj, [attrs]) 158 for item in attrs: 159 attr = item.split('.')[0] 160 if isinstance(obj, list): 161 for element in obj: 162 self._drillData(element, [item]) 163 else: 164 if hasattr(obj, attr): 165 nobj = getattr(obj, attr) 166 if len(attr) != len(item): 167 self._drillData(nobj, [item[len(attr) + 1:]]) 168 169 -
mauRepo/MolesManager/trunk/src/libs/migration/InfodbEPB.py
r8222 r8323 6 6 from libs.epb import EPB 7 7 from libs.migration.exception.exceptions import NoDBManager 8 from sqlalchemy.sql.expression import text 8 from sqlalchemy.sql.expression import text, bindparam 9 from sqlalchemy.types import String 9 10 10 11 … … 28 29 @param de_id: the cedainfoapp_dataentity.dataentity_id to look for 29 30 """ 30 connection = InfodbEPB.getNewInfoConnection(); 31 query_string = text("""SELECT cedainfoapp_dataentity.logical_path AS logical_path, \ 31 query_string = text("SELECT cedainfoapp_dataentity.logical_path AS logical_path, \ 32 32 cedainfoapp_curationcategory.category AS category,\ 33 33 cedainfoapp_person.name AS name \ … … 37 37 LEFT JOIN cedainfoapp_person \ 38 38 ON cedainfoapp_dataentity.responsible_officer_id=cedainfoapp_person.id \ 39 WHERE cedainfoapp_dataentity.dataentity_id = :id""")40 records = connection.execute(query_string,id=de_id)39 WHERE cedainfoapp_dataentity.dataentity_id=:i_id") 40 records = InfodbEPB._infoDB.engine.execute(query_string, i_id=de_id) 41 41 ret = records.fetchone() 42 42 records.close() -
mauRepo/MolesManager/trunk/src/libs/migration/MigrationEPB.py
r8269 r8323 9 9 from libs.epb import EPB 10 10 from libs.migration.exception.exceptions import NoDBManager 11 from libs.migration.processor.commons import stringToTimestamp 12 from sqlalchemy.orm.collections import InstrumentedList 13 from sqlalchemy.orm.util import identity_key 11 14 12 15 … … 20 23 Sets the MigrationEPB libs.commons_db.DbManager 21 24 """ 22 MigrationEPB._migrationDB = dbManager 25 MigrationEPB._migrationDB = dbManager 26 27 @classmethod 28 def persistInstance(self, instance, session = None): 29 """ 30 Adds a new migration object. 31 @param migrationObj: the migration object to add 32 @param session: an SQLAlchemy Session object. If None (default) the method creates 33 @return an updated, session independent, object instance reflecting the new persisted object 34 """ 35 intSession = MigrationEPB._getSession(session) 36 EPB.persistInstance(instance, intSession) 37 if session is None: 38 intSession.close() 39 #return ret 23 40 24 41 @classmethod 25 def associateObservationCollectionToDataEntity(self, dataEntityMigration, obs_coll_id, session = None, commit = False): 26 intSession = MigrationEPB._getSession(session) 27 dataEntityMigration.ceda_observation_coll_id = obs_coll_id 28 if commit: 29 intSession.commit() 30 31 @classmethod 32 def associateObservationToDeployment(self, deploymentMigration, obs_id, session = None, commit = False): 33 intSession = MigrationEPB._getSession(session) 34 deploymentMigration.ceda_observation_id = obs_id 35 if commit: 36 intSession.commit() 37 38 @classmethod 39 def addMigration(self, migrationObj, session = None): 40 intSession = MigrationEPB._getSession(session) 41 intSession.add(migrationObj) 42 EPB._closeSession(session) 43 44 @classmethod 45 def getAllDataEntityMigration(self, session = None): 46 intSession = MigrationEPB._getSession(session) 47 return EPB.getAllObjects(DataEntityMigration, intSession) 48 49 @classmethod 50 def getDataEntityMigrationOrderByDate(self, session = None): 51 return MigrationEPB.getAllDataEntityMigration(session).order_by(asc("doc_creation")) 52 53 @classmethod 54 def getAllDeploymentsMigrationByDataEntitySortedByDate(self, dataEntity, deploymentNames, session = None): 55 intSession = MigrationEPB._getSession(session) 56 return EPB.getAllObjects(DeploymentsMigration, intSession).filter(DeploymentsMigration.doc_name.in_(deploymentNames)).order_by(asc("doc_creation")) 42 def getAllDeploymentsMigrationByDataEntitySortedByDate(self, dataEntity, deploymentNames): 43 intSession = MigrationEPB._getSession(None) 44 res = EPB.getAllObjects(DeploymentsMigration, intSession).filter(DeploymentsMigration.doc_name.in_(deploymentNames)).order_by(asc("doc_creation")).all() 45 intSession.close() 46 return res 57 47 #return EPB.getAllObjects(DeploymentsMigration, intSession).filter(*[EPB.buildFilter('doc_owner', dataEntity.doc_owner)]).filter(DeploymentsMigration.doc_name.in_(deploymentNames)).order_by(asc("doc_creation")) 58 48 #return EPB.getAllObjects(DeploymentsMigration, intSession).filter(*[EPB.buildFilter('doc_status', dataEntity.doc_status)]).filter(*[EPB.buildFilter('doc_owner', dataEntity.doc_owner)]).filter(DeploymentsMigration.doc_name.in_(deploymentNames)).order_by(asc("doc_creation")) … … 83 73 @classmethod 84 74 def getDataEntityMigrationbyPath(self, migrationObject, session = None): 85 if migrationObject is None: 86 raise Exception("migrationObject is None") 87 return self._getMigrationObjectByName(DataEntityMigration, migrationObject, migrationObject.doc_name, session) 75 """ 76 Returns the DataEntityMigration associated with the given path 77 @param migrationObject: the migration object to look for. If None returns all the DataEntityMigration items 78 """ 79 intSession = MigrationEPB._getSession(session) 80 if migrationObject: 81 ret = intSession.query(DataEntityMigration).filter(*[EPB.buildFilter('doc_name', migrationObject.doc_name)]).first() 82 else: #then process all the DataEntities 83 ret = EPB.getAllObjects(DataEntityMigration, intSession) 84 if session is None: 85 intSession.close() 86 return ret 87 88 @classmethod 89 def updateMigrationObject(self, migration_object, cols_to_update, session = None): 90 """ 91 Update and eventually commit a Migration Object in Migration db 92 @param ceda_object: the Migration object to persist 93 @param dict: a dictionary containing the columns to update for the given migration_object 94 @param session: the external session to use. If None a new session will be open to add and commit the object and then closed at the exit. The object is committed 95 """ 96 intSession = MigrationEPB._getSession(session) 97 coll = None 98 try: 99 coll = intSession.merge(migration_object) 100 except Exception as e: 101 print e 102 if coll != None: 103 for k,v in cols_to_update.items(): 104 if hasattr(coll, k): 105 val = None 106 try: 107 val = intSession.merge(v) 108 except Exception: 109 val = v 110 coll_k = getattr(coll, k) 111 if type(coll_k) == list or type(coll_k) == InstrumentedList: 112 if type(val) == list or type(val) == InstrumentedList: 113 coll_k.extend(val) 114 else: 115 coll_k.append(val) 116 else: 117 setattr(coll, k, val) 118 EPB.persistInstance(coll, intSession) 119 #intSession.commit() 120 if session is None: 121 intSession.close() 122 #if coll: 123 # return coll 124 88 125 89 126 @classmethod … … 92 129 93 130 @classmethod 131 def loadAttributes(self, instance, attributes, session = None): 132 """ 133 Returns the attribute of an instance. The parameter "attributes" is a single string or a list of attributes 134 owned by the instance of "clazz". Furthermore such list may contain 135 also the children of the main attributes. For example "attrs" may look 136 like 137 ['resultAccumulation', 'identifier.authority', 'resultTime.position.dateTime8601.month', \ 138 'relatedParty.party', 'result.source.function', 'permission', \ 139 'geographicExtent', 'phenomenonTime', 'keywords', 'description', \ 140 'inSupportOf.abstract', 'dataLineage'] 141 the first parameter refers to the main class so is equivalent to 142 clazz.resultAccumulation 143 the second parameter is equivalent to invoke 144 clazz.identifier.authority 145 As single string "attributes" could be as well just 'identifier.authority' 146 @param instance: an instance containing the appropriate id 147 @param attributes: the attribute value required 148 @param session: the session to use for the operation 149 @return: a detached instance (or array of instances) of the required attribute. 150 """ 151 intSession = MigrationEPB._getSession(session) 152 instance = intSession.merge(instance) 153 EPB.loadAttributes(instance, attributes, session) 154 if session is None: 155 intSession.close() 156 return instance 157 158 @classmethod 159 def search(self, clazz, inst_id, session = None): 160 if clazz is None or inst_id is None: 161 return None 162 intSession = MigrationEPB._getSession(session) 163 ret = EPB.search(clazz, inst_id, intSession) 164 if session is None: 165 intSession.close() 166 return ret 167 168 169 @classmethod 94 170 def _getSession(self, session = None): 95 171 if MigrationEPB._migrationDB is None: -
mauRepo/MolesManager/trunk/src/libs/migration/processor/commons.py
r8285 r8323 682 682 ci_party.name = name 683 683 if contactInfo: 684 ci_party.contactInfo = contactInfo684 ci_party.contactInfo.extend(contactInfo) 685 685 return ci_party 686 686 … … 718 718 ci_citation.title = title 719 719 if date and type(date) == list: 720 ci_citation.date = date720 ci_citation.date.extend(date) 721 721 if citedResponsibleParty: 722 722 ci_citation.extend(citedResponsibleParty) -
mauRepo/MolesManager/trunk/src/libs/migration/processor/dataEntity.py
r8274 r8323 7 7 from ea_model.ceda_metadatamodel.ceda_observationcollection.ceda_observationcollection import \ 8 8 CEDA_ObservationCollection 9 from libs.epb import EPB10 9 from libs.migration.MigrationEPB import MigrationEPB 11 10 from libs.migration.exception.exceptions import MigrationObjectException, NoAssociatedAuthor,\ 12 11 migrationObjectDescription, NoAssociatedDeployments 13 12 from libs.migration.processor.commons import findDeploymentsInDE,\ 14 createMD_Identifier, extractContent, MD_CODE_MOLES2_CITATION,\13 createMD_Identifier, extractContent,\ 15 14 hasAtomDocumentSameHash, createCI_Citation, createCI_Date, findPublishedDate,\ 16 15 isoDateTimeStringToTimeDate, findUpdatedDate, createDate,\ 17 calculateHash, extractUpdateFrequency, findDOIInMigrationDocument 16 calculateHash, extractUpdateFrequency, findDOIInMigrationDocument,\ 17 getAtomDocumentHash 18 18 from libs.migration.processor.deployment import DeploymentProcessor 19 19 from ea_model.moles3_4.utilities.mo_publicationstatevalue import MO_PublicationStateValue … … 31 31 Processes a DataEntityMigration item. Note that each DataEntity is associated to a "dataent_xxxx" file in Moles2 32 32 ''' 33 def __init__(self, dataEntityMigration , migrationSessions):33 def __init__(self, dataEntityMigration): 34 34 ''' 35 35 Initializes the class 36 36 @param _dataEntityMigration: the DataEntityMigration instance 37 @param migrationSessions: a MigrationSessions instance38 37 ''' 39 38 if dataEntityMigration is None: 40 39 raise MigrationObjectException("DataEntityProcessor cannot process an None item") 41 40 self._dataEntityMigration = dataEntityMigration 42 self.migrationSessions = migrationSessions43 41 44 42 def _processCitation(self, ceda_observationCollection): … … 65 63 66 64 def _getObservationCollection(self): 67 return EPB.search(CEDA_ObservationCollection, self._dataEntityMigration.ceda_observation_coll_id, self.migrationSessions.molesSession)65 return Moles3EPB.search(CEDA_ObservationCollection, self._dataEntityMigration.ceda_observation_coll_id) 68 66 69 67 def _processObservationCollection(self): … … 88 86 #self._processCitation(ceda_observationCollection) 89 87 return ceda_observationCollection 88 89 def _update(self): 90 # ...and the data entity document has not changed 91 if hasAtomDocumentSameHash(self._dataEntityMigration): 92 return self._getObservationCollection() 93 else: 94 return self.updateObservationCollection() 90 95 91 96 def _create(self): … … 100 105 ceda_observationCollection.publicationState = MO_PublicationStateValue.cl_working 101 106 102 Moles3EPB. addCedaObject(ceda_observationCollection, self.migrationSessions.molesSession, True)103 104 MigrationEPB. associateObservationCollectionToDataEntity(self._dataEntityMigration, ceda_observationCollection.id, \105 self.migrationSessions.migrationSession, True) 107 Moles3EPB.persistInstance(ceda_observationCollection) 108 109 MigrationEPB.updateMigrationObject(self._dataEntityMigration, {'ceda_observation_coll_id': ceda_observationCollection.id}) 110 106 111 107 112 #Adds the CedaGUID … … 109 114 ceda_guid.id = calculateHash(self._dataEntityMigration.data_ent_id) 110 115 ceda_guid.ceda_observationcollection = ceda_observationCollection.id 111 Moles3EPB. addCedaObject(ceda_guid, self.migrationSessions.molesSession, True)116 Moles3EPB.persistInstance(ceda_guid) 112 117 DataEntityProcessor.log.info("GUID for this ObservationCollection: %s" % (ceda_guid.id)) 113 118 114 119 return ceda_observationCollection 115 120 121 def _processResultAccumulation(self, ceda_observation): 122 ceda_observation = Moles3EPB.loadAttributes(ceda_observation, "resultAccumulation") 123 if ceda_observation.resultAccumulation is None: 124 updateFrequency = extractUpdateFrequency(self._dataEntityMigration) 125 if updateFrequency: 126 resultAccumulation = MD_MaintenanceFrequencyCode.from_string(updateFrequency) 127 Moles3EPB.updateCedaObject(ceda_observation, {'resultAccumulation': resultAccumulation}) 128 129 def _processDOI(self, deploymentMigration, ceda_observation, deProcessor, single_deployment): 130 doi = findDOIInMigrationDocument(deploymentMigration) 131 if single_deployment: 132 if doi is None: 133 doi = findDOIInMigrationDocument(self._dataEntityMigration) 134 #collection_identifier = Moles3EPB.extractCollectionIdentifierByTitle(MD_CODE_MOLES2_CITATION, self.migrationSessions.molesSession) 135 #if collection_identifier.count()==1: 136 # ceda_observation.identifier.append(collection_identifier.first()) 137 138 deProcessor.assignDOI(ceda_observation, doi) 139 116 140 def _processDeploymentMigration(self, deploymentMigration, single_deployment): 117 deProcessor = DeploymentProcessor(self._dataEntityMigration, deploymentMigration , self.migrationSessions)141 deProcessor = DeploymentProcessor(self._dataEntityMigration, deploymentMigration) 118 142 try: 119 143 DataEntityProcessor.log.info("Processing deployment: %s" % (migrationObjectDescription(deploymentMigration))) 120 144 ceda_observation = deProcessor.process() 121 if ceda_observation.resultAccumulation is None: 122 updateFrequency = extractUpdateFrequency(self._dataEntityMigration) 123 if updateFrequency: 124 ceda_observation.resultAccumulation = MD_MaintenanceFrequencyCode.from_string(updateFrequency) 125 126 doi = findDOIInMigrationDocument(deploymentMigration) 127 if single_deployment: 128 if doi is None: 129 doi = findDOIInMigrationDocument(self._dataEntityMigration) 130 #collection_identifier = Moles3EPB.extractCollectionIdentifierByTitle(MD_CODE_MOLES2_CITATION, self.migrationSessions.molesSession) 131 #if collection_identifier.count()==1: 132 # ceda_observation.identifier.append(collection_identifier.first()) 133 134 deProcessor.assignDOI(ceda_observation, doi) 135 136 137 self.migrationSessions.molesSession.commit() 145 try: 146 self._processResultAccumulation(ceda_observation) 147 except Exception as ex: 148 pass 149 try: 150 self._processDOI(deploymentMigration, ceda_observation, deProcessor, single_deployment) 151 except Exception as ex: 152 pass 153 154 if not hasAtomDocumentSameHash(deploymentMigration): 155 doc_hash = getAtomDocumentHash(deploymentMigration.docStatus, deploymentMigration.docType, deploymentMigration.docOwner, deploymentMigration.docName) 156 MigrationEPB.updateMigrationObject(deploymentMigration, {'doc_hash': doc_hash}) 138 157 except NoAssociatedAuthor as ex: 139 158 raise ex 140 159 except Exception as ex: 141 self.migrationSessions.molesSession.rollback()142 self.migrationSessions.migrationSession.rollback()160 #self.migrationSessions.molesSession.rollback() 161 #self.migrationSessions.migrationSession.rollback() 143 162 raise MigrationObjectException(ex) 144 163 145 164 return ceda_observation 146 147 def _update(self):148 # ...and the data entity document has not changed149 if hasAtomDocumentSameHash(self._dataEntityMigration):150 return self._getObservationCollection()151 else:152 return self.updateObservationCollection()153 165 154 166 def process(self): … … 171 183 #retrieves the DataEntityMigration sorted by creation date 172 184 deploymentMigrations = MigrationEPB.getAllDeploymentsMigrationByDataEntitySortedByDate( \ 173 self._dataEntityMigration, deploymentsLinks, self.migrationSessions.migrationSession) 174 howManydm = deploymentMigrations.count() 185 self._dataEntityMigration, deploymentsLinks) 186 howManydm = 0 187 if deploymentMigrations: 188 howManydm = len(deploymentMigrations) 175 189 if howManydm == 0: 176 190 exs.append(NoAssociatedDeployments(self._dataEntityMigration)) … … 178 192 try: 179 193 ceda_observation = self._processDeploymentMigration(deploymentMigration, howManydm == 1) 180 cedaObservationCollection.member.append(ceda_observation) 194 195 #Check if a doi has been already assigned 196 cedaObservationCollection = Moles3EPB.loadAttributes(cedaObservationCollection, 'member') 197 member = cedaObservationCollection.member 198 if member and not (ceda_observation in member): 199 Moles3EPB.updateCedaObject(cedaObservationCollection, {'member': ceda_observation}) 200 181 201 except Exception as ex: 182 202 exs.append(ex) 183 203 except RuntimeError as er: 184 204 print er 185 self.migrationSessions.molesSession.commit()205 #self.migrationSessions.molesSession.commit() 186 206 187 207 return exs -
mauRepo/MolesManager/trunk/src/libs/migration/processor/deployment.py
r8286 r8323 60 60 log.addHandler(StreamHandler()) 61 61 log.setLevel(logging.INFO) 62 def __init__(self, dataEntityMigration, deploymentMigration , migrationSessions):62 def __init__(self, dataEntityMigration, deploymentMigration): 63 63 ''' 64 64 Initializes the class 65 65 @param dataEntityMigration: a DataEntityMigration instance 66 @param deploymentMigration: the DeploymentMigration instance 67 @param migrationSessions: a MigrationSessions instance 66 @param deploymentMigration: the DeploymentMigration instance 68 67 ''' 69 68 self._dataEntityMigration = dataEntityMigration … … 71 70 self._dataEntityHasSameHash = hasAtomDocumentSameHash(self._dataEntityMigration) and self._dataEntityMigration.doc_hash is not None 72 71 self._deploymentHasSameHash = hasAtomDocumentSameHash(self._deploymentMigration) and self._deploymentMigration.doc_hash is not None 73 74 self._migrationSessions = migrationSessions75 72 76 73 def _existsCEDAasPublisher(self): … … 211 208 if data_lineage is None: 212 209 raise NoDataLineage(self._dataEntityMigration) 213 Moles3EPB.addDataLineageToObservation(data_lineage, observation, self._migrationSessions.molesSession)210 observation.dataLineage = data_lineage 214 211 215 212 def _assignTitle(self, observation): … … 233 230 obsList = [] 234 231 for obs in links['OBS']: 235 observationStation = MigrationEPB.getDeploymentDataMigrationByName(self._deploymentMigration, obs + \ 236 '.atom', self._migrationSessions.migrationSession) 232 observationStation = MigrationEPB.getDeploymentDataMigrationByName(self._deploymentMigration, obs + '.atom') 237 233 obsList.append((extractTitle(observationStation), findSubTypeInDPT(observationStation))) 238 234 … … 250 246 if links.has_key('ACTIVITY'): 251 247 for link in links['ACTIVITY']: 252 activity = MigrationEPB.getDeploymentDataMigrationByName(self._deploymentMigration, link + '.atom', \ 253 self._migrationSessions.migrationSession) 248 activity = MigrationEPB.getDeploymentDataMigrationByName(self._deploymentMigration, link + '.atom') 254 249 255 250 projSubType = findSubTypeInDPT(activity) … … 268 263 269 264 def _assignDescription(self, observation): 270 summary = extractSummary(self._deploymentMigration, self._dataEntityMigration) 271 Moles3EPB.addDescriptionToObservation(summary, observation, self._migrationSessions.molesSession) 265 description = extractSummary(self._deploymentMigration, self._dataEntityMigration) 266 if description: 267 observation.description = description 272 268 273 269 def _assignQuality(self, observation): … … 288 284 289 285 def updateObservation(self): 290 ceda_observation = EPB.searchOrCreate(CEDA_Observation, self._migrationSessions.molesSession, \ 291 self._deploymentMigration.ceda_observation_id) 292 return ceda_observation 286 return EPB.searchOrCreate(CEDA_Observation, self._deploymentMigration.ceda_observation_id) 293 287 294 288 … … 351 345 if doi and doi.has_key('href'): 352 346 doi = doi['href'][22:] 347 348 #Check if a doi has been already assigned 349 observation = Moles3EPB.loadAttributes(observation, 'identifier') 350 obs_identifier = observation.identifier 351 if obs_identifier: 352 for ident in obs_identifier: 353 if ident.code == doi: 354 return 355 353 356 py_date = None 354 357 cited_responsible = createCI_ResponsibleParty(role=CI_RoleCode.cl_publisher, \ … … 367 370 ci_date = createCI_Date(CI_DateTypeCode.cl_publication, date = dt) 368 371 i_authority = createCI_Citation("DOI", date = ci_date) 369 identifier = createMD_Identifier(code = doi, authority=i_authority) 370 observation.identifier.append(identifier)372 identifier = createMD_Identifier(code = doi, authority=i_authority) 373 Moles3EPB.updateCedaObject(observation, {'identifier': identifier}) 371 374 DeploymentProcessor.log.info("DOI: %s" % (doi)) 372 375 … … 379 382 i_address = createCI_Address(deliveryPoint = ['British Atmospheric Data Centre, STFC Rutherford Appleton Laboratory'], \ 380 383 electronicMailAddress=['badc@rl.ac.uk'], postalCode='OX11 0QX', country='UK', city='Harwell Oxford') 381 i_phone = createCI_Telephone(voice=['+44(0)1235 446432']) 384 i_phone = createCI_Telephone(voice=['+44(0)1235 446432']) 382 385 contact = createCI_Contact(phone=i_phone, address=i_address, onlineResource=i_onlineResources) 383 386 party = createMO_Organization(name = "NERC - British Atmospheric Data Centre", contactInfo = [contact]) … … 491 494 def _create(self): 492 495 ceda_observation = CEDA_Observation() 496 self._assignKeywords(ceda_observation) 497 self._assignLineage(ceda_observation) 498 self._assignResult(ceda_observation) 499 self._assignPublisherCurator(ceda_observation) 500 493 501 ceda_observation.publicationState = MO_PublicationStateValue.cl_working 494 self._assignQuality(ceda_observation) 495 self._assignLineage(ceda_observation) 502 self._assignQuality(ceda_observation) 496 503 self._assignDescription(ceda_observation) 497 self._assignTitle(ceda_observation) 498 self._assignResult(ceda_observation) 504 self._assignTitle(ceda_observation) 499 505 self._assignGeographicExtent(ceda_observation) 500 506 #self._assignDOI(ceda_observation) 501 self._assignCreationDate(ceda_observation) 502 self._assignPublisherCurator(ceda_observation) 507 self._assignCreationDate(ceda_observation) 503 508 self._assignPhenomenonTime(ceda_observation) 504 509 self._assignPermission(ceda_observation) … … 508 513 self._assignUpdateFrequency(ceda_observation) 509 514 self._assignName(ceda_observation) 510 self._assignKeywords(ceda_observation) 511 Moles3EPB.addCedaObject(ceda_observation, self._migrationSessions.molesSession, True) 512 513 MigrationEPB.associateObservationToDeployment(self._deploymentMigration, ceda_observation.id, self._migrationSessions.migrationSession, True) 515 Moles3EPB.persistInstance(ceda_observation) 516 MigrationEPB.updateMigrationObject(self._deploymentMigration, {'ceda_observation_id': ceda_observation.id}) 514 517 515 518 #Adds the CedaGUID … … 517 520 ceda_guid.id = calculateHash(self._deploymentMigration.depl_id) 518 521 ceda_guid.ceda_observation = ceda_observation.id 519 Moles3EPB. addCedaObject(ceda_guid, self._migrationSessions.molesSession, True)522 Moles3EPB.persistInstance(ceda_guid) 520 523 DeploymentProcessor.log.info("GUID for this Observation: %s" % (ceda_guid.id)) 524 525 #process the CEDA_Observation.procedure 526 deploymentDataProcessor = DeploymentDataProcessor(self._deploymentMigration) 527 528 links = findLinksInDeployment(self._deploymentMigration) 529 procedure = deploymentDataProcessor.createProcess(links) 530 531 #Temporary commented because CEDA_Project.subProject is not correctly mapped to the DB 532 project = deploymentDataProcessor.createProject(links) 533 Moles3EPB.updateCedaObject(ceda_observation, {'procedure': procedure, 'inSupportOf': project}) 521 534 522 535 return ceda_observation 523 536 524 537 def _getObservation(self): 525 return EPB.search(CEDA_Observation, self._deploymentMigration.ceda_observation_id, self._migrationSessions.molesSession)538 return Moles3EPB.search(CEDA_Observation, self._deploymentMigration.ceda_observation_id) 526 539 527 540 def _update(self): … … 539 552 #... does not exist so create it 540 553 ceda_observation = self._create() 541 542 #process the CEDA_Observation.procedure 543 deploymentDataProcessor = DeploymentDataProcessor(self._deploymentMigration, self._migrationSessions) 544 links = findLinksInDeployment(self._deploymentMigration) 545 procedure = deploymentDataProcessor.createProcess(links) 546 547 #Temporary commented because CEDA_Project.subProject is not correctly mapped to the DB 548 project = deploymentDataProcessor.createProject(links) 549 if procedure: 550 ceda_observation.procedure = procedure 551 552 if project: 553 ceda_observation.inSupportOf = project 554 555 if procedure or project: 556 self._migrationSessions.molesSession.commit() 557 554 558 555 if not self._deploymentHasSameHash: 559 556 self._deploymentMigration.doc_hash = getAtomDocumentHashByMO(self._deploymentMigration) 560 557 self._commitMigration() 561 562 558 563 559 return ceda_observation -
mauRepo/MolesManager/trunk/src/libs/migration/processor/deployment_data.py
r8210 r8323 7 7 createCEDA_Processing, createCEDA_Instrument, createCEDA_Project,\ 8 8 findSummary, findDocumentationInMigrationDocument, createCI_Citation,\ 9 createMO_OnlineResource 9 createMO_OnlineResource, hasAtomDocumentSameHash 10 10 from libs.migration.MigrationEPB import MigrationEPB 11 11 from MolesManager.moles3epb import Moles3EPB … … 14 14 class DeploymentDataProcessor(object): 15 15 16 def __init__(self, deploymentMigration, migrationSessions): 17 self._migrationSessions = migrationSessions 16 def __init__(self, deploymentMigration): 18 17 self._deploymentMigration = deploymentMigration 19 18 20 19 def _commitDeploymentMigration(self, associateWithCedaObservation, dataProductionTool, dataProductionToolField): 21 Moles3EPB. addCedaObject(associateWithCedaObservation, self._migrationSessions.molesSession, True)22 setattr(dataProductionTool, dataProductionToolField, associateWithCedaObservation.id)23 self._migrationSessions.migrationSession.commit()20 Moles3EPB.persistInstance(associateWithCedaObservation) 21 MigrationEPB.updateMigrationObject(dataProductionTool, {dataProductionToolField: associateWithCedaObservation.id}) 22 24 23 25 24 def createProject(self, links): … … 27 26 if links.has_key('ACTIVITY'): 28 27 for link in links['ACTIVITY']: 29 activity = MigrationEPB.getDeploymentDataMigrationByName(self._deploymentMigration, link + '.atom', \ 30 self._migrationSessions.migrationSession) 28 activity = MigrationEPB.getDeploymentDataMigrationByName(self._deploymentMigration, link + '.atom') 31 29 i_abstract = findSummary(activity) 32 30 doc_link = findDocumentationInMigrationDocument(activity) … … 47 45 hasCedaAcquisition = False 48 46 for dpt in links['DPT']: 49 dataProductionTool = MigrationEPB.getDeploymentDataMigrationByName(self._deploymentMigration, dpt + \ 50 '.atom', self._migrationSessions.migrationSession) 47 dataProductionTool = MigrationEPB.getDeploymentDataMigrationByName(self._deploymentMigration, dpt + '.atom') 51 48 52 49 #has the document changed? … … 57 54 58 55 subType = findSubTypeInDPT(dataProductionTool) 56 59 57 if subType == 'model': 60 associateWithCedaObservation = createCEDA_Processing() 61 self._commitDeploymentMigration(associateWithCedaObservation, dataProductionTool, 'ceda_processing_id') 62 if not (hasCedaComposite or hasCedaProcessing): 63 hasCedaProcessing = True 64 hasCedaComposite = False 65 hasCedaAcquisition = False 66 else: 67 associateWithCedaObservation = createCEDA_Instrument() 68 self._commitDeploymentMigration(associateWithCedaObservation, dataProductionTool, 'ceda_instrument_id') 58 #MigrationEPB.loadAttributes(dataProductionTool, 'ceda_processing_id') 59 if dataProductionTool.ceda_processing_id is None: 60 associateWithCedaObservation = createCEDA_Processing() 61 self._commitDeploymentMigration(associateWithCedaObservation, dataProductionTool, 'ceda_processing_id') 62 if not (hasCedaComposite or hasCedaProcessing): 63 hasCedaProcessing = True 64 hasCedaComposite = False 65 hasCedaAcquisition = False 66 else: 67 #should update 68 pass 69 else: 70 #MigrationEPB.loadAttributes(dataProductionTool, 'ceda_instrument_id') 71 if dataProductionTool.ceda_instrument_id is None: 72 associateWithCedaObservation = createCEDA_Instrument() 73 self._commitDeploymentMigration(associateWithCedaObservation, dataProductionTool, 'ceda_instrument_id') 74 else: 75 #should update 76 pass 69 77 70 78 71 79 #if not a DPT.subType == 'model' then.... 72 80 for obs in links['OBS']: 73 observationStation = MigrationEPB.getDeploymentDataMigrationByName(self._deploymentMigration, obs + \ 74 '.atom', self._migrationSessions.migrationSession) 81 observationStation = MigrationEPB.getDeploymentDataMigrationByName(self._deploymentMigration, obs + '.atom') 75 82 76 83 #has the document changed? … … 82 89 subType = findSubTypeInDPT(observationStation) 83 90 if subType == 'satellite': 91 #MigrationEPB.loadAttributes(dataProductionTool, 'ceda_compositeprocess_id') 92 if dataProductionTool.ceda_compositeprocess_id is None: 84 93 associateWithCedaObservation = createCEDA_Processing() 85 94 self._commitDeploymentMigration(associateWithCedaObservation, dataProductionTool, 'ceda_compositeprocess_id') … … 87 96 hasCedaProcessing = True 88 97 hasCedaComposite = False 89 hasCedaAcquisition = False 98 hasCedaAcquisition = False 99 else: 100 #should update 101 pass 90 102 else: 91 pass 92 ''' 93 associateWithCedaObservation = createCEDA_Acquisition() 94 self._commitDeploymentMigration(associateWithCedaObservation, dataProductionTool, 'ceda_acquisition_id') 95 ''' 96 if not (hasCedaComposite or hasCedaProcessing or hasCedaAcquisition): 97 hasCedaProcessing = True 98 hasCedaComposite = False 99 hasCedaAcquisition = True 103 #MigrationEPB.loadAttributes(dataProductionTool, 'ceda_acquisition_id') 104 if dataProductionTool.ceda_acquisition_id is None: 105 pass 106 ''' 107 associateWithCedaObservation = createCEDA_Acquisition() 108 self._commitDeploymentMigration(associateWithCedaObservation, dataProductionTool, 'ceda_acquisition_id') 109 ''' 110 else: 111 #should update 112 pass 113 if not (hasCedaComposite or hasCedaProcessing or hasCedaAcquisition): 114 hasCedaProcessing = True 115 hasCedaComposite = False 116 hasCedaAcquisition = True 100 117 return associateWithCedaObservation 101 118 -
mauRepo/MolesManager/trunk/src/libs/migration/processor/loadResources.py
r8180 r8323 18 18 """ 19 19 def __init__(self): 20 self._migrationSession = MigrationEPB.getNewMigrationSession()20 pass 21 21 22 22 def process(self): … … 29 29 ex.extend(self._loadCollections(status, DT_DEPLOYMENTS, DeploymentsMigration)) 30 30 ex.extend(self._loadCollections(status, DT_DEPLOYMENT_DATA, DeploymentDataMigration)) 31 32 self._migrationSession.close()33 31 return ex 34 32 35 def updateMigrationDoc(self, migrationClass, doc_id, docHash, docCreation): 36 """ 37 Updates an existing migration document 38 @param migrationClass: the migration class associated with the document 39 @param doc_id: the document id 40 @param docHash: the document hashCode calculated by libs.migration.processor.commons.calculateHash 41 @param docCreation:the creation date 42 @return: True if the document exists, False otherwise 43 """ 44 coll = MigrationEPB.search(migrationClass, doc_id, self._migrationSession) 45 if coll != None: 46 #... and has the same hash nothing has changed 47 if coll.doc_hash == docHash: 48 return True 49 else: 50 coll.doc_creation = stringToTimestamp(docCreation) 51 coll.doc_hash = docHash 52 self._migrationSession.commit() 53 return True 54 return False 55 56 57 def createMigrationDoc(self, migrationClass, doc_id, docName, docOwner, docStatus, docHash, docCreation): 58 coll = None 33 def _createMigrationObject(self, migrationClass, doc_id, docName, docOwner, docStatus, docHash, docCreation): 34 migrationObject = None 59 35 if migrationClass == DeploymentsMigration: 60 coll= DeploymentsMigration()61 coll.depl_id = doc_id36 migrationObject = DeploymentsMigration() 37 migrationObject.depl_id = doc_id 62 38 elif migrationClass == DataEntityMigration: 63 coll= DataEntityMigration()64 coll.data_ent_id = doc_id39 migrationObject = DataEntityMigration() 40 migrationObject.data_ent_id = doc_id 65 41 elif migrationClass == DeploymentDataMigration: 66 coll = DeploymentDataMigration() 67 coll.deployment_data_id = doc_id 68 69 self._processNotExistingDoc(coll, docName, docOwner, docStatus, docHash, docCreation) 42 migrationObject = DeploymentDataMigration() 43 migrationObject.deployment_data_id = doc_id 44 45 if migrationObject is None: 46 raise Exception("migrationObject is None") 47 migrationObject.doc_name = docName 48 migrationObject.doc_owner = docOwner 49 migrationObject.doc_status = docStatus 50 migrationObject.doc_hash = docHash 51 migrationObject.doc_creation = stringToTimestamp(docCreation) 52 try: 53 MigrationEPB.persistInstance(migrationObject) 54 except Exception as e: 55 print e 70 56 71 57 def _loadCollections(self, docStatus, docType, migrationClass): … … 101 87 @param docOwner: a common.docOwner item 102 88 """ 89 103 90 docHash = getAtomDocumentHash(docStatus, docType, docOwner, docName) 104 91 xmlDocument = getAtomDocumentAsElementtree(docStatus, docType, docOwner, docName) 105 106 docCreation = self._extractCreationDate(xmlDocument) 107 doc_id = findID(xmlDocument) 92 doc_id = findID(xmlDocument) 93 94 #The document has been already classified 95 if doc_id is None or MigrationEPB.search(migrationClass, doc_id) is not None: 96 return 108 97 109 if self.updateMigrationDoc(migrationClass, doc_id, docHash, docCreation): 110 return 111 112 self.createMigrationDoc(migrationClass, doc_id, docName, docOwner, docStatus, docHash, docCreation) 113 114 def _processNotExistingDoc(self, migrationObject, docName, docOwner, docStatus, docHash, docCreation): 115 if migrationObject is None: 116 raise Exception("migrationObject is None") 117 migrationObject.doc_name = docName 118 migrationObject.doc_owner = docOwner 119 migrationObject.doc_status = docStatus 120 migrationObject.doc_hash = docHash 121 migrationObject.doc_creation = stringToTimestamp(docCreation) 122 MigrationEPB.addMigration(migrationObject, self._migrationSession) 123 self._migrationSession.commit() 98 99 #The document shall be classified 100 docCreation = self._extractCreationDate(xmlDocument) 101 self._createMigrationObject(migrationClass, doc_id, docName, docOwner, docStatus, docHash, docCreation) 124 102 125 103 -
mauRepo/MolesManager/trunk/src/libs/migration/processor/migrationProcess.py
r8258 r8323 6 6 from libs.migration.processor.dataEntity import DataEntityProcessor 7 7 from libs.migration.MigrationEPB import MigrationEPB 8 from MolesManager.moles3epb import Moles3EPB9 from libs.migration.migration_sessions import MigrationSessions10 8 import logging 11 9 from logging import StreamHandler 12 from libs.migration.exception.exceptions import migrationObjectDescription13 10 14 11 class MigrationProcess(): … … 21 18 """ 22 19 def __init__(self): 23 ''' 24 Because this is usually the starting point of a migration 25 process, it initialize the necessaries DB sessions 26 ''' 27 self.migrationSessions = MigrationSessions(MigrationEPB.getNewMigrationSession(), Moles3EPB.getNewMolesSession()) 20 pass 28 21 29 22 def process(self, dataEntityObject = None): … … 38 31 exs = [] 39 32 data_ents = [] 40 33 data_ents.append(MigrationEPB.getDataEntityMigrationbyPath(dataEntityObject)) 34 """ 41 35 #Does the migrationObject exists? 42 36 if dataEntityObject: … … 44 38 else: #then process all the DataEntities 45 39 data_ents = MigrationEPB.getAllDataEntityMigration(session = self.migrationSessions.migrationSession) 40 """ 46 41 47 42 #loops over the data entities 48 for dataEntityMigration in [f for f in data_ents if f != None]: 49 MigrationProcess.log.info("Start processing dataEntity %s" % (migrationObjectDescription(dataEntityMigration))) 50 dep = DataEntityProcessor(dataEntityMigration, self.migrationSessions) 43 for dataEntityMigration in [f for f in data_ents if f != None]: 44 dep = DataEntityProcessor(dataEntityMigration) 51 45 exs.extend(dep.process()) 52 46 MigrationProcess.log.info("Done") -
mauRepo/MolesManager/trunk/src/libs/migration/tests/dropAllTables.sql
r8254 r8323 556 556 DROP TABLE IF EXISTS deployments_migration CASCADE; 557 557 DROP TABLE IF EXISTS cv_solidvaluepair_cv_discretecoverage CASCADE; 558 DROP TABLE IF EXISTS ceda_gui CASCADE; 558 DROP TABLE IF EXISTS ceda_guid CASCADE; 559 560 559 561 560 562 DROP SEQUENCE IF EXISTS angle_id_seq CASCADE; -
mauRepo/MolesManager/trunk/src/libs/migration/tests/migrationprocess.py
r8254 r8323 23 23 24 24 def setUp(self): 25 #self._dropAllTables()26 27 25 migrationDB = DbManager(MIGRATION_DB_CONNECTION, MIGRATION_DB_SCRIPT, sql_echo=True) 26 #migrationDB = DbManager(MIGRATION_DB_CONNECTION, MIGRATION_DB_SCRIPT) 28 27 MigrationEPB.overrrideDBManager(migrationDB) 29 28 30 29 molesDB = DbManager(MOLES3_DB_CONNECTION, MOLES3_DB_SCRIPT, sql_echo=True, session_event_manager=EVENTS_DB) 30 #molesDB = DbManager(MOLES3_DB_CONNECTION, MOLES3_DB_SCRIPT, session_event_manager=EVENTS_DB) 31 31 Moles3EPB.overrrideDBManager(molesDB) 32 32 33 infoDB = DbManager(INFO_DB_CONNECTION, sql_echo=True) 33 #infoDB = DbManager(INFO_DB_CONNECTION, sql_echo=True) 34 infoDB = DbManager(INFO_DB_CONNECTION) 34 35 InfodbEPB.overrrideDBManager(infoDB) 35 36 36 37 lr = LoadResources() 37 ex = [] 38 #ex = lr.process() 38 ex = lr.process() 39 39 for e in ex: 40 40 print e 41 42 41 43 def testMigrationProcess(self): 42 def testMigrationProcess(self): 44 43 mp = MigrationProcess() 45 44 46 45 dataEntity = MigrationObject() 47 dataEntity.doc_status = DS_pUBLISHED48 46 dataEntity.doc_owner = DO_BADC 47 48 #dataEntity.doc_status = DS_pUBLISHED 49 49 #dataEntity.doc_name = 'dataent_csip.atom' 50 50 51 dataEntity.doc_status = DS_PUBLISHED 52 dataEntity.doc_name = 'DE_095e8da2-cf02-11e0-8b7a-00e081470265.atom' 51 #Has a DOI 52 #dataEntity.doc_status = DS_PUBLISHED 53 #dataEntity.doc_name = 'DE_095e8da2-cf02-11e0-8b7a-00e081470265.atom' 54 55 #Has 3 DOI 56 dataEntity.doc_status = DS_pUBLISHED 57 dataEntity.doc_name = 'dataent_chobs.atom' 58 53 59 54 60 ex = mp.process(dataEntity) 61 #ex = [] 55 62 #ex = mp.process() 56 63 for e in ex: 57 64 print e 65 #self._dropAllTables() 58 66 59 67 def _dropAllTables(self): 60 molesDB = DbManager(MOLES3_DB_CONNECTION, None)61 Moles3EPB.overrrideDBManager(molesDB)62 68 session = Moles3EPB.getNewMolesSession() 63 69 f = open('dropAllTables.sql', 'r') … … 65 71 stripped = line.strip() 66 72 if len(stripped) > 0: 67 Moles3EPB.executeNative(line.strip() )73 Moles3EPB.executeNative(line.strip(), session) 68 74 session.commit() 69 session.close() 70 molesDB = None 75 session.close() 71 76 72 77 -
mauRepo/MolesManager/trunk/src/libs/migration/tests/moles3epbmothods.py
r8295 r8323 12 12 from logging import StreamHandler 13 13 from test_utils import createObservationCollection, createObservation, createProject 14 from testconfig import TESTMOLES3_DB_CONNECTION15 14 from ea_model.ceda_metadatamodel.ceda_observationcollection.ceda_observationcollection import CEDA_ObservationCollection 16 15 from ea_model.moles3_4.observationcollection.mo_observationcollection import MO_ObservationCollection … … 19 18 from ea_model.moles3_4.observation.mo_observation import MO_Observation 20 19 from ea_model.moles3_4.utilities.mo_rolevalue import MO_RoleValue 20 from sqlalchemy.orm import subqueryload_all, subqueryload, joinedload,\ 21 joinedload_all, contains_eager 22 from ea_model.iso_19115_2006_metadata_corrigendum.reference_system_information.md_identifier import MD_Identifier 23 from sqlalchemy.orm.util import aliased 21 24 22 25 … … 41 44 #self.checkExtractProjectObservationCollections() 42 45 #self.checkExtractObservationCollectionsForObservation() 43 self.checkObservationAuthors() 46 #self.checkObservationAuthors() 47 self.checkSubqueryload() 48 49 def checkSubqueryload(self): 50 eagerloadthese = ['identifier', 'resultTime', 'relatedParty.party'] 51 obs_id = 1 52 ret = Moles3EPB.searchSelectiveEager(CEDA_Observation, obs_id, eagerloadthese) 53 print ret.relatedParty[0].party 54 55 def _drillData(self, obj, attrs): 56 """ 57 @param obj: its an instance already living inside an SQLAlchemy session 58 @param attrs: a list of attributes owned by the obj parameter. It accepts dot separated attributes as childern of obj attributes. 59 """ 60 for item in attrs: 61 attr = item.split('.')[0] 62 if isinstance(obj, list): 63 for element in obj: 64 self._drillData(element, [item]) 65 else: 66 if hasattr(obj, attr): 67 nobj = getattr(obj, attr) 68 if len(attr) != len(item): 69 self._drillData(nobj, [item[len(attr) + 1:]]) 70 44 71 45 72 def checkObservationAuthors(self): … … 62 89 63 90 self.logging.info('Stores an new CEDA_ObservationCollection') 64 Moles3EPB.addCedaObject(observationCollection, session) 65 session.commit() 91 Moles3EPB.persistInstance(observationCollection) 66 92 67 93 my_identifier = Moles3EPB.extractCollectionIdentifierByTitle('test_title', session) … … 79 105 80 106 self.logging.info('Stores an new CEDA_Observation') 81 Moles3EPB.addCedaObject(observation, session) 82 session.commit() 107 Moles3EPB.persistInstance(observation) 83 108 84 109 obs = Moles3EPB.extractObservationByTitleKeywords('test_code', session) … … 162 187 def _create(self, objects, session): 163 188 for item in objects: 164 Moles3EPB.addCedaObject(item, session) 165 session.commit() 189 Moles3EPB.persistInstance(item) 166 190 167 191 def _delete(self, objects, session): -
mauRepo/MolesManager/trunk/src/libs/migration/tests/moles3epbtests.py
r8236 r8323 48 48 #self.checkTM_Instant() 49 49 #self.checkTM_Position() 50 self.checkCI_Responsibility() 50 self.checkCI_Responsibility() 51 51 52 52 def checkEmptyObservationCollection(self): … … 55 55 56 56 self.logging.info('Stores an empty new CEDA_ObservationCollection') 57 Moles3EPB. addCedaObject(observationCollection, session)57 Moles3EPB.persistInstance(observationCollection, session) 58 58 session.commit() 59 59 … … 72 72 73 73 self.logging.info('Tries to stores an empty new %s' % (observation.__class__.__name__)) 74 Moles3EPB. addCedaObject(observation, session)74 Moles3EPB.persistInstance(observation, session) 75 75 self.assertRaises(IntegrityError, session.commit) 76 76 self.logging.info('Catches that Ceda_Observation.dataLineage is missing') … … 79 79 self.logging.info('Adds the dataLineage and tries to store it again') 80 80 observation.dataLineage = "aNewDataLineage" 81 Moles3EPB. addCedaObject(observation, session)81 Moles3EPB.persistInstance(observation, session) 82 82 session.commit() 83 83 … … 96 96 97 97 self.logging.info('Stores an empty new CEDA_ObservationCollection') 98 Moles3EPB. addCedaObject(observationCollection, session)98 Moles3EPB.persistInstance(observationCollection, session) 99 99 session.commit() 100 100 … … 122 122 123 123 self.logging.info('Tries to stores a new %s' % (observation.__class__.__name__)) 124 Moles3EPB. addCedaObject(observation, session)124 Moles3EPB.persistInstance(observation, session) 125 125 session.commit() 126 126 … … 141 141 142 142 self.logging.info('Stores an empty new CEDA_ObservationCollection') 143 Moles3EPB. addCedaObject(ci_date, session)143 Moles3EPB.persistInstance(ci_date, session) 144 144 session.commit() 145 145 … … 158 158 159 159 self.logging.info('Stores an empty new TM_Position') 160 Moles3EPB. addCedaObject(tm_position, session)160 Moles3EPB.persistInstance(tm_position, session) 161 161 session.commit() 162 162 … … 176 176 177 177 self.logging.info('Stores an empty new TM_Instant') 178 Moles3EPB. addCedaObject(tm_instant, session)178 Moles3EPB.persistInstance(tm_instant, session) 179 179 session.commit() 180 180 … … 193 193 194 194 self.logging.info('Stores an empty new CEDA_Project') 195 Moles3EPB. addCedaObject(project, session)195 Moles3EPB.persistInstance(project, session) 196 196 session.commit() 197 197 … … 211 211 responsibility = createCI_Responsibility() 212 212 self.logging.info('Stores an empty new CEDA_Project') 213 Moles3EPB. addCedaObject(responsibility, session)213 Moles3EPB.persistInstance(responsibility, session) 214 214 session.commit() 215 215 session.expunge(responsibility) -
mauRepo/MolesManager/trunk/src/libs/migration/tests/testconfig.py
r8254 r8323 8 8 INFO_DB_CONNECTION = 'postgresql://cedainfo:ler239b@bora.badc.rl.ac.uk:5432/cedainfo' 9 9 10 TESTMIGRATION_DB_CONNECTION = 'postgresql://badc:rotyn217m@neptune.badc.rl.ac.uk:5432/ceda_moles3_test'11 TESTMOLES3_DB_CONNECTION = 'postgresql://badc:rotyn217m@neptune.badc.rl.ac.uk:5432/ceda_moles3_test'10 #TESTMIGRATION_DB_CONNECTION = 'postgresql://badc:rotyn217m@neptune.badc.rl.ac.uk:5432/ceda_moles3_test' 11 #TESTMOLES3_DB_CONNECTION = 'postgresql://badc:rotyn217m@neptune.badc.rl.ac.uk:5432/ceda_moles3_test' 12 12 13 13 from libs.migration.db.migrationTables import doTables as doMigration
Note: See TracChangeset
for help on using the changeset viewer.