source: TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 7457

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@7457
Revision 7457, 24.6 KB checked in by sdonegan, 10 years ago (diff)

simple bug fixes

Line 
1#!/usr/bin/env python
2'''
3Class representing the data access object wrapping the Postgres DB
4C Byrom Apr 08
5'''
6import sys, os, logging, datetime
7#from SpatioTemporalData import *
8from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
9
10class PostgresDAO:
11   
12    def __init__(self, record, isoDataModel,transformationDir, pgClient = None):
13        '''
14        Constructor - to initialise the DAL and do some initial setting up
15        @param record: the PostgresRecord object to add or update in the DB
16        @keyword pgClient: a postgresClient object to use to connect to postgres
17        - NB, if not set, will try to set one up by looking for a pg.config file
18        locally
19        '''
20        if record == "":
21            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
22        else:
23            logging.info("INFO: Creating/updating DB entry for record, %s" %record.discovery_id)
24
25        # setup a connection to the db - if none specified
26        if pgClient is None:
27            self.pgc = pgc(configFile, pgc.DEFAULT_CONFIG_FILE)
28        else:
29            self.pgc = pgClient
30
31        self._record = record
32       
33        self.isoDataModel = isoDataModel       
34        self.discovery_dir = transformationDir
35       
36       
37   
38
39    def getRecordID(self):
40        '''
41        Looks up a record in the DB and returns its DB ID, if it exists, otherwise
42        returns '-1'
43        @return: id of record, if it exists, '-1' if it doesn't
44        '''
45        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
46        if self._record.db_id is not None and self._record.db_id > 0:
47            logging.info("Already looked up record - ID is " + str(self._record.db_id))
48            return self._record.db_id
49       
50        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
51            self._record.discovery_id + "';"
52           
53        logging.info("sql lookup cmd = " + sql)
54        dbId = self.pgc.runSQLCommand(sql)
55        if dbId:
56            self._record.db_id = dbId[0][0]
57           
58           
59    def getRecordID_using_OriginalDocumentFilename(self):
60        '''
61        Looks up a record in the DB and returns its DB ID, if it exists, otherwise
62        returns '-1'
63        @return: id of record, if it exists, '-1' if it doesn't
64        '''
65        logging.info("Looking up original_document_id for filename: " + self._record.filename + " in DB")
66       
67        '''if self._record.db_id is not None and self._record.db_id > 0:
68            logging.info("Already looked up record - ID is " + str(self._record.db_id))
69            return self._record.db_id'''
70       
71        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"
72       
73        dbId = self.pgc.runSQLCommand(sql)
74       
75        if dbId:
76            self._record.db_id = dbId[0][0]
77           
78           
79    def getDiscoveryID_using_OriginalDocumentFilename(self):
80        '''
81        Looks up a record in the DB and returns its discovery ID, if it exists, otherwise
82        returns '-1'
83        @return: id of record, if it exists, '-1' if it doesn't
84        '''
85        logging.info("Looking up discovery_of for filename: " + self._record.filename + " in DB")
86       
87        '''if self._record.db_id is not None and self._record.db_id > 0:
88            logging.info("Already looked up record - ID is " + str(self._record.db_id))
89            return self._record.db_id'''
90       
91        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"
92       
93        dbId = self.pgc.runSQLCommand(sql)
94       
95        if dbId:
96            self._record.discovery_id = dbId[0][0]
97           
98       
99    def getTemporalDataId(self):
100       
101        '''
102        Looks up the temporal data id using the original document id
103        '''
104       
105        logging.info("Looking up temporal_data_id for filename: " + self._record.filename + " in DB")
106       
107        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"
108
109       
110       
111
112    def createOrUpdateRecord(self):
113        '''
114        Looks up a record in the DB; if it finds it, update it, otherwise create it
115        @return result: True if record created/updated, false if otherwise
116        '''
117       
118        self.getRecordID()
119        returnCode=0 # 0 if failed, 1 if update, 2 if create
120         
121        if self._record.db_id:               
122            if self.updateRecord():             
123                returnCode = 1
124        else:
125                #create the record!         
126            if self.createRecord():
127                returnCode = 2
128               
129        return returnCode
130           
131
132           
133    def createRecord(self):
134        '''
135        Create a record in the postgres DB based on the specified PostgresRecord object
136        @return result: True if record created, false if otherwise
137        '''
138        logging.info("Creating a new record in the DB for the metada document")
139        # firstly store the actual documents contained by the record - i.e. the original doc +
140        # the various transforms required
141        self._insertOriginalRecord()
142        self._insertMetadataRecords()
143       
144        # Now add the spatiotemporal data
145        self._insertSpatioTemporalData()
146       
147        #if iso original metadata format, add in additional fields       
148        self._updateISOcolumns()
149       
150        logging.info("New record created")
151        return True
152       
153       
154   
155    def _updateISOcolumns(self):
156        '''
157        Method to control the update of the extra ISO columns required for full MEDIN ingest
158        '''
159       
160        dbColumns = self.isoDataModel.mapIsoFieldsToDBcolumns()
161       
162        for isoData in dbColumns.keys():
163               
164                logging.info("Attempting to update: " + isoData)
165               
166               
167               
168                                       
169                columnValue = getattr(self.isoDataModel,isoData)
170                columnName = dbColumns[isoData]
171               
172               
173               
174                if columnValue is not None:
175                        if self._updateDataBaseColumn(self._record.shortFilename, columnName, columnValue):
176                                logging.info("Successfully updated " + isoData)
177                        else:
178                                logging.warn("Could not update " + isoData)
179                else:
180                        logging.info(isoData + " is None - skipping")
181               
182               
183               
184   
185    '''
186    Method to update individual columns within the Discovery database based on unique id (original_document_filename,
187    column name and value to be inserted into that column
188    '''
189    def _updateDataBaseColumn(self,original_document_filename,columnName,columnValue):
190       
191        report = False
192       
193        logging.info("Updating " + columnName + " for record: " + original_document_filename)
194                       
195        #if columnName == 'resource_locator':
196        #               import pdb
197        #               pdb.set_trace()         
198       
199        #get the matching dictionary of postgres datatypes defined in the iso model
200        postgresDataType = self.isoDataModel.mapDBcolumnsToIsoDataTypes()[columnName]
201       
202        #construct the actual sql command here - easier to handle than using sql function (& to adapt!)
203        #remember doing it on a 1:1 basis here - convert raw multi list to space delimited value if text of tsVector
204        if (postgresDataType == 'tsvector') or (postgresDataType == 'text'):
205               
206                logging.info("Data type is text or tsvector!")
207               
208               
209                #need to work out whether value passed here to be updated is a string or list
210                if type(columnValue) is list:
211                       
212                        if len(columnValue[0]) == 0:
213                                logging.info("No data to extract - no point continuing!")
214                                return False
215                         
216                        elif len(columnValue[0]) > 1:
217                                newColVal = ""
218                                cnt = 0
219                               
220                                for val in columnValue[0]:
221                                        if cnt == 0:
222                                                newColVal = newColVal + val
223                                else:
224                                                newColVal = newColVal + "; " + val
225                               
226                                cnt = 1
227                         
228                        else:
229                                #get rid of any special characters
230                                newColVal = self._record.escapeSpecialCharacters(columnValue[0][0])
231                               
232                else:
233                        logging.info("Value type for %s has already been converted to a simple string" %columnName )
234         
235                        #remaining type to deal with is the value has already been converted to a simple string (i.e. authors, data centre name)
236                        newColVal = self._record.escapeSpecialCharacters(columnValue)
237               
238        elif postgresDataType == 'boolean':
239                logging.info("Data type is Boolean!")
240                newColVal = columnValue
241                       
242        else:
243                logging.info("Data type not text or vector!")
244               
245                if len(columnValue[0]) == 0:
246                        newColVal = "null"
247                elif len(columnValue[0]) > 1:
248                        logging.warn("NOTE: " + columnName + " is attempting to add multiple values - just take first for now!")
249                        newColVal = self._record.escapeSpecialCharacters(columnValue[0][0])
250                else:
251                        newColVal = self._record.escapeSpecialCharacters(columnValue[0][0])
252               
253                #TODO: at some stage add some checking of timestamps etc
254               
255       
256       
257        #build the sql query
258        #i.e. update original_document set original_format_version = 'dooby' where original_document_filename = 'dooby.ac.uk.shgfhf.xml'
259        if postgresDataType == 'tsvector':
260                sqlCmd = "update original_document set " + columnName + " = to_tsvector('english','" + newColVal + "') where original_document_filename = '" + original_document_filename + "';"
261       
262        elif postgresDataType == 'boolean':
263                sqlCmd = "update original_document set  %s = %s where original_document_filename = '%s';" %(columnName,newColVal,original_document_filename)
264               
265        else:
266                sqlCmd = "update original_document set " + columnName + " = '" + newColVal + "' where original_document_filename = '" + original_document_filename + "';"
267       
268        #submit the sqlQuery
269        try:
270               
271                #deal with nones
272                sqlCmd = sqlCmd.replace("'null'", "null")
273                sqlCmd = sqlCmd.replace("'None'", "null")
274                sqlCmd = sqlCmd.replace("None", "null")
275               
276                #sort out escape chars from previous catches (i.e. parameters)
277                sqlCmd = sqlCmd.replace("\\'","\'")             
278               
279                self.pgc.runSQLCommand(sqlCmd)
280               
281                report = True
282               
283        except:
284                logging.error("Could not submit query for: " + sqlCmd)
285               
286       
287        return report
288       
289       
290
291    def updateRecord(self):
292        '''
293        Update a record in the postgres DB based on the specified PostgresRecord object
294        @return result: True if record updated, false if otherwise
295        '''
296        logging.info("Record already existing in DB - performing updates")
297        result = False
298       
299        # firstly, check the document is actually new - i.e. not just a repeat of the same data
300        if self._checkIsUpdatedRecord():
301           
302            # firstly, update the original record
303            self._updateOriginalRecord()
304           
305            # now update the actual documents contained by the record - i.e. the original doc +
306            # the various transforms required
307            self._updateMetadataRecords()
308           
309            # If doing an update of an existing record, clear out existing spatiotemporal
310            # data, rather than updating it, to keep things simple
311            logging.info("Clearing out existing data for record - to allow clean update")
312            self._deleteSpatioTemporalData()
313            self._insertSpatioTemporalData()
314            result = True
315           
316            #if iso original metadata format, add in additional fields
317            self._updateISOcolumns()
318       
319        logging.info("Finish processing document...")
320       
321        return result
322       
323       
324       
325    def _checkIsUpdatedRecord(self):
326        '''
327        Looks up an existing record and checks it is not identical to the new record; if it is
328        incremement the harvest_count value and don't process again
329        @return: True if doc contains changes, False otherwise
330        '''
331        logging.info("Checking the updated document actually contains changes")
332       
333        #sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
334        #    str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';"
335       
336        #get the harvest count and scn from the original_document table...
337        sql_orig = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + str(self._record.db_id) + ";"
338       
339        results_orig = self.pgc.runSQLCommand(sql_orig)
340       
341        #we MUST make sure we compare the original format - any xquery conversions will produce differences even if like for like
342        if self._record.docType == 'DIF_9.4':
343               
344                #make sure we escape any special characters....
345                origXMLToCompare = self._record.escapeSpecialCharacters(self._record.difXML)
346        else:           
347                origXMLToCompare = self._record.escapeSpecialCharacters(self._record.isoXML)
348       
349        #get the original_document from the transformed docs table       
350        sql_transf = "SELECT transformed_document_id FROM transformed_document where original_document_id = " + str(self._record.db_id) +  \
351                " AND transformed_document = '" + origXMLToCompare + "';"
352       
353        #import pdb
354        #pdb.set_trace()
355        results_transf = self.pgc.runSQLCommand(sql_transf)       
356       
357        # NB, if the document is not identical, the sql command will not find anything
358        if not results_transf:
359            logging.info("Ingested document is different to that in the current DB")
360           
361            # get the current SCN
362            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
363                    str(self._record.db_id) + ";"
364            results = self.pgc.runSQLCommand(sql)
365            self._record.scn = results[0][0]
366            return True
367           
368        count = results_orig[0][0]
369
370        # get current system change number
371        scn = results_orig[0][1]
372        self._record.scn = scn
373        logging.info("Ingested document is identical to document currently in DB - " + \
374                     "incrementing harvest_count")
375        count += 1
376        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \
377            " WHERE original_document_id = " + str(self._record.db_id)
378        self.pgc.runSQLCommand(sql)
379        return False
380
381
382    def _deleteSpatioTemporalData(self):
383        '''
384        Delete all existing spatiotemporal data for the current record
385        - NB, the delete is set to cascade from the linking table so only need to
386        delete entries from there
387        '''
388        logging.info("Deleting existing spatial data for record")
389       
390        try:
391                sqlCmd = "DELETE FROM spatial_data WHERE original_document_id = " + str(self._record.db_id) + ";"
392               
393                self.pgc.runSQLCommand(sqlCmd)
394               
395                logging.info("Spatial data for %s deleted ok" %self._record.db_id)
396           
397        except:
398                logging.error("Could not delete spatial record %s" %self._record.db_id)
399       
400       
401        logging.info("Deleting existing temporal data for record")
402       
403        try:
404                sqlCmd = "DELETE FROM temporal_data WHERE original_document_id = " + str(self._record.db_id) + ";"
405               
406                self.pgc.runSQLCommand(sqlCmd)
407               
408                logging.info("temporal data for %s deleted ok" %self._record.db_id)
409           
410        except:
411                logging.error("Could not delete temporal record %s" %self._record.db_id)
412       
413        logging.info("Spatio - temporal data deleted successfully")
414       
415
416
417    def _insertSpatioTemporalRow(self, coords, timeRange):
418        '''
419        Create a single row record in the postgres DB based on the
420        input data.  Updated to operate with dictionary methods for coords and timeRange as part of MEDIN ISO upgrade
421        @param coords: a Coords object representing NSWE coords
422        @param timeRange: a TimeRange object representing a start/end date
423        '''
424        logging.info("Adding spatiotemporal row to DB")
425       
426       
427        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \
428            str(coords['north']) + "', '" + str(coords['south']) + "', '" + str(coords['west']) + "', '" + \
429            str(coords['east']) + "', '" + timeRange['start'] + "', '" + timeRange['end'] + "');"
430           
431        # fix any null strings
432        sqlCmd = sqlCmd.replace("'null'", "null")
433               
434        self.pgc.runSQLCommand(sqlCmd)
435        logging.info("Spatiotemporal row added successfully")
436       
437       
438   
439    def _insertSpatioTemporalData(self):
440        '''
441        Create a record in the postgres DB based on the spatiotemporal data
442        specified in the PostgresRecord object
443       
444        Note updated to work with ISO metadata object where coords and times already defined as individual items.
445       
446        '''
447        logging.info("Adding spatiotemporal data to DB record")
448       
449        # Work out the relationship between the spatial and temporal data and handle appropriately
450       
451       
452        TimeRange = self._record.parseTemporalInfo(self._record.datasetTemporalData)
453        SpatialCoords = self._record.parseSpatialInfo(self._record.datasetSpatialData)
454       
455       
456        if (len(TimeRange) == 0) and (len(SpatialCoords) == 0):
457                logging.info("No spatiotemporal data found for record - skipping")
458                return
459       
460       
461        # if both arrays of data are the same length, assume they map together       
462        i = 0
463        if len(TimeRange) == len(SpatialCoords):
464            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
465            for times in TimeRange:
466                self._insertSpatioTemporalRow(SpatialCoords[i], times)
467                i += 1
468
469        # otherwise, map everything to everything
470        else:
471            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
472            for times in TimeRange:
473                for coords in SpatialCoords:
474                   
475                    self._insertSpatioTemporalRow(coords, times)
476                   
477        logging.info("Spatiotemporal data added to DB record")
478       
479       
480   
481    def _insertOriginalRecord(self):
482        '''
483        Insert the original metadata doc into the postgres DB
484        '''
485        logging.info("Inserting new original document in Postgres DB")
486       
487       
488        ''' ndg3 style command
489        sqlCmd = "SELECT create_document('" + self._record.shortFilename + "', '" + \
490            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
491            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
492            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
493            self._record.dataset_name + "', '" + self._record.datacentre_name + "', '" + self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + self._record.datasetEndNom + "');"
494                   
495        '''
496       
497        sqlCmd = "SELECT create_document('" + self._record.shortFilename + "', '" + \
498            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
499            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
500            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
501            self._record.dataset_name + "', '" + self._record.datacentre_name + "', '" + \
502            self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + self._record.datasetEndNom + "');"
503       
504       
505        #sort out any nulls..
506        sqlCmd = sqlCmd.replace("'NULL'","NULL")
507       
508        #sort out any Nones
509        sqlCmd = sqlCmd.replace("'None'","NULL")
510       
511       
512        id = self.pgc.runSQLCommand(sqlCmd)
513        if len(id) == 0:
514            raise ValueError, 'No PK ID returned from INSERT to DB'
515       
516        self._record.db_id = id[0][0] 
517       
518        logging.info("Original document inserted in Postgres DB")
519           
520   
521    def deleteOriginalRecord(self):
522        '''
523        Delete the original metadata doc from the postgres DB
524        '''
525        logging.info("Deleting original document from Postgres DB")
526        sqlCmd = "SELECT delete_document('" + str(self._record.db_id) + "');" 
527
528        self.pgc.runSQLCommand(sqlCmd)
529        logging.info("Original document deleted from Postgres DB")
530       
531   
532    def _updateOriginalRecord(self):
533        '''
534        Update the original doc in the postgres DB
535        '''
536       
537        logging.info("Updating original document in Postgres DB")
538        sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \
539            self._record.shortFilename + "', '" + \
540            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
541            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
542            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
543            str(self._record.scn) + "', '" + self._record.dataset_name + "', '" + self._record.datacentre_name + \
544            "', '" + self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + \
545            self._record.datasetEndNom + "');" 
546           
547       
548        #sort out any NULL values"
549        sqlCmd = sqlCmd.replace("'NULL'","NULL")
550       
551        #sort out any Nones
552        sqlCmd = sqlCmd.replace("'None'","NULL")       
553       
554        logging.info("Submitting SQL command")
555        #logging.info("SQl command:   " + sqlCmd)         
556       
557        self.pgc.runSQLCommand(sqlCmd)
558       
559        # increment scn - saves doing an additional db call
560        self._record.scn += 1
561       
562        logging.info("Original document updated in Postgres DB")
563   
564           
565    def _insertMetadataRecords(self):
566        '''
567        Insert the metadata docs into the postgres DB
568        '''
569        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
570       
571       
572        if self._record.db_id is None:
573            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
574            return
575       
576        #for docType, doc in self._record.getAllDocs(self.discovery_dir):       
577        metadataDocs = self._record.getAllDocs(self.discovery_dir)       
578               
579        for docType in metadataDocs.keys():
580                               
581            logging.info("Inserting metadata of type %s" %docType)
582           
583            #fudge!
584            if docType == 'DIF_9.4':
585                doc = self._record.escapeSpecialCharacters(metadataDocs[docType])
586            else:
587                doc = metadataDocs[docType]
588                                               
589            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
590                "original_document_id, transformed_format, " \
591                "transformed_document, create_date, scn) VALUES (" \
592                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
593                docType + "', '" + doc + "', current_timestamp, 1);"
594               
595           
596            #print "\n\n\n%s\n\n\n" %sqlCmd
597       
598           
599            self.pgc.runSQLCommand(sqlCmd)
600       
601        logging.info("Transformed records created in DB")
602   
603   
604    def _updateMetadataRecords(self):
605        '''
606        Update the metadata docs into the postgres DB
607        '''
608       
609        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
610        if self._record.db_id is None:
611            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
612            return
613       
614        #for docType, doc in self._record.getAllDocs(self.discovery_dir):
615        metadataDocs = self._record.getAllDocs(self.discovery_dir)
616               
617        for docType in metadataDocs.keys():
618               
619                logging.info("Updating metadata of type %s" %docType)
620               
621                sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" +  metadataDocs[docType] + "', update_date = current_timestamp WHERE original_document_id = " + str(self._record.db_id) + " AND transformed_format = '" + docType + "';"
622               
623                self.pgc.runSQLCommand(sqlCmd)
624   
625        logging.info("Transformed records updated in DB")
626
627
628    def setRecord(self, record):
629        '''
630        Set the record to use with the DAL - to allow object re-use
631        @param record: PostgresRecord to use with the DAL
632        '''
633        self._record = record
634
Note: See TracBrowser for help on using the repository browser.