source: TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 6660

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@6660
Revision 6660, 21.3 KB checked in by sdonegan, 11 years ago (diff)

Update includes latest changes to allow full MEDIN xml and DIF xml ingest - now updates stubISO as proper MEDIN format in transformed docs table

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3Class representing the data access object wrapping the Postgres DB
4C Byrom Apr 08
5'''
6import sys, os, logging, datetime
7#from SpatioTemporalData import *
8from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
9
class PostgresDAO:
    '''
    Data access object wrapping the Postgres discovery database.

    Wraps a single PostgresRecord and provides create/update/delete
    operations for the original document row, its transformed documents
    and the associated spatiotemporal rows, plus the extra ISO columns
    required for full MEDIN ingest.

    NOTE(review): all SQL in this class is built by string concatenation;
    values are (at best) escaped by the record object itself.  Moving to
    parameterized queries via the postgres client would remove the
    SQL-injection risk - TODO.

    C Byrom Apr 08
    '''

    def __init__(self, record, isoDataModel, transformationDir, pgClient=None):
        '''
        Constructor - to initialise the DAL and do some initial setting up
        @param record: the PostgresRecord object to add or update in the DB
        @param isoDataModel: ISO data model object mapping ISO fields to DB columns
        @param transformationDir: directory holding the transformed documents
        @keyword pgClient: a postgresClient object to use to connect to postgres
        - NB, if not set, will try to set one up by looking for a pg.config file
        locally
        '''
        if record == "":
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
        else:
            logging.info("INFO: Creating/updating DB entry for record, %s" % record.discovery_id)

        # setup a connection to the db - if none specified
        if pgClient is None:
            # BUGFIX: the original referenced an undefined name, 'configFile',
            # which raised a NameError whenever no client was supplied; fall
            # back to the client's default config file instead.
            # (assumes PostgresClient accepts a 'configFile' keyword - TODO confirm)
            self.pgc = pgc(configFile=pgc.DEFAULT_CONFIG_FILE)
        else:
            self.pgc = pgClient

        self._record = record
        self.isoDataModel = isoDataModel
        self.discovery_dir = transformationDir

    def getRecordID(self):
        '''
        Look up a record in the DB by discovery id and return its DB ID.
        The id is cached on the record object to avoid repeated lookups.
        @return: id of record, if it exists, -1 if it doesn't
        '''
        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
        if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            self._record.discovery_id + "';"
        dbId = self.pgc.runSQLCommand(sql)
        if dbId:
            self._record.db_id = dbId[0][0]
            # BUGFIX: the original never returned the freshly looked-up id,
            # contradicting its own docstring
            return self._record.db_id
        return -1

    def getRecordID_using_OriginalDocumentFilename(self):
        '''
        Look up a record in the DB by its original document filename and
        return its DB ID; the id is cached on the record object.
        @return: id of record, if it exists, -1 if it doesn't
        '''
        logging.info("Looking up original_document_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"
        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.db_id = dbId[0][0]
            return self._record.db_id
        return -1

    def getDiscoveryID_using_OriginalDocumentFilename(self):
        '''
        Look up a record in the DB by its original document filename and
        return its discovery ID; the id is cached on the record object.
        @return: discovery id of record, if it exists, -1 if it doesn't
        '''
        # BUGFIX: log message previously said "discovery_of"
        logging.info("Looking up discovery_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"
        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.discovery_id = dbId[0][0]
            return self._record.discovery_id
        return -1

    def getTemporalDataId(self):
        '''
        Look up the temporal data id using the original document id.

        NOTE(review): this method was never completed in the original - the
        query below selects discovery_id (not a temporal data id) and is
        never executed, so the method is a no-op.  Behaviour preserved;
        complete or remove it.
        '''
        logging.info("Looking up temporal_data_id for filename: " + self._record.filename + " in DB")

        # TODO: incomplete - built but never run, and selects the wrong column
        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

    def createOrUpdateRecord(self):
        '''
        Look up a record in the DB; if found, update it, otherwise create it.
        @return returnCode: 0 if failed, 1 if updated, 2 if created
        '''
        self.getRecordID()
        returnCode = 0  # 0 if failed, 1 if update, 2 if create

        if self._record.db_id:
            if self.updateRecord():
                returnCode = 1
        else:
            # create the record!
            if self.createRecord():
                returnCode = 2

        return returnCode

    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record created, false if otherwise
        '''
        logging.info("Creating a new record in the DB for the metadata document")
        # firstly store the actual documents contained by the record - i.e. the original doc +
        # the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()

        # if iso original metadata format, add in additional fields
        self._updateISOcolumns()

        logging.info("New record created")
        return True

    def _updateISOcolumns(self):
        '''
        Control the update of the extra ISO columns required for full MEDIN
        ingest.  ISO fields whose extracted value is None are skipped.
        '''
        dbColumns = self.isoDataModel.mapIsoFieldsToDBcolumns()

        for isoData, columnName in dbColumns.items():
            logging.info("Attempting to update: " + isoData)

            columnValue = getattr(self.isoDataModel, isoData)

            if columnValue is not None:
                if self._updateDataBaseColumn(self._record.shortFilename, columnName, columnValue):
                    logging.info("Successfully updated " + isoData)
                else:
                    logging.warn("Could not update " + isoData)
            else:
                logging.info(isoData + " is None - skipping")

    def _updateDataBaseColumn(self, original_document_filename, columnName, columnValue):
        '''
        Update an individual column within the Discovery database, keyed on
        the unique original_document_filename.
        @param original_document_filename: unique filename identifying the row
        @param columnName: name of the DB column to update
        @param columnValue: nested list of extracted values (columnValue[0]
        holds the actual value list)
        @return: True if the update was submitted successfully, False otherwise
        '''
        report = False

        logging.info("Updating " + columnName + " for record: " + original_document_filename)

        # get the matching postgres datatype defined in the iso model
        postgresDataType = self.isoDataModel.mapDBcolumnsToIsoDataTypes()[columnName]

        # doing it on a 1:1 basis here - convert raw multi list to a
        # semicolon-delimited value if text or tsvector
        if postgresDataType in ('tsvector', 'text'):
            logging.info("Data type is text or tsvector!")

            if len(columnValue[0]) == 0:
                logging.info("No data to extract - no point continuing!")
                return False
            elif len(columnValue[0]) > 1:
                # join replaces the original manual counter loop; output is identical
                newColVal = "; ".join(columnValue[0])
            else:
                newColVal = columnValue[0][0]
        else:
            logging.info("Data type not text or vector!")

            if len(columnValue[0]) > 1:
                logging.warn("NOTE: " + columnName + " is attempting to add multiple values - just take first for now!")
            # both original branches took the first value
            newColVal = columnValue[0][0]
            # TODO: at some stage add some checking of timestamps etc

        # get rid of any special characters
        newColVal = self._record.escapeSpecialCharacters(newColVal)

        # build the sql query, e.g.
        # update original_document set original_format_version = 'x' where original_document_filename = 'y.xml'
        if postgresDataType == 'tsvector':
            sqlCmd = "update original_document set " + columnName + " = to_tsvector('english','" + newColVal + "') where original_document_filename = '" + original_document_filename + "';"
        else:
            sqlCmd = "update original_document set " + columnName + " = '" + newColVal + "' where original_document_filename = '" + original_document_filename + "';"

        # submit the sqlQuery
        try:
            # deal with nones
            sqlCmd = sqlCmd.replace("'null'", "null")
            sqlCmd = sqlCmd.replace("'None'", "null")
            sqlCmd = sqlCmd.replace("None", "null")

            self.pgc.runSQLCommand(sqlCmd)
            report = True
        except Exception:
            # BUGFIX: was a bare 'except:' which also swallowed
            # SystemExit/KeyboardInterrupt; still best-effort by design
            logging.error("Could not submit query for: " + sqlCmd)

        return report

    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record updated, false if otherwise
        '''
        logging.info("Record already existing in DB - performing updates")
        result = False

        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if self._checkIsUpdatedRecord():
            # firstly, update the original record
            self._updateOriginalRecord()

            # now update the actual documents contained by the record - i.e. the original doc +
            # the various transforms required
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing spatiotemporal
            # data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")
            self._deleteSpatioTemporalData()
            self._insertSpatioTemporalData()
            result = True

            # if iso original metadata format, add in additional fields
            self._updateISOcolumns()

        logging.info("Finish processing document...")
        return result

    def _checkIsUpdatedRecord(self):
        '''
        Look up an existing record and check it is not identical to the new
        record; if it is, increment the harvest_count value and don't process
        again.  Also caches the row's current system change number (scn) on
        the record.
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")

        sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
            str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';"
        results = self.pgc.runSQLCommand(sql)

        # NB, if the document is not identical, the sql command will not find anything
        if not results:
            logging.info("Ingested document is different to that in the current DB")
            # get the current SCN
            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
                str(self._record.db_id) + ";"
            results = self.pgc.runSQLCommand(sql)
            self._record.scn = results[0][0]
            return True

        count = results[0][0]

        # get current system change number
        self._record.scn = results[0][1]
        logging.info("Ingested document is identical to document currently in DB - " +
                     "incrementing harvest_count")
        count += 1
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \
            " WHERE original_document_id = " + str(self._record.db_id)
        self.pgc.runSQLCommand(sql)
        return False

    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is set to cascade from the linking table so only need to
        delete entries from there
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_document_id = " + \
            str(self._record.db_id) + ";"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")

    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single spatiotemporal row in the postgres DB.  Operates with
        dictionary inputs for coords and timeRange as part of the MEDIN ISO
        upgrade.
        @param coords: dict with 'north', 'south', 'west', 'east' keys
        @param timeRange: dict with 'start' and 'end' keys
        '''
        logging.info("Adding spatiotemporal row to DB")

        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \
            str(coords['north']) + "', '" + str(coords['south']) + "', '" + str(coords['west']) + "', '" + \
            str(coords['east']) + "', '" + timeRange['start'] + "', '" + timeRange['end'] + "');"

        # fix any null strings
        sqlCmd = sqlCmd.replace("'null'", "null")

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal row added successfully")

    def _insertSpatioTemporalData(self):
        '''
        Create spatiotemporal rows in the postgres DB based on the data in
        the PostgresRecord object.

        Note updated to work with the ISO metadata object where coords and
        times are already defined as individual items.
        '''
        logging.info("Adding spatiotemporal data to DB record")

        # Work out the relationship between the spatial and temporal data and handle appropriately
        timeRanges = self._record.parseTemporalInfo(self._record.datasetTemporalData)
        spatialCoords = self._record.parseSpatialInfo(self._record.datasetSpatialData)

        if len(timeRanges) == 0 and len(spatialCoords) == 0:
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # if both arrays of data are the same length, assume they map together
        if len(timeRanges) == len(spatialCoords):
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for coords, times in zip(spatialCoords, timeRanges):
                self._insertSpatioTemporalRow(coords, times)
        # otherwise, map everything to everything
        else:
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for times in timeRanges:
                for coords in spatialCoords:
                    self._insertSpatioTemporalRow(coords, times)

        logging.info("Spatiotemporal data added to DB record")

    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB.
        @raise ValueError: if no primary key id is returned from the INSERT
        '''
        logging.info("Inserting new original document in Postgres DB")

        sqlCmd = "SELECT create_document('" + self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            self._record.dataset_name + "', '" + self._record.datacentre_name + "', '" + \
            self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + self._record.datasetEndNom + "');"

        # sort out any nulls..
        sqlCmd = sqlCmd.replace("'NULL'", "NULL")

        # sort out any Nones
        sqlCmd = sqlCmd.replace("'None'", "NULL")

        # renamed from 'id' to avoid shadowing the builtin
        pkId = self.pgc.runSQLCommand(sqlCmd)
        if len(pkId) == 0:
            # BUGFIX: modernised from the Python-2-only 'raise ValueError, msg' form
            raise ValueError('No PK ID returned from INSERT to DB')

        self._record.db_id = pkId[0][0]

        logging.info("Original document inserted in Postgres DB")

    def deleteOriginalRecord(self):
        '''
        Delete the original metadata doc from the postgres DB
        '''
        logging.info("Deleting original document from Postgres DB")
        sqlCmd = "SELECT delete_document('" + str(self._record.db_id) + "');"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Original document deleted from Postgres DB")

    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB.  Increments the cached
        scn afterwards to save an additional db call.
        '''
        logging.info("Updating original document in Postgres DB")
        sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \
            self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            str(self._record.scn) + "', '" + self._record.dataset_name + "', '" + self._record.datacentre_name + \
            "', '" + self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + \
            self._record.datasetEndNom + "');"

        # sort out any NULL values
        sqlCmd = sqlCmd.replace("'NULL'", "NULL")

        # sort out any Nones
        sqlCmd = sqlCmd.replace("'None'", "NULL")

        logging.info("Submitting SQL command")

        self.pgc.runSQLCommand(sqlCmd)

        # increment scn - saves doing an additional db call
        self._record.scn += 1

        logging.info("Original document updated in Postgres DB")

    def _insertMetadataRecords(self):
        '''
        Insert the transformed metadata docs into the postgres DB
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)

        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs(self.discovery_dir):
            # if docType is original input xml then put that in rather than some lossy transformed guff.
            if docType == self._record.docType:
                doc = self._record.originalXMLdoc

            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_document_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                docType + "', '" + doc + "', current_timestamp, 1);"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records created in DB")

    def _updateMetadataRecords(self):
        '''
        Update the transformed metadata docs in the postgres DB
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs(self.discovery_dir):
            # if docType is original input xml then put that in rather than some lossy transformed guff.
            if docType == self._record.docType:
                doc = self._record.originalXMLdoc

            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \
                "', update_date = current_timestamp WHERE original_document_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records updated in DB")

    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record
549
Note: See TracBrowser for help on using the repository browser.