source: TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 6639

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@6639
Revision 6639, 21.2 KB checked in by sdonegan, 10 years ago (diff)

Latest code that updates either NDG3 style or MEDIn style DB with all correct fields from MEDIN ISO updated

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3Class representing the data access object wrapping the Postgres DB
4C Byrom Apr 08
5'''
6import sys, os, logging, datetime
7#from SpatioTemporalData import *
8from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
9
class PostgresDAO:
    '''
    Data access object wrapping the Postgres discovery DB.

    Creates, updates and deletes ORIGINAL_DOCUMENT rows plus their associated
    transformed documents, spatiotemporal rows and the extra MEDIN ISO columns.

    NOTE(review): every SQL statement in this class is assembled by string
    concatenation from record attributes, so it is vulnerable to SQL injection
    and will break on values containing single quotes.  The visible client API
    (pgc.runSQLCommand) exposes no parameter binding; moving to parameterised
    queries is the proper long-term fix.
    '''

    def __init__(self, record, isoDataModel, transformationDir, pgClient=None):
        '''
        Constructor - to initialise the DAL and do some initial setting up.
        @param record: the PostgresRecord object to add or update in the DB
        @param isoDataModel: ISO data model providing the mapping of ISO fields
        to the extra MEDIN DB columns (see _updateISOcolumns)
        @param transformationDir: directory holding the transformed documents
        @keyword pgClient: a postgresClient object to use to connect to postgres
        - NB, if not set, will try to set one up by looking for a pg.config file
        locally
        '''
        if record == "":
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
        else:
            logging.info("INFO: Creating/updating DB entry for record, %s" % record.discovery_id)

        # setup a connection to the db - if none specified
        if pgClient is None:
            # BUGFIX: the original passed an undefined name, `configFile`, as
            # the first argument (pgc(configFile, pgc.DEFAULT_CONFIG_FILE)),
            # which raised a NameError whenever no client was supplied.  Fall
            # back to the client's default config file instead.
            # TODO(review): confirm the PostgresClient constructor signature.
            self.pgc = pgc(configFile=pgc.DEFAULT_CONFIG_FILE)
        else:
            self.pgc = pgClient

        self._record = record
        self.isoDataModel = isoDataModel
        self.discovery_dir = transformationDir

    def getRecordID(self):
        '''
        Look up the current record in the DB by discovery_id and cache its DB
        ID on the record object (record.db_id).
        @return: id of the record if it exists (or was previously looked up),
        otherwise None
        '''
        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
        if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            self._record.discovery_id + "';"
        dbId = self.pgc.runSQLCommand(sql)
        if dbId:
            self._record.db_id = dbId[0][0]
        # BUGFIX: the original fell off the end here, so a fresh (uncached)
        # lookup always returned None even when a row was found; return the
        # looked-up id so callers can use the return value consistently.
        return self._record.db_id

    def getRecordID_using_OriginalDocumentFilename(self):
        '''
        Look up the current record in the DB by its original document filename
        and cache the DB ID on the record object (record.db_id).
        @return: None - the result is stored on self._record.db_id
        '''
        logging.info("Looking up original_document_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.db_id = dbId[0][0]

    def getDiscoveryID_using_OriginalDocumentFilename(self):
        '''
        Look up the discovery ID for the current record by its original
        document filename and cache it on the record (record.discovery_id).
        @return: None - the result is stored on self._record.discovery_id
        '''
        # BUGFIX: log message said "discovery_of" - corrected to discovery_id
        logging.info("Looking up discovery_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.discovery_id = dbId[0][0]

    def getTemporalDataId(self):
        '''
        Looks up the temporal data id using the original document id.

        NOTE(review): this method is incomplete in the original source - it
        builds a SQL statement (which selects discovery_id, not a temporal
        data id) but never executes it and returns nothing.  Behaviour is
        preserved as-is; TODO: finish or remove this method.
        '''
        logging.info("Looking up temporal_data_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

    def createOrUpdateRecord(self):
        '''
        Look up the record in the DB; if found, update it, otherwise create it.
        @return returnCode: 0 if the operation failed, 1 if the record was
        updated, 2 if it was created
        '''
        self.getRecordID()
        returnCode = 0  # 0 if failed, 1 if update, 2 if create

        if self._record.db_id:
            if self.updateRecord():
                returnCode = 1
        else:
            # create the record!
            if self.createRecord():
                returnCode = 2

        return returnCode

    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object.
        @return result: True if record created, false if otherwise
        '''
        logging.info("Creating a new record in the DB for the metada document")
        # firstly store the actual documents contained by the record - i.e. the original doc +
        # the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()

        # if iso original metadata format, add in additional fields
        self._updateISOcolumns()

        logging.info("New record created")
        return True

    def _updateISOcolumns(self):
        '''
        Method to control the update of the extra ISO columns required for full MEDIN ingest.

        Iterates the ISO-field -> DB-column mapping supplied by the ISO data
        model and writes each non-None value into the corresponding column.
        '''
        dbColumns = self.isoDataModel.mapIsoFieldsToDBcolumns()

        for isoData in dbColumns:
            logging.info("Attempting to update: " + isoData)

            columnValue = getattr(self.isoDataModel, isoData)
            columnName = dbColumns[isoData]

            if columnValue is not None:
                if self._updateDataBaseColumn(self._record.shortFilename, columnName, columnValue):
                    logging.info("Successfully updated " + isoData)
                else:
                    logging.warn("Could not update " + isoData)
            else:
                logging.info(isoData + " is None - skipping")

    def _updateDataBaseColumn(self, original_document_filename, columnName, columnValue):
        '''
        Update an individual column in the Discovery database, keyed on the
        unique original_document_filename.
        @param original_document_filename: filename identifying the row to update
        @param columnName: name of the column to set
        @param columnValue: nested list of values ([[v1, v2, ...]]) extracted
        from the ISO model; multiple text values are joined with ';'
        @return report: True if the update was submitted, False otherwise
        '''
        report = False

        logging.info("Updating " + columnName + " for record: " + original_document_filename)

        # get the matching dictionary of postgres datatypes defined in the iso model
        postgresDataType = self.isoDataModel.mapDBcolumnsToIsoDataTypes()[columnName]

        # guard against completely empty extraction results up front - the
        # original only guarded the text/tsvector branch and raised IndexError
        # for other datatypes with no data
        if len(columnValue[0]) == 0:
            logging.info("No data to extract - no point continuing!")
            return False

        # construct the actual sql command here - easier to handle than using sql function (& to adapt!)
        # remember doing it on a 1:1 basis here - convert raw multi list to ';'-delimited value if text or tsvector
        if postgresDataType in ('tsvector', 'text'):
            logging.info("Data type is text or tsvector!")
            # join multiple values with ';' (single values pass through unchanged)
            newColVal = ";".join(columnValue[0])
        else:
            logging.info("Data type not text or vector!")

            if len(columnValue[0]) > 1:
                logging.warn("NOTE: " + columnName + " is attempting to add multiple values - just take first for now!")
            newColVal = columnValue[0][0]

            # TODO: at some stage add some checking of timestamps etc

        # build the sql query
        # i.e. update original_document set original_format_version = 'dooby' where original_document_filename = 'dooby.ac.uk.shgfhf.xml'
        if postgresDataType == 'tsvector':
            sqlCmd = "update original_document set " + columnName + " = to_tsvector('english','" + newColVal + "') where original_document_filename = '" + original_document_filename + "';"
        else:
            sqlCmd = "update original_document set " + columnName + " = '" + newColVal + "' where original_document_filename = '" + original_document_filename + "';"

        # submit the sqlQuery
        try:
            # normalise any null/None markers so postgres receives real NULLs
            sqlCmd = sqlCmd.replace("'null'", "null")
            sqlCmd = sqlCmd.replace("'None'", "null")
            sqlCmd = sqlCmd.replace("None", "null")

            self.pgc.runSQLCommand(sqlCmd)

            report = True
        except Exception:
            # BUGFIX: was a bare `except:` which also swallowed SystemExit /
            # KeyboardInterrupt; narrowed and kept as best-effort logging
            logging.error("Could not submit query for: " + sqlCmd)

        return report

    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object.
        @return result: True if record updated, false if otherwise
        '''
        logging.info("Record already existing in DB - performing updates")
        result = False

        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if self._checkIsUpdatedRecord():

            # firstly, update the original record
            self._updateOriginalRecord()

            # now update the actual documents contained by the record - i.e. the original doc +
            # the various transforms required
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing spatiotemporal
            # data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")

            self._deleteSpatioTemporalData()
            self._insertSpatioTemporalData()
            result = True

            # if iso original metadata format, add in additional fields
            self._updateISOcolumns()

        logging.info("Finish processing document...")

        return result

    def _checkIsUpdatedRecord(self):
        '''
        Looks up an existing record and checks it is not identical to the new record; if it is,
        increment the harvest_count value and don't process again.
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")

        # if the stored document text differs, this query matches nothing
        sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
            str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';"
        results = self.pgc.runSQLCommand(sql)

        # NB, if the document is not identical, the sql command will not find anything
        if not results:
            logging.info("Ingested document is different to that in the current DB")
            # get the current SCN
            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
                str(self._record.db_id) + ";"
            results = self.pgc.runSQLCommand(sql)
            self._record.scn = results[0][0]
            return True

        count = results[0][0]

        # get current system change number
        scn = results[0][1]
        self._record.scn = scn
        logging.info("Ingested document is identical to document currently in DB - " + \
                     "incrementing harvest_count")
        count += 1
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \
            " WHERE original_document_id = " + str(self._record.db_id)
        self.pgc.runSQLCommand(sql)
        return False

    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is set to cascade from the linking table so only need to
        delete entries from there
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_document_id = " + \
            str(self._record.db_id) + ";"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")

    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single row record in the postgres DB based on the
        input data.  Updated to operate with dictionary methods for coords and timeRange as part of MEDIN ISO upgrade.
        @param coords: dict with 'north'/'south'/'west'/'east' keys representing NSWE coords
        @param timeRange: dict with 'start'/'end' keys representing a start/end date
        '''
        logging.info("Adding spatiotemporal row to DB")

        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \
            str(coords['north']) + "', '" + str(coords['south']) + "', '" + str(coords['west']) + "', '" + \
            str(coords['east']) + "', '" + timeRange['start'] + "', '" + timeRange['end'] + "');"

        # fix any null strings
        sqlCmd = sqlCmd.replace("'null'", "null")

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal row added successfully")

    def _insertSpatioTemporalData(self):
        '''
        Create a record in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object.

        Note updated to work with ISO metadata object where coords and times already defined as individual items.
        '''
        logging.info("Adding spatiotemporal data to DB record")

        # Work out the relationship between the spatial and temporal data and handle appropriately
        TimeRange = self._record.parseTemporalInfo(self._record.datasetTemporalData)
        SpatialCoords = self._record.parseSpatialInfo(self._record.datasetSpatialData)

        if (len(TimeRange) == 0) and (len(SpatialCoords) == 0):
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # if both arrays of data are the same length, assume they map together one to one
        if len(TimeRange) == len(SpatialCoords):
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for coords, times in zip(SpatialCoords, TimeRange):
                self._insertSpatioTemporalRow(coords, times)

        # otherwise, map everything to everything
        else:
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for times in TimeRange:
                for coords in SpatialCoords:
                    self._insertSpatioTemporalRow(coords, times)

        logging.info("Spatiotemporal data added to DB record")

    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB.
        @raise ValueError: if the DB does not return a primary key for the new row
        '''
        logging.info("Inserting new original document in Postgres DB")

        sqlCmd = "SELECT create_document('" + self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            self._record.dataset_name + "', '" + self._record.datacentre_name + "', '" + \
            self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + self._record.datasetEndNom + "');"

        # sort out any nulls..
        sqlCmd = sqlCmd.replace("'NULL'", "NULL")

        # sort out any Nones
        sqlCmd = sqlCmd.replace("'None'", "NULL")

        # renamed from `id` to avoid shadowing the builtin
        newId = self.pgc.runSQLCommand(sqlCmd)
        if len(newId) == 0:
            # BUGFIX: was py2-only `raise ValueError, '...'` statement syntax
            raise ValueError('No PK ID returned from INSERT to DB')

        self._record.db_id = newId[0][0]

        logging.info("Original document inserted in Postgres DB")

    def deleteOriginalRecord(self):
        '''
        Delete the original metadata doc from the postgres DB.
        '''
        logging.info("Deleting original document from Postgres DB")
        sqlCmd = "SELECT delete_document('" + str(self._record.db_id) + "');"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Original document deleted from Postgres DB")

    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB.
        Also increments the cached system change number (scn) on the record.
        '''
        logging.info("Updating original document in Postgres DB")
        sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \
            self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            str(self._record.scn) + "', '" + self._record.dataset_name + "', '" + self._record.datacentre_name + \
            "', '" + self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + \
            self._record.datasetEndNom + "');"

        # sort out any NULL values
        sqlCmd = sqlCmd.replace("'NULL'", "NULL")

        # sort out any Nones
        sqlCmd = sqlCmd.replace("'None'", "NULL")

        logging.info("Submitting SQL command")

        self.pgc.runSQLCommand(sqlCmd)

        # increment scn - saves doing an additional db call
        self._record.scn += 1

        logging.info("Original document updated in Postgres DB")

    def _insertMetadataRecords(self):
        '''
        Insert the transformed metadata docs into the postgres DB.
        Requires the original record to have been inserted first (db_id set).
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)

        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs(self.discovery_dir):

            # if docType is original input xml then put that in rather than some lossy transformed guff.
            if docType == self._record.docType:
                doc = self._record.originalXMLdoc

            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_document_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                docType + "', '" + doc + "', current_timestamp, 1);"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records created in DB")

    def _updateMetadataRecords(self):
        '''
        Update the transformed metadata docs in the postgres DB.
        Requires the original record to have been looked up first (db_id set).
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs(self.discovery_dir):

            # if docType is original input xml then put that in rather than some lossy transformed guff.
            if docType == self._record.docType:
                doc = self._record.originalXMLdoc

            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \
                "', update_date = current_timestamp WHERE original_document_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records updated in DB")

    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use.
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record
Note: See TracBrowser for help on using the repository browser.