source: TI01-discovery/tags/stable-TI01-ingestAutomation_Proglue_upgradesAndReporting/temp/OAIBatch/PostgresDAO.py @ 5272

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/tags/stable-TI01-ingestAutomation_Proglue_upgradesAndReporting/temp/OAIBatch/PostgresDAO.py@5272
Revision 5272, 17.1 KB checked in by sdonegan, 11 years ago (diff)

Fixed a bug so that changed files can be successfully re-ingested

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3Class representing the data access object wrapping the Postgres DB
4C Byrom Apr 08
5'''
6import sys, os, logging,re
7import db_funcs
8from SpatioTemporalData import *
9
class PostgresDAO:
    '''
    Data access object wrapping the Postgres discovery DB: creates, updates and
    deletes the DB entries (original document, transformed documents and
    spatiotemporal data) for a single PostgresRecord.

    NOTE(review): every SQL statement in this class is built by string
    concatenation, and values are placed between single quotes without any
    escaping here; db_funcs.runSQLCommand is only ever called as
    (connection, sql), with no bind parameters. This is only safe if the
    PostgresRecord values are escaped upstream - confirm before ingesting
    untrusted documents. Also note sqlCommandNullScan turns every quoted
    'null' in a command into an SQL NULL, including any occurrence inside
    document text.
    '''

    def __init__(self, record, connection=None):
        '''
        Constructor - to initialise the DAL and do some initial setting up
        @param record: the PostgresRecord object to add or update in the DB, or the
        string "deleting" when records are being deleted; an empty string logs an
        error and exits the process with a usage message
        @param connection: DB connection to use; if None, a new one is opened via
        db_funcs.db_connect()
        '''
        if record == "":
            logging.error("Invalid call to PostgresDAO!")
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
        elif record == "deleting":
            logging.info("Record object not supplied as DELETING records FROM database..")
        else:
            logging.info("Creating/updating DB entry for record, %s" %record.discovery_id)

        # setup a connection to the db - if none specified
        if connection is None:
            connection = db_funcs.db_connect()
        self._connection = connection
        self._record = record

    def getRecordID(self):
        '''
        Look up the record's original_document_id by its discovery_id and store it
        on the record as db_id. If db_id is already a positive value, no query is run.
        @return: the record's db_id after the lookup - unchanged if no row matches
        '''
        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
        if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            self._record.discovery_id + "';"
        rows = db_funcs.runSQLCommand(self._connection, sql)
        if rows:
            self._record.db_id = rows[0][0]
        # return on both paths, matching the cached branch above
        return self._record.db_id

    def getRecordID_using_OriginalDocumentFilename(self):
        '''
        Look up the record's original_document_id by its original document filename
        and, if a row matches, store it on the record as db_id. Unlike getRecordID,
        this always queries the DB. Nothing is returned - read self._record.db_id.
        '''
        logging.info("Looking up original_document_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + \
            self._record.filename + "';"
        rows = db_funcs.runSQLCommand(self._connection, sql)
        if rows:
            self._record.db_id = rows[0][0]

    def getDiscoveryID_using_OriginalDocumentFilename(self):
        '''
        Look up the record's discovery_id by its original document filename and, if a
        row matches, store it on the record as discovery_id. Nothing is returned -
        read self._record.discovery_id.
        '''
        logging.info("Looking up discovery_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + \
            self._record.filename + "';"
        rows = db_funcs.runSQLCommand(self._connection, sql)
        if rows:
            self._record.discovery_id = rows[0][0]

    def getTemporalDataId(self):
        '''
        Intended to look up the temporal data id using the original document id.
        NOTE(review): not implemented - no query is run and None is returned. A query
        selecting discovery_id (not a temporal data id) by filename used to be built
        here but was never executed.
        @return: None
        '''
        logging.info("Looking up temporal_data_id for filename: " + self._record.filename + " in DB")
        # TODO: implement once the temporal data table/column to query is confirmed
        return None

    def createOrUpdateRecord(self):
        '''
        Looks up a record in the DB; if it finds it, update it, otherwise create it
        @return result: True if record created/updated, false if otherwise
        '''
        self.getRecordID()
        if self._record.db_id:
            return self.updateRecord()
        return self.createRecord()

    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object:
        the original document, its transformed documents, then its spatiotemporal data
        @return result: True once the record has been created (errors propagate as exceptions)
        '''
        logging.info("Creating a new record in the DB for the metada document")
        # firstly store the actual documents contained by the record - i.e. the original doc +
        # the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()
        logging.info("New record created")
        return True

    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if the stored record was updated, False if the incoming
        document was identical to the stored one (only harvest_count is bumped then)
        '''
        logging.info("Record already existing in DB - performing updates")
        result = False
        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if self._checkIsUpdatedRecord():
            # firstly, update the original record
            self._updateOriginalRecord()

            # now update the actual documents contained by the record - i.e. the original doc +
            # the various transforms required
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing spatiotemporal
            # data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")
            self._deleteSpatioTemporalData()
            self._insertSpatioTemporalData()
            result = True

        logging.info("Finish processing document...")
        return result

    def _checkIsUpdatedRecord(self):
        '''
        Check whether the incoming document differs from the one stored for this
        record. In both cases the record's scn is refreshed from the DB; if the
        document is identical, the stored harvest_count is incremented instead.
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")

        sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
            str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';"
        results = db_funcs.runSQLCommand(self._connection, sql)

        # NB, if the document is not identical, the sql command will not find anything
        if not results:
            logging.info("Ingested document is different to that in the current DB")
            # get the current SCN
            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
                    str(self._record.db_id) + ";"
            results = db_funcs.runSQLCommand(self._connection, sql)
            self._record.scn = results[0][0]
            return True

        count = results[0][0]
        # remember the current system change number
        self._record.scn = results[0][1]
        logging.info("Ingested document is identical to document currently in DB - " + \
                     "incrementing harvest_count")
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count + 1) + \
            " WHERE original_document_id = " + str(self._record.db_id)
        db_funcs.runSQLCommand(self._connection, sql)
        return False

    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is set to cascade from the linking table so only need to
        delete entries from there
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_document_id = " + \
            str(self._record.db_id) + ";"

        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")

    def _sqlQuoteUnlessNull(self, value):
        '''
        Render a string value as an SQL literal for _insertSpatioTemporalRow
        @param value: string value, or the string "null"
        @return: "null" unchanged (so it reaches SQL as NULL), otherwise value wrapped
        in single quotes (no escaping is done)
        '''
        if value == "null":
            return value
        return "'" + value + "'"

    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single row record in the postgres DB based on the
        input data
        @param coords: a Coords object representing NSWE coords
        @param timeRange: a TimeRange object representing a start/end date; either
        end may be the string "null" for an open/unknown date
        '''
        logging.info("Adding spatiotemporal row to DB")

        # coords are inserted unquoted; dates are quoted unless "null"
        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', " + \
            str(coords.north) + ", " + str(coords.south) + ", " + str(coords.west) + ", " + \
            str(coords.east) + ", " + \
            self._sqlQuoteUnlessNull(timeRange.start) + ", " + \
            self._sqlQuoteUnlessNull(timeRange.end) + ");"

        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Spatiotemporal row added successfully")

    def _insertSpatioTemporalData(self):
        '''
        Create a record in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object. Equal-length spatial and temporal
        lists are paired one to one; otherwise every time range is combined with
        every set of coords. Missing data on one side is padded with nulls.
        '''
        logging.info("Adding spatiotemporal data to DB record")

        # Records with no temporal data in the xml can make getTemporalData() fail;
        # treat that as a single null/null time range (SJD 23/09/08 workaround).
        try:
            timeData = self._record.getTemporalData()
        except Exception:
            logging.warning("Could not extract temporal data for record - using null time range",
                            exc_info=True)
            timeData = [ TimeRange('null', 'null') ]

        # If spatial data cannot be extracted, treat it as absent rather than
        # leaving spatialData unbound (which made the check below raise NameError).
        try:
            spatialData = self._record.getSpatialData()
        except Exception:
            logging.warning("Could not extract spatial data for record - treating as absent",
                            exc_info=True)
            spatialData = []

        # check if we have any spatiotemporal data to add; escape if not
        if not timeData and not spatialData:
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # setup dummy, null data if required
        if not timeData:
            timeData = [ TimeRange('null', 'null') ]

        if not spatialData:
            spatialData = [ Coords('null', 'null', 'null', 'null') ]

        if len(timeData) == len(spatialData):
            # if both arrays of data are the same length, assume they map together
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for coords, timeRange in zip(spatialData, timeData):
                self._insertSpatioTemporalRow(coords, timeRange)
        else:
            # otherwise, map everything to everything
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for timeRange in timeData:
                for coords in spatialData:
                    self._insertSpatioTemporalRow(coords, timeRange)
        logging.info("Spatiotemporal data added to DB record")

    def _buildFunctionCall(self, functionName, args):
        '''
        Build a "SELECT functionName('arg1', 'arg2', ...);" SQL command
        @param functionName: name of the DB function to call
        @param args: sequence of strings; each is wrapped in single quotes as-is
        (no escaping is done here)
        @return: the SQL command string
        '''
        quotedArgs = ["'" + arg + "'" for arg in args]
        return "SELECT " + functionName + "(" + ", ".join(quotedArgs) + ");"

    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB and store the new
        primary key on the record as db_id
        @raise ValueError: if the insert returns no primary key
        '''
        logging.info("Inserting new original document in Postgres DB")
        sqlCmd = self._buildFunctionCall("create_document", [
            self._record.shortFilename,
            self._record.discovery_id,
            self._record.docType,
            self._record.originalFormat,
            self._record.getAuthorsInfo(),
            self._record.getParametersInfo(),
            self._record.getScopeInfo(),
            self._record.dataset_name,
            self._record.datacentre_name,
            self._record.dataset_lastEdit])

        # turn any quoted 'null' values into real SQL NULLs
        sqlCmd = self.sqlCommandNullScan(sqlCmd)

        rows = db_funcs.runSQLCommand(self._connection, sqlCmd)
        if not rows:
            raise ValueError('No PK ID returned from INSERT to DB')

        self._record.db_id = rows[0][0]
        logging.info("Original document inserted in Postgres DB")

    def deleteOriginalRecord(self):
        '''
        Delete the original metadata doc from the postgres DB, via the DB's
        delete_document function, using the record's db_id
        '''
        logging.info("Deleting original document from Postgres DB")

        sqlCmd = self._buildFunctionCall("delete_document", [str(self._record.db_id)])

        # turn any quoted 'null' values into real SQL NULLs
        sqlCmd = self.sqlCommandNullScan(sqlCmd)

        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Original document deleted from Postgres DB")

    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB, passing the record's current scn,
        then increment the scn held on the record
        '''
        logging.info("Updating original document in Postgres DB")
        sqlCmd = self._buildFunctionCall("update_document", [
            str(self._record.db_id),
            self._record.shortFilename,
            self._record.discovery_id,
            self._record.docType,
            self._record.originalFormat,
            self._record.getAuthorsInfo(),
            self._record.getParametersInfo(),
            self._record.getScopeInfo(),
            str(self._record.scn),
            self._record.dataset_name,
            self._record.datacentre_name,
            self._record.dataset_lastEdit])

        # turn any quoted 'null' values into real SQL NULLs
        sqlCmd = self.sqlCommandNullScan(sqlCmd)

        db_funcs.runSQLCommand(self._connection, sqlCmd)

        # increment scn - saves doing an additional db call
        # (presumably mirrors an increment done by update_document - confirm)
        self._record.scn += 1
        logging.info("Original document updated in Postgres DB")

    def _insertMetadataRecords(self):
        '''
        Insert one TRANSFORMED_DOCUMENT row per (docType, doc) pair from
        record.getAllDocs(), linked to the record's db_id; does nothing if db_id is None
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_document_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                docType + "', '" + doc + "', current_timestamp, 1);"

            # turn any quoted 'null' values into real SQL NULLs
            sqlCmd = self.sqlCommandNullScan(sqlCmd)

            db_funcs.runSQLCommand(self._connection, sqlCmd)

        logging.info("Transformed records created in DB")

    def _updateMetadataRecords(self):
        '''
        Overwrite the stored transformed documents for the record's db_id, one
        UPDATE per (docType, doc) pair from record.getAllDocs(); does nothing if
        db_id is None
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \
                "', update_date = current_timestamp WHERE original_document_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"

            # turn any quoted 'null' values into real SQL NULLs
            sqlCmd = self.sqlCommandNullScan(sqlCmd)

            db_funcs.runSQLCommand(self._connection, sqlCmd)

        logging.info("Transformed records updated in DB")

    def sqlCommandNullScan(self, sqlCmd):
        '''
        Replace every quoted 'null' in a completed SQL command with an unquoted
        null, so it is stored as SQL NULL rather than the text "null"
        @param sqlCmd: the SQL command string
        @return: the rewritten command
        '''
        return sqlCmd.replace("'null'", "null")

    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record
Note: See TracBrowser for help on using the repository browser.