source: TI01-discovery/tags/stable-TI01-ingestAutomation_Proglue_upgradesAndReporting/temp/OAIBatch/PostgresDAO.py @ 5047

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/tags/stable-TI01-ingestAutomation_Proglue_upgradesAndReporting/temp/OAIBatch/PostgresDAO.py@5047
Revision 5047, 17.1 KB checked in by sdonegan, 12 years ago (diff)

Handle a bug re. timestamps and ingest.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3Class representing the data access object wrapping the Postgres DB
4C Byrom Apr 08
5'''
6import sys, os, logging,re
7import db_funcs
8from SpatioTemporalData import *
9
class PostgresDAO:
    '''
    Data access object wrapping the Postgres discovery DB.

    Creates, updates and deletes the DB representation of a PostgresRecord:
    its original document (ORIGINAL_DOCUMENT), the transformed documents
    derived from it (TRANSFORMED_DOCUMENT) and its spatiotemporal data.
    All statements are executed through db_funcs.runSQLCommand.

    NOTE(review): SQL is built by string concatenation, and only the literal
    'null' is post-processed (see sqlCommandNullScan). Values containing a
    single quote will break the statement unless they are escaped upstream.
    TODO: confirm PostgresRecord escapes them, or move to parameterised
    queries if db_funcs supports them.
    '''

    def __init__(self, record, connection=None):
        '''
        Constructor - to initialise the DAL and do some initial setting up
        @param record: the PostgresRecord object to add or update in the DB,
        or the string "deleting" when the DAO is only used to delete records.
        An empty string is rejected and terminates the process.
        @param connection: an existing DB connection to reuse; if None, a new
        one is opened with db_funcs.db_connect()
        '''
        if record == "":
            logging.error("Invalid call to PostgresDAO!")
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
        elif record == "deleting":
            logging.info("Record object not supplied as DELETING records FROM database..")
        else:
            logging.info("Creating/updating DB entry for record, %s", record.discovery_id)

        # setup a connection to the db - if none specified
        if connection is None:
            connection = db_funcs.db_connect()
        self._connection = connection
        self._record = record

    def getRecordID(self):
        '''
        Look up the record's original_document_id in the DB by its discovery_id.

        The ID is cached on the record (self._record.db_id). Once a positive
        ID is known, no further DB lookup is made.
        @return: the record's DB ID if found (or already known); otherwise the
        record's current db_id value, which is unchanged by the failed lookup
        '''
        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
        if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            self._record.discovery_id + "';"
        rows = db_funcs.runSQLCommand(self._connection, sql)
        if rows:
            self._record.db_id = rows[0][0]
        return self._record.db_id

    def getRecordID_using_OriginalDocumentFilename(self):
        '''
        Look up the record's original_document_id in the DB by its filename.

        Unlike getRecordID, this always queries the DB, even if db_id is
        already set.
        @return: the record's db_id after the lookup. It is set from the DB
        if a row matched, otherwise left unchanged.
        '''
        logging.info("Looking up original_document_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + \
            self._record.filename + "';"
        rows = db_funcs.runSQLCommand(self._connection, sql)
        if rows:
            self._record.db_id = rows[0][0]
        return self._record.db_id

    def getDiscoveryID_using_OriginalDocumentFilename(self):
        '''
        Look up the record's discovery_id in the DB by its filename.
        @return: the record's discovery_id after the lookup. It is set from
        the DB if a row matched, otherwise left unchanged.
        '''
        logging.info("Looking up discovery_of for filename: " + self._record.filename + " in DB")

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + \
            self._record.filename + "';"
        rows = db_funcs.runSQLCommand(self._connection, sql)
        if rows:
            self._record.discovery_id = rows[0][0]
        return self._record.discovery_id

    def getTemporalDataId(self):
        '''
        Looks up the temporal data id using the original document id

        TODO(review): unfinished. The query below is built but never executed,
        and it selects discovery_id rather than a temporal-data id. The method
        currently only logs and returns None.
        '''
        logging.info("Looking up temporal_data_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

    def createOrUpdateRecord(self):
        '''
        Looks up a record in the DB; if it finds it, update it, otherwise create it
        @return result: True if record created/updated, False otherwise
        '''
        self.getRecordID()
        if self._record.db_id:
            return self.updateRecord()
        return self.createRecord()

    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object
        @return result: True once the record has been created (errors propagate
        as exceptions)
        '''
        logging.info("Creating a new record in the DB for the metada document")
        # firstly store the actual documents contained by the record - i.e. the original doc +
        # the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()
        logging.info("New record created")
        return True

    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record updated, False if the stored document was
        identical (in which case only its harvest_count is incremented)
        '''
        logging.info("Record already existing in DB - performing updates")
        result = False
        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if self._checkIsUpdatedRecord():
            # firstly, update the original record
            self._updateOriginalRecord()

            # now update the actual documents contained by the record - i.e. the original doc +
            # the various transforms required
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing spatiotemporal
            # data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")
            self._deleteSpatioTemporalData()
            self._insertSpatioTemporalData()
            result = True

        logging.info("Finish processing document...")
        return result

    def _checkIsUpdatedRecord(self):
        '''
        Check whether the incoming document differs from the stored one.

        If it is identical, increment the stored harvest_count and don't
        process it again. In both cases the record's scn is refreshed from
        the DB.
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")

        sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
            str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';"
        results = db_funcs.runSQLCommand(self._connection, sql)

        # NB, if the document is not identical, the sql command will not find anything
        if not results:
            logging.info("Ingested document is different to that in the current DB")
            # get the current SCN
            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
                str(self._record.db_id) + ";"
            results = db_funcs.runSQLCommand(self._connection, sql)
            self._record.scn = results[0][0]
            return True

        count, scn = results[0][0], results[0][1]

        # keep the current system change number for the (skipped) update
        self._record.scn = scn
        logging.info("Ingested document is identical to document currently in DB - " + \
                     "incrementing harvest_count")
        count += 1
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \
            " WHERE original_document_id = " + str(self._record.db_id)
        db_funcs.runSQLCommand(self._connection, sql)
        return False

    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is set to cascade from the linking table so only need to
        delete entries from there
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_document_id = " + \
            str(self._record.db_id) + ";"

        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")

    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single row record in the postgres DB based on the input data
        @param coords: a Coords object representing NSWE coords (emitted unquoted)
        @param timeRange: a TimeRange object representing a start/end date.
        Dates are quoted, except the string 'null', which becomes SQL NULL.
        '''
        logging.info("Adding spatiotemporal row to DB")

        def toSqlDate(value):
            # 'null' is passed through unquoted so the DB receives a real NULL
            if value == "null":
                return value
            return "'" + value + "'"

        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', " + \
            str(coords.north) + ", " + str(coords.south) + ", " + str(coords.west) + ", " + \
            str(coords.east) + ", " + toSqlDate(timeRange.start) + ", " + \
            toSqlDate(timeRange.end) + ");"

        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Spatiotemporal row added successfully")

    def _insertSpatioTemporalData(self):
        '''
        Create a record in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object.

        If spatial and temporal lists have equal length they are paired one to
        one; otherwise every time range is combined with every coordinate set.
        A missing side is replaced by a single all-'null' placeholder.
        '''
        logging.info("Adding spatiotemporal data to DB record")

        # SJD 23/09/08: records with no temporal data in the XML make
        # getTemporalData() fail; deliberately fall back to a null time range
        # here rather than aborting the ingest (should be caught elsewhere).
        try:
            timeData = self._record.getTemporalData()
        except Exception:
            timeData = [TimeRange('null', 'null')]

        # Don't leave spatialData unbound if extraction fails; log why and
        # treat it as absent so null coordinates are used below.
        try:
            spatialData = self._record.getSpatialData()
        except Exception:
            logging.exception("Could not extract spatial data for record - treating it as absent")
            spatialData = None

        # check if we have any spatiotemporal data to add; escape if not
        if not timeData and not spatialData:
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # setup dummy, null data if required
        if not timeData:
            timeData = [TimeRange('null', 'null')]

        if not spatialData:
            spatialData = [Coords('null', 'null', 'null', 'null')]

        # if both arrays of data are the same length, assume they map together
        if len(timeData) == len(spatialData):
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for coords, timeRange in zip(spatialData, timeData):
                self._insertSpatioTemporalRow(coords, timeRange)

        # otherwise, map everything to everything
        else:
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for timeRange in timeData:
                for coords in spatialData:
                    self._insertSpatioTemporalRow(coords, timeRange)
        logging.info("Spatiotemporal data added to DB record")

    def _joinQuotedArgs(self, values):
        '''
        Render string values as a comma-separated list of single-quoted SQL
        literals, e.g. ['a', 'b'] -> "'a', 'b'". No escaping is applied.
        '''
        return "'" + "', '".join(values) + "'"

    def _buildCreateDocumentSQL(self):
        '''
        Build the create_document() call for the current record.
        @return: the SQL string, with 'null' literals converted to NULL
        '''
        rec = self._record
        args = [rec.shortFilename, rec.discovery_id, rec.docType,
                rec.originalFormat, rec.getAuthorsInfo(),
                rec.getParametersInfo(), rec.getScopeInfo(),
                rec.dataset_name, rec.datacentre_name, rec.dataset_lastEdit]
        sqlCmd = "SELECT create_document(" + self._joinQuotedArgs(args) + ");"
        return self.sqlCommandNullScan(sqlCmd)

    def _buildUpdateDocumentSQL(self):
        '''
        Build the update_document() call for the current record.

        The argument order matches create_document, with the DB ID first and
        the current scn inserted before dataset_name.
        @return: the SQL string, with 'null' literals converted to NULL
        '''
        rec = self._record
        args = [str(rec.db_id), rec.shortFilename, rec.discovery_id,
                rec.docType, rec.originalFormat, rec.getAuthorsInfo(),
                rec.getParametersInfo(), rec.getScopeInfo(), str(rec.scn),
                rec.dataset_name, rec.datacentre_name, rec.dataset_lastEdit]
        sqlCmd = "SELECT update_document(" + self._joinQuotedArgs(args) + ");"
        return self.sqlCommandNullScan(sqlCmd)

    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB and store the
        returned primary key on the record (self._record.db_id).
        @raise ValueError: if the DB returns no primary key
        '''
        logging.info("Inserting new original document in Postgres DB")
        sqlCmd = self._buildCreateDocumentSQL()

        rows = db_funcs.runSQLCommand(self._connection, sqlCmd)
        if not rows:
            raise ValueError('No PK ID returned from INSERT to DB')

        self._record.db_id = rows[0][0]
        logging.info("Original document inserted in Postgres DB")

    def deleteOriginalRecord(self):
        '''
        Delete the original metadata doc (identified by self._record.db_id)
        from the postgres DB
        '''
        logging.info("Deleting original document from Postgres DB")

        sqlCmd = "SELECT delete_document('" + str(self._record.db_id) + "');"

        #scan the sql string for any 'null' characters - if there are some, get rid of the quote marks.  Easier to do this now..
        sqlCmd = self.sqlCommandNullScan(sqlCmd)

        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Original document deleted from Postgres DB")

    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB, then bump the locally
        cached scn so no extra DB round trip is needed.
        '''
        logging.info("Updating original document in Postgres DB")
        sqlCmd = self._buildUpdateDocumentSQL()

        db_funcs.runSQLCommand(self._connection, sqlCmd)

        # increment scn - saves doing an additional db call
        self._record.scn += 1
        logging.info("Original document updated in Postgres DB")

    def _insertMetadataRecords(self):
        '''
        Insert one TRANSFORMED_DOCUMENT row per (docType, doc) pair returned by
        self._record.getAllDocs(). This is a no-op if the record has no DB ID.
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_document_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                docType + "', '" + doc + "', current_timestamp, 1);"

            #scan the sql string for any 'null' characters - if there are some, get rid of the quote marks.  Easier to do this now..
            sqlCmd = self.sqlCommandNullScan(sqlCmd)

            db_funcs.runSQLCommand(self._connection, sqlCmd)

        logging.info("Transformed records created in DB")

    def _updateMetadataRecords(self):
        '''
        Overwrite the stored transformed documents for this record, one UPDATE
        per (docType, doc) pair from self._record.getAllDocs(). This is a
        no-op if the record has no DB ID.
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \
                "', update_date = current_timestamp WHERE original_document_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"

            #scan the sql string for any 'null' characters - if there are some, get rid of the quote marks.  Easier to do this now..
            sqlCmd = self.sqlCommandNullScan(sqlCmd)

            db_funcs.runSQLCommand(self._connection, sqlCmd)

        logging.info("Transformed records updated in DB")

    def sqlCommandNullScan(self, sqlCmd):
        '''
        Replace every quoted 'null' literal in a completed SQL command with an
        unquoted null, so the DB receives a real NULL.

        NB: this scans the whole statement, so the exact text 'null' (with the
        quotes) inside embedded document content is rewritten as well.
        '''
        return sqlCmd.replace("'null'", "null")

    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record
415
Note: See TracBrowser for help on using the repository browser.