source: TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 6618

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@6618
Revision 6618, 21.0 KB, checked in by sdonegan, 12 years ago

Commit of code that successfully updates MEDIN ISO as well as GCMD DIF records into the NDG3-style postgres DB.

  • Property svn:executable set to *
#!/usr/bin/env python
'''
Class representing the data access object wrapping the Postgres DB
C Byrom Apr 08
'''
import sys, os, logging, datetime
#from SpatioTemporalData import *
from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc

class PostgresDAO:

    def __init__(self, record, isoDataModel, transformationDir, pgClient = None):
        '''
        Constructor - to initialise the DAL and do some initial setting up
        @param record: the PostgresRecord object to add or update in the DB
        @param isoDataModel: the ISO data model object holding the parsed ISO fields
        @param transformationDir: directory holding the transformed discovery documents
        @keyword pgClient: a postgresClient object to use to connect to postgres
        - NB, if not set, will try to set one up by looking for a pg.config file
        locally
        '''
        if record == "":
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
        else:
            logging.info("Creating/updating DB entry for record, %s" % record.discovery_id)

        # setup a connection to the db - if none specified
        if pgClient is None:
            # NB: the original passed an undefined 'configFile' positionally here;
            # fall back to the client's default config file instead
            self.pgc = pgc(configFile = pgc.DEFAULT_CONFIG_FILE)
        else:
            self.pgc = pgClient

        self._record = record
        self.isoDataModel = isoDataModel
        self.discovery_dir = transformationDir

    def getRecordID(self):
        '''
        Looks up a record in the DB by its discovery_id and stores its DB ID on
        the record, if it exists
        @return: DB id of record if it exists, None otherwise
        '''
        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
        if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            self._record.discovery_id + "';"
        dbId = self.pgc.runSQLCommand(sql)
        if dbId:
            self._record.db_id = dbId[0][0]
        return self._record.db_id

    def getRecordID_using_OriginalDocumentFilename(self):
        '''
        Looks up a record in the DB by its original document filename and stores
        its DB ID on the record, if it exists
        '''
        logging.info("Looking up original_document_id for filename: " + self._record.filename + " in DB")

        '''if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id'''

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.db_id = dbId[0][0]

    def getDiscoveryID_using_OriginalDocumentFilename(self):
        '''
        Looks up a record in the DB by its original document filename and stores
        its discovery ID on the record, if it exists
        '''
        logging.info("Looking up discovery_id for filename: " + self._record.filename + " in DB")

        '''if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id'''

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        discoveryId = self.pgc.runSQLCommand(sql)

        if discoveryId:
            self._record.discovery_id = discoveryId[0][0]

    def getTemporalDataId(self):
        '''
        Looks up the temporal data id using the original document id
        - NB, unfinished in the original code: the query below (a copy of the
        discovery_id lookup) is built but never executed
        '''
        logging.info("Looking up temporal_data_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

    def createOrUpdateRecord(self):
        '''
        Looks up a record in the DB; if it finds it, update it, otherwise create it
        @return returnCode: 0 if the operation failed, 1 if the record was updated,
        2 if it was created
        '''
        self.getRecordID()
        returnCode = 0 # 0 if failed, 1 if update, 2 if create

        if self._record.db_id:
            if self.updateRecord():
                returnCode = 1
        else:
            # create the record!
            if self.createRecord():
                returnCode = 2

        return returnCode

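    # --- Hedged usage sketch (illustration only, not part of the original file) ---
    # Callers are assumed to dispatch on the integer return code, e.g.:
    #
    #   dao = PostgresDAO(record, isoDataModel, transformationDir)
    #   code = dao.createOrUpdateRecord()
    #   if code == 0:
    #       logging.error("Ingest failed for record %s" % record.discovery_id)
    #   elif code == 1:
    #       logging.info("Updated existing record")
    #   else:  # code == 2
    #       logging.info("Created new record")
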
    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record created, False otherwise
        '''
        logging.info("Creating a new record in the DB for the metadata document")
        # firstly store the actual documents contained by the record - i.e. the original doc +
        # the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()

        # if iso original metadata format, add in additional fields
        self._updateISOcolumns()

        logging.info("New record created")
        return True

    def _updateISOcolumns(self):
        '''
        Method to control the update of the extra ISO columns required for full MEDIN ingest
        '''
        dbColumns = self.isoDataModel.mapIsoFieldsToDBcolumns()

        for isoData in dbColumns:

            logging.info("Attempting to update: " + isoData)

            columnValue = getattr(self.isoDataModel, isoData)
            columnName = dbColumns[isoData]

            if columnValue is not None:
                if self._updateDataBaseColumn(self._record.shortFilename, columnName, columnValue):
                    logging.info("Successfully updated " + isoData)
                else:
                    logging.warn("Could not update " + isoData)
            else:
                logging.info(isoData + " is None - skipping")

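    # Hedged illustration (assumption, not from the original source): the ISO data
    # model is taken to expose mappings along these lines -
    #
    #   mapIsoFieldsToDBcolumns()    -> {'datasetAbstract': 'dataset_abstract', ...}
    #   mapDBcolumnsToIsoDataTypes() -> {'dataset_abstract': 'tsvector', ...}
    #
    # so each ISO attribute value can be written to its matching DB column with the
    # correct postgres type handling in _updateDataBaseColumn below.
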
    def _updateDataBaseColumn(self, original_document_filename, columnName, columnValue):
        '''
        Method to update individual columns within the Discovery database based on
        the unique id (original_document_filename), the column name and the value
        to be inserted into that column
        '''
        report = False

        logging.info("Updating " + columnName + " for record: " + original_document_filename)

        # get the matching postgres datatype defined in the iso model
        postgresDataType = self.isoDataModel.mapDBcolumnsToIsoDataTypes()[columnName]

        # construct the actual sql command here - easier to handle than using an sql function (& to adapt!)
        # remember doing it on a 1:1 basis here - convert raw multi list to a space delimited value if text or tsvector
        if (postgresDataType == 'tsvector') or (postgresDataType == 'text'):

            logging.info("Data type is text or tsvector!")

            if len(columnValue[0]) == 0:
                logging.info("No data to extract - no point continuing!")
                return False

            elif len(columnValue[0]) > 1:
                # NB: join with spaces to give the space-delimited value described
                # above (the original concatenated the values with no separator)
                newColVal = " ".join(columnValue[0])

            else:
                newColVal = columnValue[0][0]

        else:
            logging.info("Data type not text or tsvector!")

            if len(columnValue[0]) > 1:
                logging.warn("NOTE: " + columnName + " is attempting to add multiple values - just take first for now!")

            newColVal = columnValue[0][0]

            #TODO: at some stage add some checking of timestamps etc

        # build the sql query
        # i.e. update original_document set original_format_version = 'dooby' where original_document_filename = 'dooby.ac.uk.shgfhf.xml'
        if postgresDataType == 'tsvector':
            sqlCmd = "update original_document set " + columnName + " = to_tsvector('english','" + newColVal + "') where original_document_filename = '" + original_document_filename + "';"

        else:
            sqlCmd = "update original_document set " + columnName + " = '" + newColVal + "' where original_document_filename = '" + original_document_filename + "';"

        # submit the sql query
        try:
            sqlCmd = sqlCmd.replace("'null'", "null")
            sqlCmd = sqlCmd.replace("'None'", "null")
            sqlCmd = sqlCmd.replace("None", "null")

            self.pgc.runSQLCommand(sqlCmd)

            report = True
        except Exception:
            logging.error("Could not submit query for: " + sqlCmd)

        return report

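    # --- Hedged sketch (not in the original file): parameterised alternative ------
    # The string-built UPDATE above will break (or worse) if a value contains a
    # single quote. Given direct access to a DB-API connection (psycopg2 is assumed
    # here purely for illustration), the same update could use bound parameters:
    #
    #   from psycopg2 import sql
    #
    #   def updateColumnSafely(conn, columnName, value, filename):
    #       # column names cannot be bound as parameters, so compose them with
    #       # psycopg2.sql; the values themselves are passed as bound parameters
    #       cmd = sql.SQL("UPDATE original_document SET {} = %s "
    #                     "WHERE original_document_filename = %s").format(
    #                         sql.Identifier(columnName))
    #       with conn.cursor() as cur:
    #           cur.execute(cmd, (value, filename))
    #       conn.commit()
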
    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record updated, False otherwise
        '''
        logging.info("Record already exists in DB - performing updates")
        result = False

        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if self._checkIsUpdatedRecord():

            # update the original record
            self._updateOriginalRecord()

            # now update the actual documents contained by the record - i.e. the original doc +
            # the various transforms required
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing spatiotemporal
            # data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")

            self._deleteSpatioTemporalData()
            self._insertSpatioTemporalData()
            result = True

            # if iso original metadata format, add in additional fields
            self._updateISOcolumns()

        logging.info("Finished processing document...")

        return result

    def _checkIsUpdatedRecord(self):
        '''
        Looks up an existing record and checks it is not identical to the new record;
        if it is identical, increment the harvest_count value and don't process again
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")

        sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
            str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';"
        results = self.pgc.runSQLCommand(sql)

        # NB, if the document is not identical, the sql command will not find anything
        if not results:
            logging.info("Ingested document is different to that in the current DB")
            # get the current SCN
            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
                    str(self._record.db_id) + ";"
            results = self.pgc.runSQLCommand(sql)
            self._record.scn = results[0][0]
            return True

        count = results[0][0]

        # get current system change number
        scn = results[0][1]
        self._record.scn = scn
        logging.info("Ingested document is identical to document currently in DB - " + \
                     "incrementing harvest_count")
        count += 1
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \
            " WHERE original_document_id = " + str(self._record.db_id)
        self.pgc.runSQLCommand(sql)
        return False

    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is set to cascade from the linking table so only need to
        delete entries from there
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_document_id = " + \
            str(self._record.db_id) + ";"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")

    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single row record in the postgres DB based on the input data.
        Updated to operate with dictionaries for coords and timeRange as part of the MEDIN ISO upgrade
        @param coords: dictionary of NSWE coords, keyed on 'north', 'south', 'west', 'east'
        @param timeRange: dictionary representing a start/end date, keyed on 'start' and 'end'
        '''
        logging.info("Adding spatiotemporal row to DB")

        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \
            str(coords['north']) + "', '" + str(coords['south']) + "', '" + str(coords['west']) + "', '" + \
            str(coords['east']) + "', '" + timeRange['start'] + "', '" + timeRange['end'] + "');"

        # fix any null strings
        sqlCmd = sqlCmd.replace("'null'", "null")

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal row added successfully")

    def _insertSpatioTemporalData(self):
        '''
        Create a record in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object

        Note: updated to work with the ISO metadata object, where coords and times are already defined as individual items.
        '''
        logging.info("Adding spatiotemporal data to DB record")

        # Work out the relationship between the spatial and temporal data and handle appropriately
        timeRanges = self._record.parseTemporalInfo(self._record.datasetTemporalData)
        spatialCoords = self._record.parseSpatialInfo(self._record.datasetSpatialData)

        if (len(timeRanges) == 0) and (len(spatialCoords) == 0):
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # if both arrays of data are the same length, assume they map together
        if len(timeRanges) == len(spatialCoords):
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for coords, times in zip(spatialCoords, timeRanges):
                self._insertSpatioTemporalRow(coords, times)

        # otherwise, map everything to everything
        else:
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for times in timeRanges:
                for coords in spatialCoords:
                    self._insertSpatioTemporalRow(coords, times)

        logging.info("Spatiotemporal data added to DB record")

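    # Hedged illustration (assumption, not from the original source): given
    #
    #   timeRanges    = [{'start': '1990-01-01', 'end': '1999-12-31'}]
    #   spatialCoords = [{'north': '60', 'south': '50', 'west': '-10', 'east': '2'},
    #                    {'north': '55', 'south': '45', 'west': '-5',  'east': '5'}]
    #
    # the lists differ in length, so the cross-product branch runs and two rows are
    # written - the single time range paired with each bounding box in turn.
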
    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB
        '''
        logging.info("Inserting new original document in Postgres DB")

        sqlCmd = "SELECT create_document('" + self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            self._record.dataset_name + "', '" + self._record.datacentre_name + "', '" + \
            self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + self._record.datasetEndNom + "');"

        # sort out any nulls..
        sqlCmd = sqlCmd.replace("'NULL'", "NULL")

        # sort out any Nones
        sqlCmd = sqlCmd.replace("'None'", "NULL")

        dbId = self.pgc.runSQLCommand(sqlCmd)
        if len(dbId) == 0:
            raise ValueError('No PK ID returned from INSERT to DB')

        self._record.db_id = dbId[0][0]

        logging.info("Original document inserted in Postgres DB")

    def deleteOriginalRecord(self):
        '''
        Delete the original metadata doc from the postgres DB
        '''
        logging.info("Deleting original document from Postgres DB")
        sqlCmd = "SELECT delete_document('" + str(self._record.db_id) + "');"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Original document deleted from Postgres DB")

    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB
        '''
        logging.info("Updating original document in Postgres DB")
        sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \
            self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            str(self._record.scn) + "', '" + self._record.dataset_name + "', '" + \
            self._record.datacentre_name + "', '" + self._record.dataset_lastEdit + "', '" + \
            self._record.datasetStartNom + "', '" + self._record.datasetEndNom + "');"

        # sort out any NULL values
        sqlCmd = sqlCmd.replace("'NULL'", "NULL")

        # sort out any Nones
        sqlCmd = sqlCmd.replace("'None'", "NULL")

        logging.info("Submitting SQL command")
        #logging.info("SQl command:   " + sqlCmd)

        self.pgc.runSQLCommand(sqlCmd)

        # increment scn - saves doing an additional db call
        self._record.scn += 1

        logging.info("Original document updated in Postgres DB")

    def _insertMetadataRecords(self):
        '''
        Insert the metadata docs into the postgres DB
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)

        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs(self.discovery_dir):

            # if docType is original input xml then put that in rather than some lossy transformed guff.
            if docType == self._record.docType:
                doc = self._record.originalXMLdoc

            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_document_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                docType + "', '" + doc + "', current_timestamp, 1);"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records created in DB")

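    # Hedged note (assumption, not from the original source): getAllDocs() is taken
    # to yield (docType, documentText) pairs, e.g.
    #
    #   [('DIF', '<dif>...</dif>'), ('MEDIN_2.3', '<gmd:MD_Metadata>...')]
    #
    # which is why the original harvested format can be detected above and below and
    # swapped back in, rather than storing a lossy round-tripped transform.
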
    def _updateMetadataRecords(self):
        '''
        Update the metadata docs in the postgres DB
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs(self.discovery_dir):

            # if docType is original input xml then put that in rather than some lossy transformed guff.
            if docType == self._record.docType:
                doc = self._record.originalXMLdoc

            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \
                "', update_date = current_timestamp WHERE original_document_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records updated in DB")

    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record

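# --- Hedged usage sketch (illustration only; record construction is assumed) -----
# A single DAO instance is intended to be re-used across an ingest batch:
#
#   dao = PostgresDAO(firstRecord, isoDataModel, transformationDir, pgClient=client)
#   dao.createOrUpdateRecord()
#
#   for record in remainingRecords:
#       dao.setRecord(record)       # swap in the next PostgresRecord
#       dao.createOrUpdateRecord()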