source: TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 6865

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@6865
Revision 6865, 22.5 KB checked in by sdonegan, 10 years ago

Upgrade with a set of bug fixes to deal with authors, upgraded stats, reporting and deletion of records. Also includes an update to Utilities.py adding methods to provide a string representation of the ISO data object double dictionary.

  • Property svn:executable set to *
#!/usr/bin/env python
'''
Class representing the data access object wrapping the Postgres DB
C Byrom Apr 08
'''
import sys, os, logging, datetime
#from SpatioTemporalData import *
from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc

class PostgresDAO:

    def __init__(self, record, isoDataModel, transformationDir, pgClient = None):
        '''
        Constructor - to initialise the DAL and do some initial setting up
        @param record: the PostgresRecord object to add or update in the DB
        @param isoDataModel: the ISO data model object used to populate the extra MEDIN columns
        @param transformationDir: directory holding the transformed (discovery) documents
        @keyword pgClient: a postgresClient object to use to connect to postgres
        - NB, if not set, will try to set one up by looking for a pg.config file
        locally
        '''
        if record == "":
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
        else:
            logging.info("INFO: Creating/updating DB entry for record, %s" %record.discovery_id)

        # setup a connection to the db - if none specified, fall back to the
        # client's default config file location
        if pgClient is None:
            self.pgc = pgc(configFile = pgc.DEFAULT_CONFIG_FILE)
        else:
            self.pgc = pgClient

        self._record = record
        self.isoDataModel = isoDataModel
        self.discovery_dir = transformationDir

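    # Usage sketch (illustrative only; 'rec', 'iso', 'client' and the path are
    # hypothetical stand-ins for objects built elsewhere in the OAIBatch ingest):
    #
    #     dao = PostgresDAO(rec, iso, '/data/discovery_transforms', pgClient=client)
    #     returnCode = dao.createOrUpdateRecord()  # 0 = failed, 1 = updated, 2 = created
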
    def getRecordID(self):
        '''
        Looks up a record in the DB by discovery_id and stores its DB ID, if
        found, on self._record.db_id
        @return: cached id of record, if already looked up, None otherwise
        '''
        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
        if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            self._record.discovery_id + "';"

        logging.info("sql lookup cmd = " + sql)
        dbId = self.pgc.runSQLCommand(sql)
        if dbId:
            self._record.db_id = dbId[0][0]

    def getRecordID_using_OriginalDocumentFilename(self):
        '''
        Looks up a record in the DB by original document filename and stores its
        DB ID, if found, on self._record.db_id
        '''
        logging.info("Looking up original_document_id for filename: " + self._record.filename + " in DB")

        '''if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id'''

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.db_id = dbId[0][0]

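    # NB, the lookup methods here assume runSQLCommand returns rows as a list of
    # sequences, so a single-column result is unpacked as rows[0][0]:
    #
    #     rows = self.pgc.runSQLCommand(sql)
    #     if rows:                # empty result => no matching record
    #         value = rows[0][0]
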
    def getDiscoveryID_using_OriginalDocumentFilename(self):
        '''
        Looks up a record in the DB by original document filename and stores its
        discovery ID, if found, on self._record.discovery_id
        '''
        logging.info("Looking up discovery_id for filename: " + self._record.filename + " in DB")

        '''if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id'''

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.discovery_id = dbId[0][0]

    def getTemporalDataId(self):
        '''
        Looks up the temporal data id using the original document id
        - NB, currently an unfinished stub: the query below selects discovery_id
        rather than a temporal data id, and is never executed
        '''
        logging.info("Looking up temporal_data_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

    def createOrUpdateRecord(self):
        '''
        Looks up a record in the DB; if it finds it, update it, otherwise create it
        @return returnCode: 0 if the operation failed, 1 if the record was
        updated, 2 if it was created
        '''
        self.getRecordID()
        returnCode = 0 # 0 if failed, 1 if update, 2 if create

        if self._record.db_id:
            if self.updateRecord():
                returnCode = 1
        else:
            # create the record!
            if self.createRecord():
                returnCode = 2

        return returnCode

    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record created, false if otherwise
        '''
        logging.info("Creating a new record in the DB for the metadata document")
        # firstly store the actual documents contained by the record - i.e. the original doc +
        # the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()

        # if iso original metadata format, add in additional fields
        self._updateISOcolumns()

        logging.info("New record created")
        return True

    def _updateISOcolumns(self):
        '''
        Method to control the update of the extra ISO columns required for full MEDIN ingest
        '''
        dbColumns = self.isoDataModel.mapIsoFieldsToDBcolumns()

        for isoData in dbColumns.keys():

            logging.info("Attempting to update: " + isoData)

            columnValue = getattr(self.isoDataModel, isoData)
            columnName = dbColumns[isoData]

            if columnValue is not None:
                if self._updateDataBaseColumn(self._record.shortFilename, columnName, columnValue):
                    logging.info("Successfully updated " + isoData)
                else:
                    logging.warn("Could not update " + isoData)
            else:
                logging.info(isoData + " is None - skipping")

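    # Illustrative sketch (assumed shape, inferred from the loop above):
    # mapIsoFieldsToDBcolumns() appears to return a dict keyed on ISO model
    # attribute names, mapping to original_document column names, e.g.
    # hypothetically:
    #
    #     {'datasetAbstract': 'dataset_abstract', 'resourceType': 'resource_type'}
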
    def _updateDataBaseColumn(self, original_document_filename, columnName, columnValue):
        '''
        Method to update individual columns within the Discovery database based on unique id (original_document_filename),
        column name and value to be inserted into that column
        '''
        report = False

        logging.info("Updating " + columnName + " for record: " + original_document_filename)

        # get the matching dictionary of postgres datatypes defined in the iso model
        postgresDataType = self.isoDataModel.mapDBcolumnsToIsoDataTypes()[columnName]

        # construct the actual sql command here - easier to handle than using sql function (& to adapt!)
        # remember doing it on a 1:1 basis here - convert raw multi list to delimited value if text or tsvector
        if (postgresDataType == 'tsvector') or (postgresDataType == 'text'):

            logging.info("Data type is text or tsvector!")

            # need to work out whether value passed here to be updated is a string or list
            if type(columnValue) is list:

                if len(columnValue[0]) == 0:
                    logging.info("No data to extract - no point continuing!")
                    return False

                elif len(columnValue[0]) > 1:
                    # join the multiple values into a single "; "-delimited string
                    newColVal = ""
                    cnt = 0

                    for val in columnValue[0]:
                        if cnt == 0:
                            newColVal = newColVal + val
                        else:
                            newColVal = newColVal + "; " + val
                        cnt = 1

                else:
                    newColVal = columnValue[0][0]

            else:
                logging.info("Value type for %s has already been converted to a simple string" %columnName)
                # remaining type to deal with is where the value has already been converted to a simple string (i.e. authors, data centre name)
                newColVal = columnValue

        else:
            logging.info("Data type not text or tsvector!")

            if len(columnValue[0]) == 0:
                newColVal = "null"
            elif len(columnValue[0]) > 1:
                logging.warn("NOTE: " + columnName + " is attempting to add multiple values - just take first for now!")
                newColVal = columnValue[0][0]
            else:
                newColVal = columnValue[0][0]

            # TODO: at some stage add some checking of timestamps etc

        # get rid of any special characters
        newColVal = self._record.escapeSpecialCharacters(newColVal)

        # build the sql query
        # i.e. update original_document set original_format_version = 'dooby' where original_document_filename = 'dooby.ac.uk.shgfhf.xml'
        if postgresDataType == 'tsvector':
            sqlCmd = "update original_document set " + columnName + " = to_tsvector('english','" + newColVal + "') where original_document_filename = '" + original_document_filename + "';"
        else:
            sqlCmd = "update original_document set " + columnName + " = '" + newColVal + "' where original_document_filename = '" + original_document_filename + "';"

        # submit the sqlQuery
        try:
            # deal with nones
            sqlCmd = sqlCmd.replace("'null'", "null")
            sqlCmd = sqlCmd.replace("'None'", "null")
            sqlCmd = sqlCmd.replace("None", "null")

            self.pgc.runSQLCommand(sqlCmd)

            report = True

        except Exception:
            logging.error("Could not submit query for: " + sqlCmd)

        return report

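    # Example of the SQL built by _updateDataBaseColumn (illustrative column and
    # value; the filename reuses the example from the comment above):
    #
    #     update original_document set dataset_abstract =
    #         to_tsvector('english','some abstract text')
    #         where original_document_filename = 'dooby.ac.uk.shgfhf.xml';
    #
    # with 'null'/'None' placeholders rewritten to bare SQL null before submission.
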
    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record updated, false if otherwise
        '''
        logging.info("Record already existing in DB - performing updates")
        result = False

        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if self._checkIsUpdatedRecord():

            # firstly, update the original record
            self._updateOriginalRecord()

            # now update the actual documents contained by the record - i.e. the original doc +
            # the various transforms required
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing spatiotemporal
            # data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")
            self._deleteSpatioTemporalData()

            self._insertSpatioTemporalData()
            result = True

            # if iso original metadata format, add in additional fields
            self._updateISOcolumns()

        logging.info("Finish processing document...")

        return result

    def _checkIsUpdatedRecord(self):
        '''
        Looks up an existing record and checks it is not identical to the new record; if it is,
        increment the harvest_count value and don't process again
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")

        #sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
        #    str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';"

        # get the harvest count and scn from the original_document table...
        sql_orig = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + str(self._record.db_id) + ";"

        results_orig = self.pgc.runSQLCommand(sql_orig)

        # get the original document from the transformed docs table
        sql_transf = "SELECT transformed_document_id FROM transformed_document where original_document_id = " + str(self._record.db_id) + \
                " AND transformed_document = '" + self._record.originalFormat + "';"

        results_transf = self.pgc.runSQLCommand(sql_transf)

        # NB, if the document is not identical, the sql command will not find anything
        if not results_transf:
            logging.info("Ingested document is different to that in the current DB")

            # get the current SCN
            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
                    str(self._record.db_id) + ";"
            results = self.pgc.runSQLCommand(sql)
            self._record.scn = results[0][0]
            return True

        count = results_orig[0][0]

        # get current system change number
        scn = results_orig[0][1]
        self._record.scn = scn
        logging.info("Ingested document is identical to document currently in DB - " + \
                     "incrementing harvest_count")
        count += 1
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \
            " WHERE original_document_id = " + str(self._record.db_id)
        self.pgc.runSQLCommand(sql)
        return False

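    # NB, the net effect of _checkIsUpdatedRecord: an unchanged re-harvest just
    # increments harvest_count and returns False (no reprocessing); a genuinely
    # changed document caches the current scn on the record and returns True so
    # the update path can bump it.
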
    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is set to cascade from the linking table so only need to
        delete entries from there
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_document_id = " + \
            str(self._record.db_id) + ";"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")

    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single row record in the postgres DB based on the
        input data.  Updated to operate with dictionary methods for coords and timeRange as part of MEDIN ISO upgrade
        @param coords: a Coords object representing NSWE coords
        @param timeRange: a TimeRange object representing a start/end date
        '''
        logging.info("Adding spatiotemporal row to DB")

        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \
            str(coords['north']) + "', '" + str(coords['south']) + "', '" + str(coords['west']) + "', '" + \
            str(coords['east']) + "', '" + timeRange['start'] + "', '" + timeRange['end'] + "');"

        # fix any null strings
        sqlCmd = sqlCmd.replace("'null'", "null")

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal row added successfully")

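    # Example of the stored-procedure call built above (hypothetical values;
    # 'null' strings are converted to bare SQL nulls before submission):
    #
    #     SELECT add_spatiotemporal_row('123', '60.0', '50.0', '-10.0', '5.0',
    #                                   '1990-01-01', '2000-12-31');
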
    def _insertSpatioTemporalData(self):
        '''
        Create a record in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object

        Note updated to work with ISO metadata object where coords and times already defined as individual items.
        '''
        logging.info("Adding spatiotemporal data to DB record")

        # Work out the relationship between the spatial and temporal data and handle appropriately

        TimeRange = self._record.parseTemporalInfo(self._record.datasetTemporalData)
        SpatialCoords = self._record.parseSpatialInfo(self._record.datasetSpatialData)

        if (len(TimeRange) == 0) and (len(SpatialCoords) == 0):
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # if both arrays of data are the same length, assume they map together
        i = 0
        if len(TimeRange) == len(SpatialCoords):
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for times in TimeRange:
                self._insertSpatioTemporalRow(SpatialCoords[i], times)
                i += 1

        # otherwise, map everything to everything
        else:
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for times in TimeRange:
                for coords in SpatialCoords:
                    self._insertSpatioTemporalRow(coords, times)

        logging.info("Spatiotemporal data added to DB record")

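    # Pairing behaviour illustrated: with 2 time ranges and 2 coordinate sets the
    # lists pair index-by-index (2 rows inserted); with 2 time ranges and 3
    # coordinate sets the full cross product is inserted (6 rows).
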
    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB
        '''
        logging.info("Inserting new original document in Postgres DB")

        ''' ndg3 style command
        sqlCmd = "SELECT create_document('" + self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            self._record.dataset_name + "', '" + self._record.datacentre_name + "', '" + self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + self._record.datasetEndNom + "');"
        '''

        sqlCmd = "SELECT create_document('" + self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            self._record.dataset_name + "', '" + self._record.datacentre_name + "', '" + \
            self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + self._record.datasetEndNom + "');"

        # sort out any nulls..
        sqlCmd = sqlCmd.replace("'NULL'", "NULL")

        # sort out any Nones
        sqlCmd = sqlCmd.replace("'None'", "NULL")

        id = self.pgc.runSQLCommand(sqlCmd)
        if len(id) == 0:
            raise ValueError('No PK ID returned from INSERT to DB')

        self._record.db_id = id[0][0]

        logging.info("Original document inserted in Postgres DB")

    def deleteOriginalRecord(self):
        '''
        Delete the original metadata doc from the postgres DB
        '''
        logging.info("Deleting original document from Postgres DB")
        sqlCmd = "SELECT delete_document('" + str(self._record.db_id) + "');"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Original document deleted from Postgres DB")

    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB
        '''
        logging.info("Updating original document in Postgres DB")
        sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \
            self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            str(self._record.scn) + "', '" + self._record.dataset_name + "', '" + self._record.datacentre_name + \
            "', '" + self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + \
            self._record.datasetEndNom + "');"

        # sort out any NULL values
        sqlCmd = sqlCmd.replace("'NULL'", "NULL")

        # sort out any Nones
        sqlCmd = sqlCmd.replace("'None'", "NULL")

        logging.info("Submitting SQL command")
        #logging.info("SQl command:   " + sqlCmd)

        self.pgc.runSQLCommand(sqlCmd)

        # increment scn - saves doing an additional db call
        self._record.scn += 1

        logging.info("Original document updated in Postgres DB")

    def _insertMetadataRecords(self):
        '''
        Insert the metadata docs into the postgres DB
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)

        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs(self.discovery_dir):

            # if docType is original input xml then put that in rather than some lossy transformed guff.
            if docType == self._record.docType:
                doc = self._record.originalXMLdoc

            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_document_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                docType + "', '" + doc + "', current_timestamp, 1);"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records created in DB")

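    # NB, this assumes PostgresRecord.getAllDocs yields (docType, document) string
    # pairs for each transform under discovery_dir, e.g. hypothetically:
    #
    #     for docType, doc in self._record.getAllDocs(self.discovery_dir):
    #         # ('DIF', '<dif>...</dif>'), ('MEDIN', '<MD_Metadata>...'), ...
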
    def _updateMetadataRecords(self):
        '''
        Update the metadata docs in the postgres DB
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs(self.discovery_dir):

            # if docType is original input xml then put that in rather than some lossy transformed guff.
            if docType == self._record.docType:
                doc = self._record.originalXMLdoc

            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \
                "', update_date = current_timestamp WHERE original_document_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records updated in DB")

    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record