source: TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 6904

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@6904
Revision 6904, 22.5 KB checked in by sdonegan, 10 years ago (diff)

further debug updates to check dates

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3Class representing the data access object wrapping the Postgres DB
4C Byrom Apr 08
5'''
6import sys, os, logging, datetime
7#from SpatioTemporalData import *
8from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
9
class PostgresDAO:
    '''
    Data access object wrapping the Postgres discovery DB: creates, updates and
    deletes ORIGINAL_DOCUMENT / TRANSFORMED_DOCUMENT / SPATIAL_TEMPORAL_DATA
    rows for a single PostgresRecord.

    NOTE(review): SQL statements throughout this class are built by string
    concatenation from harvested metadata values; the only sanitisation is
    self._record.escapeSpecialCharacters() in _updateDataBaseColumn.  This is
    an injection risk if any upstream value is untrusted - flagged, not changed,
    because the PostgresClient API visible here only accepts a raw SQL string.
    '''

    def __init__(self, record, isoDataModel, transformationDir, pgClient=None):
        '''
        Constructor - to initialise the DAL and do some initial setting up
        @param record: the PostgresRecord object to add or update in the DB
        @param isoDataModel: ISO data model object - provides the mapping of
        ISO metadata fields onto discovery DB column names and datatypes
        @param transformationDir: directory holding the transformed discovery
        documents (passed through to record.getAllDocs())
        @keyword pgClient: a postgresClient object to use to connect to postgres
        - NB, if not set, will try to set one up by looking for a pg.config file
        locally
        '''
        if record == "":
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
        else:
            logging.info("INFO: Creating/updating DB entry for record, %s" % record.discovery_id)

        # setup a connection to the db - if none specified
        if pgClient is None:
            # BUGFIX: the original passed an undefined name 'configFile' as the
            # first argument, raising NameError whenever no client was supplied;
            # fall back to the client's default config file instead.
            self.pgc = pgc(pgc.DEFAULT_CONFIG_FILE)
        else:
            self.pgc = pgClient

        self._record = record
        self.isoDataModel = isoDataModel
        self.discovery_dir = transformationDir

    def getRecordID(self):
        '''
        Look up the current record in the DB and cache its DB ID on the record.
        NB, contrary to the original docstring this never returns '-1': it
        returns the cached ID if already known, otherwise it only sets
        self._record.db_id (left untouched when no row matches) and returns None.
        '''
        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
        if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            self._record.discovery_id + "';"

        logging.info("sql lookup cmd = " + sql)
        dbId = self.pgc.runSQLCommand(sql)
        if dbId:
            self._record.db_id = dbId[0][0]

    def getRecordID_using_OriginalDocumentFilename(self):
        '''
        Look up the record's original_document_id by its original document
        filename and cache it on self._record.db_id (no-op if no row matches).
        '''
        logging.info("Looking up original_document_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.db_id = dbId[0][0]

    def getDiscoveryID_using_OriginalDocumentFilename(self):
        '''
        Look up the record's discovery_id by its original document filename and
        cache it on self._record.discovery_id (no-op if no row matches).
        '''
        # BUGFIX: log message said "discovery_of"; corrected to "discovery_id"
        logging.info("Looking up discovery_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.discovery_id = dbId[0][0]

    def getTemporalDataId(self):
        '''
        Looks up the temporal data id using the original document id.
        NOTE(review): this method is unfinished in the original - it builds the
        query but never executes it, so calling it has no effect (returns None).
        Preserved as-is; TODO: execute the query and store/return the result.
        '''
        logging.info("Looking up temporal_data_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

    def createOrUpdateRecord(self):
        '''
        Looks up a record in the DB; if it finds it, update it, otherwise create it
        @return returnCode: 0 if failed, 1 if updated, 2 if created
        '''
        self.getRecordID()
        returnCode = 0  # 0 if failed, 1 if update, 2 if create

        if self._record.db_id:
            if self.updateRecord():
                returnCode = 1
        else:
            # create the record!
            if self.createRecord():
                returnCode = 2

        return returnCode

    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record created, false if otherwise
        '''
        # BUGFIX: log typo "metada" -> "metadata"
        logging.info("Creating a new record in the DB for the metadata document")
        # firstly store the actual documents contained by the record - i.e. the original doc +
        # the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()

        # if iso original metadata format, add in additional fields
        self._updateISOcolumns()

        logging.info("New record created")
        return True

    def _updateISOcolumns(self):
        '''
        Method to control the update of the extra ISO columns required for full
        MEDIN ingest: for each ISO field -> DB column mapping, push the value
        held on the isoDataModel into the matching original_document column.
        '''
        dbColumns = self.isoDataModel.mapIsoFieldsToDBcolumns()

        for isoData, columnName in dbColumns.items():
            logging.info("Attempting to update: " + isoData)

            columnValue = getattr(self.isoDataModel, isoData)

            if columnValue is None:
                logging.info(isoData + " is None - skipping")
            elif self._updateDataBaseColumn(self._record.shortFilename, columnName, columnValue):
                logging.info("Successfully updated " + isoData)
            else:
                logging.warn("Could not update " + isoData)

    def _updateDataBaseColumn(self, original_document_filename, columnName, columnValue):
        '''
        Update an individual column within the discovery database, keyed on the
        unique original_document_filename.
        @param original_document_filename: key identifying the row to update
        @param columnName: name of the original_document column to set
        @param columnValue: value to store - either a raw multi-value list of
        lists (as produced by the ISO model) or an already-flattened string
        @return: True if the update was submitted, False otherwise
        '''
        report = False

        logging.info("Updating " + columnName + " for record: " + original_document_filename)

        # get the matching dictionary of postgres datatypes defined in the iso model
        postgresDataType = self.isoDataModel.mapDBcolumnsToIsoDataTypes()[columnName]

        # construct the actual sql command here - easier to handle than using sql function (& to adapt!)
        # remember doing it on a 1:1 basis here - convert raw multi list to delimited value if text or tsvector
        if postgresDataType in ('tsvector', 'text'):

            logging.info("Data type is text or tsvector!")

            # need to work out whether value passed here to be updated is a string or list
            if type(columnValue) is list:

                if len(columnValue[0]) == 0:
                    logging.info("No data to extract - no point continuing!")
                    return False

                elif len(columnValue[0]) > 1:
                    # BUGFIX: the original loop's else clause was attached to the
                    # for statement and 'cnt' was never incremented inside the
                    # loop, so multiple values were never "; "-joined as intended
                    newColVal = "; ".join(columnValue[0])

                else:
                    newColVal = columnValue[0][0]

            else:
                logging.info("Value type for %s has already been converted to a simple string" % columnName)
                # remaining type to deal with is the value has already been converted
                # to a simple string (i.e. authors, data centre name)
                newColVal = columnValue

        else:
            logging.info("Data type not text or vector!")

            if len(columnValue[0]) == 0:
                newColVal = "null"
            elif len(columnValue[0]) > 1:
                logging.warn("NOTE: " + columnName + " is attempting to add multiple values - just take first for now!")
                newColVal = columnValue[0][0]
            else:
                newColVal = columnValue[0][0]

            # TODO: at some stage add some checking of timestamps etc

        # get rid of any special characters
        newColVal = self._record.escapeSpecialCharacters(newColVal)

        # build the sql query
        # i.e. update original_document set original_format_version = 'dooby' where original_document_filename = 'dooby.ac.uk.shgfhf.xml'
        if postgresDataType == 'tsvector':
            sqlCmd = "update original_document set " + columnName + " = to_tsvector('english','" + newColVal + "') where original_document_filename = '" + original_document_filename + "';"
        else:
            sqlCmd = "update original_document set " + columnName + " = '" + newColVal + "' where original_document_filename = '" + original_document_filename + "';"

        # submit the sqlQuery
        try:
            # deal with nones
            sqlCmd = sqlCmd.replace("'null'", "null")
            sqlCmd = sqlCmd.replace("'None'", "null")
            sqlCmd = sqlCmd.replace("None", "null")

            self.pgc.runSQLCommand(sqlCmd)

            report = True

        # narrowed from a bare except: so SystemExit/KeyboardInterrupt still propagate
        except Exception:
            logging.error("Could not submit query for: " + sqlCmd)

        return report

    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record updated, false if otherwise
        '''
        logging.info("Record already existing in DB - performing updates")
        result = False

        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if self._checkIsUpdatedRecord():

            # firstly, update the original record
            self._updateOriginalRecord()

            # now update the actual documents contained by the record - i.e. the original doc +
            # the various transforms required
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing spatiotemporal
            # data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")
            # BUGFIX: the original logged this intent but never invoked the delete,
            # leaving _deleteSpatioTemporalData dead code and duplicating
            # spatiotemporal rows on every update
            self._deleteSpatioTemporalData()

            self._insertSpatioTemporalData()
            result = True

            # if iso original metadata format, add in additional fields
            self._updateISOcolumns()

        logging.info("Finish processing document...")

        return result

    def _checkIsUpdatedRecord(self):
        '''
        Looks up an existing record and checks it is not identical to the new record; if it is,
        increment the harvest_count value and don't process again
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")

        # get the harvest count and scn from the original_document table...
        sql_orig = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + str(self._record.db_id) + ";"

        results_orig = self.pgc.runSQLCommand(sql_orig)

        # get the original_document from the transformed docs table
        sql_transf = "SELECT transformed_document_id FROM transformed_document where original_document_id = " + str(self._record.db_id) + \
                " AND transformed_document = '" + self._record.originalFormat + "';"

        results_transf = self.pgc.runSQLCommand(sql_transf)

        # NB, if the document is not identical, the sql command will not find anything
        if not results_transf:
            logging.info("Ingested document is different to that in the current DB")

            # get the current SCN
            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
                    str(self._record.db_id) + ";"
            results = self.pgc.runSQLCommand(sql)
            self._record.scn = results[0][0]
            return True

        count = results_orig[0][0]

        # get current system change number
        scn = results_orig[0][1]
        self._record.scn = scn
        logging.info("Ingested document is identical to document currently in DB - " + \
                     "incrementing harvest_count")
        count += 1
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \
            " WHERE original_document_id = " + str(self._record.db_id)
        self.pgc.runSQLCommand(sql)
        return False

    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is set to cascade from the linking table so only need to
        delete entries from there
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_document_id = " + \
            str(self._record.db_id) + ";"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")

    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single row record in the postgres DB based on the
        input data.  Updated to operate with dictionary methods for coords and
        timeRange as part of MEDIN ISO upgrade.
        @param coords: dict with 'north'/'south'/'west'/'east' keys
        @param timeRange: dict with 'start'/'end' keys
        '''
        logging.info("Adding spatiotemporal row to DB")

        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \
            str(coords['north']) + "', '" + str(coords['south']) + "', '" + str(coords['west']) + "', '" + \
            str(coords['east']) + "', '" + timeRange['start'] + "', '" + timeRange['end'] + "');"

        # fix any null strings
        sqlCmd = sqlCmd.replace("'null'", "null")

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal row added successfully")

    def _insertSpatioTemporalData(self):
        '''
        Create a record in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object.

        Note updated to work with ISO metadata object where coords and times
        already defined as individual items.
        '''
        logging.info("Adding spatiotemporal data to DB record")

        # Work out the relationship between the spatial and temporal data and handle appropriately
        TimeRange = self._record.parseTemporalInfo(self._record.datasetTemporalData)
        SpatialCoords = self._record.parseSpatialInfo(self._record.datasetSpatialData)

        if (len(TimeRange) == 0) and (len(SpatialCoords) == 0):
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # if both arrays of data are the same length, assume they map together one to one
        if len(TimeRange) == len(SpatialCoords):
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for coords, times in zip(SpatialCoords, TimeRange):
                self._insertSpatioTemporalRow(coords, times)

        # otherwise, map everything to everything
        else:
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for times in TimeRange:
                for coords in SpatialCoords:
                    self._insertSpatioTemporalRow(coords, times)

        logging.info("Spatiotemporal data added to DB record")

    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB via the
        create_document stored procedure; stores the returned PK on the record.
        @raise ValueError: if the INSERT returns no primary key
        '''
        logging.info("Inserting new original document in Postgres DB")

        sqlCmd = "SELECT create_document('" + self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            self._record.dataset_name + "', '" + self._record.datacentre_name + "', '" + \
            self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + self._record.datasetEndNom + "');"

        # sort out any nulls / Nones so they are stored as SQL NULL
        sqlCmd = sqlCmd.replace("'NULL'", "NULL")
        sqlCmd = sqlCmd.replace("'None'", "NULL")

        # renamed from 'id' to avoid shadowing the builtin
        newId = self.pgc.runSQLCommand(sqlCmd)
        if len(newId) == 0:
            # BUGFIX: converted from the Python-2-only "raise E, msg" form
            raise ValueError('No PK ID returned from INSERT to DB')

        self._record.db_id = newId[0][0]

        logging.info("Original document inserted in Postgres DB")

    def deleteOriginalRecord(self):
        '''
        Delete the original metadata doc from the postgres DB
        '''
        logging.info("Deleting original document from Postgres DB")
        sqlCmd = "SELECT delete_document('" + str(self._record.db_id) + "');"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Original document deleted from Postgres DB")

    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB via the update_document
        stored procedure; increments the record's SCN on success.
        '''
        logging.info("Updating original document in Postgres DB")
        sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \
            self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            str(self._record.scn) + "', '" + self._record.dataset_name + "', '" + self._record.datacentre_name + \
            "', '" + self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + \
            self._record.datasetEndNom + "');"

        # sort out any NULL values / Nones so they are stored as SQL NULL
        sqlCmd = sqlCmd.replace("'NULL'", "NULL")
        sqlCmd = sqlCmd.replace("'None'", "NULL")

        logging.info("Submitting SQL command")

        self.pgc.runSQLCommand(sqlCmd)

        # increment scn - saves doing an additional db call
        self._record.scn += 1

        logging.info("Original document updated in Postgres DB")

    def _insertMetadataRecords(self):
        '''
        Insert the transformed metadata docs into the postgres DB
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)

        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs(self.discovery_dir):

            # if docType is original input xml then put that in rather than some lossy transformed guff.
            if docType == self._record.docType:
                doc = self._record.originalXMLdoc

            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_document_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                docType + "', '" + doc + "', current_timestamp, 1);"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records created in DB")

    def _updateMetadataRecords(self):
        '''
        Update the transformed metadata docs in the postgres DB
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs(self.discovery_dir):

            # if docType is original input xml then put that in rather than some lossy transformed guff.
            if docType == self._record.docType:
                doc = self._record.originalXMLdoc

            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \
                "', update_date = current_timestamp WHERE original_document_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records updated in DB")

    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record
Note: See TracBrowser for help on using the repository browser.