source: TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 6851

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@6851
Revision 6851, 22.1 KB checked in by sdonegan, 10 years ago

Updated to handle checking changes to xml - looks in transformed_doc now as db structure changed

  • Property svn:executable set to *
#!/usr/bin/env python
'''
Class representing the data access object wrapping the Postgres DB
C Byrom Apr 08
'''
import sys, os, logging, datetime
#from SpatioTemporalData import *
from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc

class PostgresDAO:

    def __init__(self, record, isoDataModel, transformationDir, pgClient = None):
        '''
        Constructor - to initialise the DAL and do some initial setting up
        @param record: the PostgresRecord object to add or update in the DB
        @param isoDataModel: the ISO data model object used to map ISO fields to DB columns
        @param transformationDir: directory containing the transformed documents
        @keyword pgClient: a PostgresClient object to use to connect to postgres
        - NB, if not set, will try to set one up by looking for a pg.config file
        locally
        '''
        if record == "":
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
        else:
            logging.info("Creating/updating DB entry for record, %s" % record.discovery_id)

        # setup a connection to the db - if none specified, fall back on the default
        # config file (NB, assumes the client takes the config file path as its
        # first argument)
        if pgClient is None:
            self.pgc = pgc(pgc.DEFAULT_CONFIG_FILE)
        else:
            self.pgc = pgClient

        self._record = record
        self.isoDataModel = isoDataModel
        self.discovery_dir = transformationDir

    def getRecordID(self):
        '''
        Looks up a record in the DB by its discovery_id and stores the DB ID on
        the record
        @return: id of record, if it exists, None if it doesn't
        '''
        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
        if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            self._record.discovery_id + "';"

        logging.info("sql lookup cmd = " + sql)
        dbId = self.pgc.runSQLCommand(sql)
        if dbId:
            self._record.db_id = dbId[0][0]

        return self._record.db_id

    def getRecordID_using_OriginalDocumentFilename(self):
        '''
        Looks up a record in the DB by its original document filename and, if it
        exists, stores its DB ID on the record
        '''
        logging.info("Looking up original_document_id for filename: " + self._record.filename + " in DB")

        #if self._record.db_id is not None and self._record.db_id > 0:
        #    logging.info("Already looked up record - ID is " + str(self._record.db_id))
        #    return self._record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.db_id = dbId[0][0]

    def getDiscoveryID_using_OriginalDocumentFilename(self):
        '''
        Looks up a record in the DB by its original document filename and, if it
        exists, stores its discovery ID on the record
        '''
        logging.info("Looking up discovery_id for filename: " + self._record.filename + " in DB")

        #if self._record.db_id is not None and self._record.db_id > 0:
        #    logging.info("Already looked up record - ID is " + str(self._record.db_id))
        #    return self._record.db_id

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.discovery_id = dbId[0][0]

    def getTemporalDataId(self):
        '''
        Looks up the temporal data id using the original document filename
        '''
        logging.info("Looking up temporal_data_id for filename: " + self._record.filename + " in DB")

        # NB, this currently resolves via the discovery_id held on the matching
        # ORIGINAL_DOCUMENT row
        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        return self.pgc.runSQLCommand(sql)

    def createOrUpdateRecord(self):
        '''
        Looks up a record in the DB; if it finds it, update it, otherwise create it
        @return returnCode: 0 if failed, 1 if record updated, 2 if record created
        '''
        self.getRecordID()
        returnCode = 0 # 0 if failed, 1 if update, 2 if create

        if self._record.db_id:
            if self.updateRecord():
                returnCode = 1
        else:
            # create the record!
            if self.createRecord():
                returnCode = 2

        return returnCode
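
    # Hypothetical caller-side handling of the return code (names are
    # illustrative only):
    #
    #     code = dao.createOrUpdateRecord()
    #     if code == 2:
    #         logging.info("Created new record")
    #     elif code == 1:
    #         logging.info("Updated existing record")
    #     else:
    #         logging.error("Ingest of record failed")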
    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record created, false if otherwise
        '''
        logging.info("Creating a new record in the DB for the metadata document")
        # firstly store the actual documents contained by the record - i.e. the original doc +
        # the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()

        #if iso original metadata format, add in additional fields
        self._updateISOcolumns()

        logging.info("New record created")
        return True

    def _updateISOcolumns(self):
        '''
        Method to control the update of the extra ISO columns required for full MEDIN ingest
        '''
        dbColumns = self.isoDataModel.mapIsoFieldsToDBcolumns()

        for isoData in dbColumns.keys():

            logging.info("Attempting to update: " + isoData)

            columnValue = getattr(self.isoDataModel, isoData)
            columnName = dbColumns[isoData]

            if columnValue is not None:
                if self._updateDataBaseColumn(self._record.shortFilename, columnName, columnValue):
                    logging.info("Successfully updated " + isoData)
                else:
                    logging.warn("Could not update " + isoData)
            else:
                logging.info(isoData + " is None - skipping")

    def _updateDataBaseColumn(self, original_document_filename, columnName, columnValue):
        '''
        Method to update individual columns within the Discovery database based on
        unique id (original_document_filename), column name and value to be
        inserted into that column
        '''
        report = False

        logging.info("Updating " + columnName + " for record: " + original_document_filename)

        #get the matching dictionary of postgres datatypes defined in the iso model
        postgresDataType = self.isoDataModel.mapDBcolumnsToIsoDataTypes()[columnName]

        #construct the actual sql command here - easier to handle than using sql function (& to adapt!)
        #remember doing it on a 1:1 basis here - convert raw multi list to semicolon
        #delimited value if text or tsvector
        if (postgresDataType == 'tsvector') or (postgresDataType == 'text'):

            logging.info("Data type is text or tsvector!")

            if len(columnValue[0]) == 0:
                logging.info("No data to extract - no point continuing!")
                return False

            elif len(columnValue[0]) > 1:
                # join multiple values, e.g. ['a', 'b', 'c'] -> "a; b; c"
                newColVal = ""
                cnt = 0
                for val in columnValue[0]:
                    if cnt == 0:
                        newColVal = newColVal + val
                    else:
                        newColVal = newColVal + "; " + val

                    cnt = 1

            else:
                newColVal = columnValue[0][0]

        else:
            logging.info("Data type not text or tsvector!")

            if len(columnValue[0]) == 0:
                newColVal = "null"
            elif len(columnValue[0]) > 1:
                logging.warn("NOTE: " + columnName + " is attempting to add multiple values - just take first for now!")
                newColVal = columnValue[0][0]
            else:
                newColVal = columnValue[0][0]

            #TODO: at some stage add some checking of timestamps etc

        #get rid of any special characters
        newColVal = self._record.escapeSpecialCharacters(newColVal)

        #build the sql query
        #i.e. update original_document set original_format_version = 'dooby' where original_document_filename = 'dooby.ac.uk.shgfhf.xml'
        if postgresDataType == 'tsvector':
            sqlCmd = "update original_document set " + columnName + " = to_tsvector('english','" + newColVal + "') where original_document_filename = '" + original_document_filename + "';"

        else:
            sqlCmd = "update original_document set " + columnName + " = '" + newColVal + "' where original_document_filename = '" + original_document_filename + "';"

        #submit the sqlQuery
        try:
            #deal with nones
            sqlCmd = sqlCmd.replace("'null'", "null")
            sqlCmd = sqlCmd.replace("'None'", "null")
            sqlCmd = sqlCmd.replace("None", "null")

            self.pgc.runSQLCommand(sqlCmd)

            report = True

        except Exception:
            logging.error("Could not submit query for: " + sqlCmd)

        return report
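
    # A minimal sketch of a call (hypothetical values - columnValue follows the
    # nested-list shape used above, i.e. columnValue[0] holds the raw list of
    # values; filename and column name reuse the examples from the comments):
    #
    #     self._updateDataBaseColumn('dooby.ac.uk.shgfhf.xml',
    #                                'original_format_version',
    #                                [['dooby']])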
    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record updated, false if otherwise
        '''
        logging.info("Record already exists in DB - performing updates")
        result = False

        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if self._checkIsUpdatedRecord():

            # update the original record
            self._updateOriginalRecord()

            # now update the actual documents contained by the record - i.e. the original doc +
            # the various transforms required
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing spatiotemporal
            # data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")

            self._deleteSpatioTemporalData()
            self._insertSpatioTemporalData()
            result = True

            #if iso original metadata format, add in additional fields
            self._updateISOcolumns()

        logging.info("Finished processing document...")

        return result

    def _checkIsUpdatedRecord(self):
        '''
        Looks up an existing record and checks it is not identical to the new record; if it is,
        increment the harvest_count value and don't process again
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")

        #sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
        #    str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';"

        #get the harvest count and scn from the original_document table...
        sql_orig = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + str(self._record.db_id) + ";"

        results_orig = self.pgc.runSQLCommand(sql_orig)

        #get the original_document from the transformed docs table
        sql_transf = "SELECT transformed_document_id FROM transformed_document where original_document_id = " + str(self._record.db_id) + \
                " AND transformed_document = '" + self._record.originalFormat + "';"

        results_transf = self.pgc.runSQLCommand(sql_transf)

        # NB, if the document is not identical, the sql command will not find anything
        if not results_transf:
            logging.info("Ingested document is different to that in the current DB")

            # get the current SCN
            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
                    str(self._record.db_id) + ";"
            results = self.pgc.runSQLCommand(sql)
            self._record.scn = results[0][0]
            return True

        count = results_orig[0][0]

        # get current system change number
        scn = results_orig[0][1]
        self._record.scn = scn
        logging.info("Ingested document is identical to document currently in DB - " + \
                     "incrementing harvest_count")
        count += 1
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \
            " WHERE original_document_id = " + str(self._record.db_id)
        self.pgc.runSQLCommand(sql)
        return False

    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is set to cascade from the linking table so only need to
        delete entries from there
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_document_id = " + \
            str(self._record.db_id) + ";"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")

    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single row record in the postgres DB based on the input data.
        Updated to operate with dictionary methods for coords and timeRange as part of MEDIN ISO upgrade
        @param coords: a dictionary of NSWE coords
        @param timeRange: a dictionary holding the start/end dates
        '''
        logging.info("Adding spatiotemporal row to DB")

        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \
            str(coords['north']) + "', '" + str(coords['south']) + "', '" + str(coords['west']) + "', '" + \
            str(coords['east']) + "', '" + timeRange['start'] + "', '" + timeRange['end'] + "');"

        # fix any null strings
        sqlCmd = sqlCmd.replace("'null'", "null")

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal row added successfully")

    def _insertSpatioTemporalData(self):
        '''
        Create a record in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object

        Note updated to work with ISO metadata object where coords and times already defined as individual items.
        '''
        logging.info("Adding spatiotemporal data to DB record")

        # Work out the relationship between the spatial and temporal data and handle appropriately
        TimeRange = self._record.parseTemporalInfo(self._record.datasetTemporalData)
        SpatialCoords = self._record.parseSpatialInfo(self._record.datasetSpatialData)

        if (len(TimeRange) == 0) and (len(SpatialCoords) == 0):
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # if both arrays of data are the same length, assume they map together
        i = 0
        if len(TimeRange) == len(SpatialCoords):
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for times in TimeRange:
                self._insertSpatioTemporalRow(SpatialCoords[i], times)
                i += 1

        # otherwise, map everything to everything
        else:
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for times in TimeRange:
                for coords in SpatialCoords:
                    self._insertSpatioTemporalRow(coords, times)

        logging.info("Spatiotemporal data added to DB record")
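
    # NB, e.g. 2 time ranges + 2 sets of coords give 2 DB rows (mapped one to
    # one), whereas 2 time ranges + 3 sets of coords give 6 rows (everything
    # mapped to everything)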
    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB
        '''
        logging.info("Inserting new original document in Postgres DB")

        sqlCmd = "SELECT create_document('" + self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            self._record.dataset_name + "', '" + self._record.datacentre_name + "', '" + \
            self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + self._record.datasetEndNom + "');"

        #sort out any nulls..
        sqlCmd = sqlCmd.replace("'NULL'", "NULL")

        #sort out any Nones
        sqlCmd = sqlCmd.replace("'None'", "NULL")

        id = self.pgc.runSQLCommand(sqlCmd)
        if len(id) == 0:
            raise ValueError('No PK ID returned from INSERT to DB')

        self._record.db_id = id[0][0]

        logging.info("Original document inserted in Postgres DB")

    def deleteOriginalRecord(self):
        '''
        Delete the original metadata doc from the postgres DB
        '''
        logging.info("Deleting original document from Postgres DB")
        sqlCmd = "SELECT delete_document('" + str(self._record.db_id) + "');"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Original document deleted from Postgres DB")

    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB
        '''
        logging.info("Updating original document in Postgres DB")
        sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \
            self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            str(self._record.scn) + "', '" + self._record.dataset_name + "', '" + self._record.datacentre_name + \
            "', '" + self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + \
            self._record.datasetEndNom + "');"

        #sort out any NULL values
        sqlCmd = sqlCmd.replace("'NULL'", "NULL")

        #sort out any Nones
        sqlCmd = sqlCmd.replace("'None'", "NULL")

        logging.info("Submitting SQL command")
        #logging.info("SQL command:   " + sqlCmd)

        self.pgc.runSQLCommand(sqlCmd)

        # increment scn - saves doing an additional db call
        self._record.scn += 1

        logging.info("Original document updated in Postgres DB")

    def _insertMetadataRecords(self):
        '''
        Insert the transformed metadata docs into the postgres DB
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)

        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs(self.discovery_dir):

            #if docType is original input xml then put that in rather than some lossy transformed guff.
            if docType == self._record.docType:
                doc = self._record.originalXMLdoc

            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_document_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                docType + "', '" + doc + "', current_timestamp, 1);"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records created in DB")

    def _updateMetadataRecords(self):
        '''
        Update the transformed metadata docs in the postgres DB
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs(self.discovery_dir):

            #if docType is original input xml then put that in rather than some lossy transformed guff.
            if docType == self._record.docType:
                doc = self._record.originalXMLdoc

            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \
                "', update_date = current_timestamp WHERE original_document_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records updated in DB")

    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record
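
# A minimal usage sketch (hypothetical names - assumes a populated
# PostgresRecord 'rec', an ISO data model 'iso' and a directory of transformed
# docs; a postgres client is set up from the default config file if none is
# passed in):
#
#     dao = PostgresDAO(rec, iso, '/path/to/transforms')
#     returnCode = dao.createOrUpdateRecord()
#     dao.setRecord(nextRec)    # re-use the DAO for the next record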