source: TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 6685

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@6685
Revision 6685, 21.4 KB checked in by sdonegan, 10 years ago (diff)

roundup updates - ingestion working well

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3Class representing the data access object wrapping the Postgres DB
4C Byrom Apr 08
5'''
6import sys, os, logging, datetime
7#from SpatioTemporalData import *
8from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
9
class PostgresDAO:
    '''
    Data Access Object wrapping the Postgres discovery database.

    Stores, updates and deletes an original metadata document (plus its
    transformed versions, extra ISO columns and associated spatiotemporal
    data) via the supplied postgres client.

    NOTE(review): all SQL here is built by string concatenation; values are
    only partially sanitised via the record's escapeSpecialCharacters().
    Parameterised queries would be safer if the pgc client supports them.
    '''

    def __init__(self, record, isoDataModel, transformationDir, pgClient=None):
        '''
        Constructor - to initialise the DAL and do some initial setting up.
        @param record: the PostgresRecord object to add or update in the DB
        @param isoDataModel: ISO data model object used to map the extra ISO
        fields onto DB columns and datatypes
        @param transformationDir: directory holding the transformed documents
        for this record
        @keyword pgClient: a postgresClient object to use to connect to postgres
        - NB, if not set, will try to set one up by looking for a pg.config file
        locally
        '''
        if record == "":
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
        else:
            logging.info("INFO: Creating/updating DB entry for record, %s" % record.discovery_id)

        # setup a connection to the db - if none specified
        if pgClient is None:
            # NB: the original code passed an undefined name, 'configFile',
            # as the first argument here (a guaranteed NameError); fall back
            # to the client's default config file location instead
            self.pgc = pgc(pgc.DEFAULT_CONFIG_FILE)
        else:
            self.pgc = pgClient

        self._record = record
        self.isoDataModel = isoDataModel
        self.discovery_dir = transformationDir

    def getRecordID(self):
        '''
        Look up the record in the DB by discovery id and cache its DB ID on
        the record object.
        @return: DB id of the record if it exists, otherwise None
        '''
        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
        if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            self._record.discovery_id + "';"
        dbId = self.pgc.runSQLCommand(sql)
        if dbId:
            self._record.db_id = dbId[0][0]
        # NB: the original never returned the freshly looked-up id; return it
        # so callers can use the result directly (None if not found)
        return self._record.db_id

    def getRecordID_using_OriginalDocumentFilename(self):
        '''
        Look up the record in the DB by original document filename and cache
        its DB ID on the record object (record.db_id left untouched if the
        filename is not found).
        '''
        logging.info("Looking up original_document_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.db_id = dbId[0][0]

    def getDiscoveryID_using_OriginalDocumentFilename(self):
        '''
        Look up the record in the DB by original document filename and cache
        its discovery ID on the record object (record.discovery_id left
        untouched if the filename is not found).
        '''
        logging.info("Looking up discovery_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.discovery_id = dbId[0][0]

    def getTemporalDataId(self):
        '''
        Look up the temporal data id using the original document filename.
        NOTE(review): the original implementation built this query but never
        executed it, and the SELECT column (discovery_id) looks like a
        copy-paste from the discovery-id lookup - confirm the intended
        column before relying on this method.
        @return: raw query result from the postgres client
        '''
        logging.info("Looking up temporal_data_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        return self.pgc.runSQLCommand(sql)

    def createOrUpdateRecord(self):
        '''
        Look up the record in the DB; if found, update it, otherwise create it.
        @return: 0 if the operation failed, 1 if the record was updated,
        2 if it was created
        '''
        self.getRecordID()
        returnCode = 0  # 0 if failed, 1 if update, 2 if create

        if self._record.db_id:
            if self.updateRecord():
                returnCode = 1
        else:
            # create the record!
            if self.createRecord():
                returnCode = 2

        return returnCode

    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object.
        @return result: True if record created, false if otherwise
        '''
        logging.info("Creating a new record in the DB for the metadata document")
        # firstly store the actual documents contained by the record - i.e. the original doc +
        # the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()

        # if iso original metadata format, add in additional fields
        self._updateISOcolumns()

        logging.info("New record created")
        return True

    def _updateISOcolumns(self):
        '''
        Method to control the update of the extra ISO columns required for
        full MEDIN ingest.
        '''
        dbColumns = self.isoDataModel.mapIsoFieldsToDBcolumns()

        for isoData, columnName in dbColumns.items():

            logging.info("Attempting to update: " + isoData)

            columnValue = getattr(self.isoDataModel, isoData)

            if columnValue is not None:
                if self._updateDataBaseColumn(self._record.shortFilename, columnName, columnValue):
                    logging.info("Successfully updated " + isoData)
                else:
                    logging.warn("Could not update " + isoData)
            else:
                logging.info(isoData + " is None - skipping")

    def _updateDataBaseColumn(self, original_document_filename, columnName, columnValue):
        '''
        Update an individual column within the Discovery database, keyed on
        the unique original_document_filename.
        @param original_document_filename: unique id of the row to update
        @param columnName: DB column to update
        @param columnValue: raw multi-list of values extracted from the ISO
        model; columnValue[0] is the list of actual values
        @return: True if the update was submitted OK, False otherwise
        '''
        logging.info("Updating " + columnName + " for record: " + original_document_filename)

        # get the matching dictionary of postgres datatypes defined in the iso model
        postgresDataType = self.isoDataModel.mapDBcolumnsToIsoDataTypes()[columnName]

        # construct the actual sql command here - easier to handle than using sql function (& to adapt!)
        # remember doing it on a 1:1 basis here - convert raw multi list to
        # a "; " delimited value if text or tsvector
        if postgresDataType in ('tsvector', 'text'):

            logging.info("Data type is text or tsvector!")

            if len(columnValue[0]) == 0:
                logging.info("No data to extract - no point continuing!")
                return False

            # single value passes through unchanged; multiple values are
            # joined with "; " (same result as the original counter loop)
            newColVal = "; ".join(columnValue[0])

        else:
            logging.info("Data type not text or vector!")

            if len(columnValue[0]) == 0:
                newColVal = "null"
            elif len(columnValue[0]) > 1:
                logging.warn("NOTE: " + columnName + " is attempting to add multiple values - just take first for now!")
                newColVal = columnValue[0][0]
            else:
                newColVal = columnValue[0][0]

            # TODO: at some stage add some checking of timestamps etc

        # get rid of any special characters
        newColVal = self._record.escapeSpecialCharacters(newColVal)

        # build the sql query
        # i.e. update original_document set original_format_version = 'dooby' where original_document_filename = 'dooby.ac.uk.shgfhf.xml'
        if postgresDataType == 'tsvector':
            sqlCmd = "update original_document set " + columnName + " = to_tsvector('english','" + newColVal + "') where original_document_filename = '" + original_document_filename + "';"
        else:
            sqlCmd = "update original_document set " + columnName + " = '" + newColVal + "' where original_document_filename = '" + original_document_filename + "';"

        # deal with nones
        sqlCmd = sqlCmd.replace("'null'", "null")
        sqlCmd = sqlCmd.replace("'None'", "null")
        sqlCmd = sqlCmd.replace("None", "null")

        # submit the sqlQuery
        report = False
        try:
            self.pgc.runSQLCommand(sqlCmd)
            report = True
        except Exception:
            # was a bare except that discarded the traceback; log it so
            # failures can actually be diagnosed
            logging.exception("Could not submit query for: " + sqlCmd)

        return report

    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object.
        @return result: True if record updated, false if otherwise
        '''
        logging.info("Record already existing in DB - performing updates")
        result = False

        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if self._checkIsUpdatedRecord():

            # firstly, update the original record
            self._updateOriginalRecord()

            # now update the actual documents contained by the record - i.e. the original doc +
            # the various transforms required
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing spatiotemporal
            # data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")

            self._deleteSpatioTemporalData()
            self._insertSpatioTemporalData()
            result = True

            # if iso original metadata format, add in additional fields
            self._updateISOcolumns()

        logging.info("Finish processing document...")

        return result

    def _checkIsUpdatedRecord(self):
        '''
        Look up the existing record and check it is not identical to the new
        record; if it is identical, increment the harvest_count value and
        don't process it again.
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")

        sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
            str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';"
        results = self.pgc.runSQLCommand(sql)

        # NB, if the document is not identical, the sql command will not find anything
        if not results:
            logging.info("Ingested document is different to that in the current DB")
            # get the current SCN
            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
                str(self._record.db_id) + ";"
            results = self.pgc.runSQLCommand(sql)
            self._record.scn = results[0][0]
            return True

        count = results[0][0]

        # get current system change number
        scn = results[0][1]
        self._record.scn = scn
        logging.info("Ingested document is identical to document currently in DB - " + \
                     "incrementing harvest_count")
        count += 1
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \
            " WHERE original_document_id = " + str(self._record.db_id)
        self.pgc.runSQLCommand(sql)
        return False

    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is set to cascade from the linking table so only need to
        delete entries from there.
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_document_id = " + \
            str(self._record.db_id) + ";"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")

    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single spatiotemporal row in the postgres DB based on the
        input data. Updated to operate with dictionary methods for coords and
        timeRange as part of MEDIN ISO upgrade.
        @param coords: dict with 'north', 'south', 'west', 'east' keys
        @param timeRange: dict with 'start' and 'end' keys
        '''
        logging.info("Adding spatiotemporal row to DB")

        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \
            str(coords['north']) + "', '" + str(coords['south']) + "', '" + str(coords['west']) + "', '" + \
            str(coords['east']) + "', '" + timeRange['start'] + "', '" + timeRange['end'] + "');"

        # fix any null strings
        sqlCmd = sqlCmd.replace("'null'", "null")

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal row added successfully")

    def _insertSpatioTemporalData(self):
        '''
        Create records in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object.

        Note updated to work with ISO metadata object where coords and times
        already defined as individual items.
        '''
        logging.info("Adding spatiotemporal data to DB record")

        # Work out the relationship between the spatial and temporal data and handle appropriately
        TimeRange = self._record.parseTemporalInfo(self._record.datasetTemporalData)
        SpatialCoords = self._record.parseSpatialInfo(self._record.datasetSpatialData)

        if len(TimeRange) == 0 and len(SpatialCoords) == 0:
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # if both arrays of data are the same length, assume they map together
        if len(TimeRange) == len(SpatialCoords):
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for coords, times in zip(SpatialCoords, TimeRange):
                self._insertSpatioTemporalRow(coords, times)

        # otherwise, map everything to everything
        else:
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for times in TimeRange:
                for coords in SpatialCoords:
                    self._insertSpatioTemporalRow(coords, times)

        logging.info("Spatiotemporal data added to DB record")

    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB and cache the
        returned primary key on the record.
        @raise ValueError: if the INSERT returns no primary key
        '''
        logging.info("Inserting new original document in Postgres DB")

        sqlCmd = "SELECT create_document('" + self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            self._record.dataset_name + "', '" + self._record.datacentre_name + "', '" + \
            self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + self._record.datasetEndNom + "');"

        # sort out any nulls..
        sqlCmd = sqlCmd.replace("'NULL'", "NULL")

        # sort out any Nones
        sqlCmd = sqlCmd.replace("'None'", "NULL")

        # NB: renamed from 'id' to avoid shadowing the builtin; also use the
        # py2/py3-compatible raise form
        dbId = self.pgc.runSQLCommand(sqlCmd)
        if len(dbId) == 0:
            raise ValueError('No PK ID returned from INSERT to DB')

        self._record.db_id = dbId[0][0]

        logging.info("Original document inserted in Postgres DB")

    def deleteOriginalRecord(self):
        '''
        Delete the original metadata doc from the postgres DB.
        '''
        logging.info("Deleting original document from Postgres DB")
        sqlCmd = "SELECT delete_document('" + str(self._record.db_id) + "');"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Original document deleted from Postgres DB")

    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB.
        '''
        logging.info("Updating original document in Postgres DB")
        sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \
            self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            str(self._record.scn) + "', '" + self._record.dataset_name + "', '" + self._record.datacentre_name + \
            "', '" + self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + \
            self._record.datasetEndNom + "');"

        # sort out any NULL values
        sqlCmd = sqlCmd.replace("'NULL'", "NULL")

        # sort out any Nones
        sqlCmd = sqlCmd.replace("'None'", "NULL")

        logging.info("Submitting SQL command")

        self.pgc.runSQLCommand(sqlCmd)

        # increment scn - saves doing an additional db call
        self._record.scn += 1

        logging.info("Original document updated in Postgres DB")

    def _insertMetadataRecords(self):
        '''
        Insert the transformed metadata docs into the postgres DB.
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)

        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs(self.discovery_dir):

            # if docType is original input xml then put that in rather than some lossy transformed guff.
            if docType == self._record.docType:
                doc = self._record.originalXMLdoc

            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_document_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                docType + "', '" + doc + "', current_timestamp, 1);"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records created in DB")

    def _updateMetadataRecords(self):
        '''
        Update the transformed metadata docs in the postgres DB.
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs(self.discovery_dir):

            # if docType is original input xml then put that in rather than some lossy transformed guff.
            if docType == self._record.docType:
                doc = self._record.originalXMLdoc

            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \
                "', update_date = current_timestamp WHERE original_document_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records updated in DB")

    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use.
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record
Note: See TracBrowser for help on using the repository browser.