source: TI01-discovery/tags/stable-TI01-ingestAutomation_Proglue_upgradesAndReporting/temp/OAIBatch/PostgresDAO.py @ 5040

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/tags/stable-TI01-ingestAutomation_Proglue_upgradesAndReporting/temp/OAIBatch/PostgresDAO.py@5040
Revision 5040, 16.3 KB, checked in by sdonegan, 11 years ago

Debug new ingest classes - previous commit had problems with mdip records.

  • Property svn:executable set to *
#!/usr/bin/env python
'''
Class representing the data access object wrapping the Postgres DB
C Byrom Apr 08
'''
import sys, os, logging
import db_funcs
from SpatioTemporalData import *

class PostgresDAO:

    def __init__(self, record, connection=None):
        '''
        Constructor - to initialise the DAL and do some initial setting up
        @param record: the PostgresRecord object to add or update in the DB
        @param connection: optional existing DB connection to reuse; if None, a new one is opened
        '''
        if record == "":
            logging.error("Invalid call to PostgresDAO!")
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
        elif record == "deleting":
            logging.info("Record object not supplied as DELETING records FROM database..")
        else:
            logging.info("Creating/updating DB entry for record, %s" % record.discovery_id)

        # setup a connection to the db - if none specified
        if connection is None:
            connection = db_funcs.db_connect()
        self._connection = connection
        self._record = record

    def getRecordID(self):
        '''
        Looks up a record in the DB by discovery ID and, if found, stores its DB ID
        on the record
        @return: id of the record if it exists, None otherwise
        '''
        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
        if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            self._record.discovery_id + "';"
        dbId = db_funcs.runSQLCommand(self._connection, sql)
        if dbId:
            self._record.db_id = dbId[0][0]
        return self._record.db_id

    def getRecordID_using_OriginalDocumentFilename(self):
        '''
        Looks up a record in the DB by original document filename and, if found,
        stores its DB ID on the record
        @return: None - the result is stored in self._record.db_id
        '''
        logging.info("Looking up original_document_id for filename: " + self._record.filename + " in DB")

        '''if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id'''

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = db_funcs.runSQLCommand(self._connection, sql)

        if dbId:
            self._record.db_id = dbId[0][0]

    def getDiscoveryID_using_OriginalDocumentFilename(self):
        '''
        Looks up a record in the DB by original document filename and, if found,
        stores its discovery ID on the record
        @return: None - the result is stored in self._record.discovery_id
        '''
        logging.info("Looking up discovery_id for filename: " + self._record.filename + " in DB")

        '''if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id'''

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = db_funcs.runSQLCommand(self._connection, sql)

        if dbId:
            self._record.discovery_id = dbId[0][0]

    def getTemporalDataId(self):
        '''
        Looks up the temporal data id using the original document id
        NB: incomplete - the SQL below is never executed and currently mirrors the
        discovery_id lookup rather than selecting a temporal data id
        '''
        logging.info("Looking up temporal_data_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

    def createOrUpdateRecord(self):
        '''
        Looks up a record in the DB; if it finds it, update it, otherwise create it
        @return result: True if record created/updated, False otherwise
        '''
        self.getRecordID()
        if self._record.db_id:
            return self.updateRecord()
        else:
            return self.createRecord()

    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record created, False otherwise
        '''
        logging.info("Creating a new record in the DB for the metadata document")
        # firstly store the actual documents contained by the record - i.e. the original doc +
        # the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()
        logging.info("New record created")
        return True

    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record updated, False otherwise
        '''
        logging.info("Record already exists in DB - performing updates")
        result = False
        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if self._checkIsUpdatedRecord():

            # firstly, update the original record
            self._updateOriginalRecord()

            # now update the actual documents contained by the record - i.e. the original doc +
            # the various transforms required
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing spatiotemporal
            # data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")
            self._deleteSpatioTemporalData()
            self._insertSpatioTemporalData()
            result = True

        logging.info("Finished processing document...")
        return result

    def _checkIsUpdatedRecord(self):
        '''
        Looks up an existing record and checks it is not identical to the new record; if it is
        identical, increment the harvest_count value and don't process it again
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")

        sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
            str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';"
        results = db_funcs.runSQLCommand(self._connection, sql)

        # NB, if the document is not identical, the sql command will not find anything
        if not results:
            logging.info("Ingested document is different to that in the current DB")
            # get the current SCN
            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
                    str(self._record.db_id) + ";"
            results = db_funcs.runSQLCommand(self._connection, sql)
            self._record.scn = results[0][0]
            return True

        count = results[0][0]

        # get current system change number
        scn = results[0][1]
        self._record.scn = scn
        logging.info("Ingested document is identical to document currently in DB - " + \
                     "incrementing harvest_count")
        count += 1
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \
            " WHERE original_document_id = " + str(self._record.db_id)
        db_funcs.runSQLCommand(self._connection, sql)
        return False

    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is set to cascade from the linking table so only need to
        delete entries from there
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_document_id = " + \
            str(self._record.db_id) + ";"

        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")

    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single row record in the postgres DB based on the
        input data
        @param coords: a Coords object representing NSWE coords
        @param timeRange: a TimeRange object representing a start/end date
        '''
        logging.info("Adding spatiotemporal row to DB")

        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', " + \
            str(coords.north) + ", " + str(coords.south) + ", " + str(coords.west) + ", " + \
            str(coords.east) + ", "

        # cope with null dates appropriately
        if timeRange.start == "null":
            sqlCmd += timeRange.start + ", "
        else:
            sqlCmd += "'" + timeRange.start + "', "

        if timeRange.end == "null":
            sqlCmd += timeRange.end
        else:
            sqlCmd += "'" + timeRange.end + "'"

        sqlCmd += ");"

        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Spatiotemporal row added successfully")

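    # Illustrative example (hypothetical values, not from the source): for a record
    # with db_id 42, a bounding box N=60.0 S=50.0 W=-10.0 E=5.0 and a time range with
    # start '1990-01-01' but no end date, _insertSpatioTemporalRow builds:
    #   SELECT add_spatiotemporal_row('42', 60.0, 50.0, -10.0, 5.0, '1990-01-01', null);
    # i.e. real dates are quoted while the literal string "null" is passed through unquoted.
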
    def _insertSpatioTemporalData(self):
        '''
        Create a record in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object
        '''
        logging.info("Adding spatiotemporal data to DB record")

        # Work out the relationship between the spatial and temporal data and handle appropriately

        # error here! SJD - added a try/except to set timedata to null for instances where no temporal data in xml
        # - should be caught elsewhere, but easiest fudge is to put it in here. 23/09/08 SJD.
        try:
            timeData = self._record.getTemporalData()
        except:
            timeData = [ TimeRange('null', 'null') ]

        try:
            spatialData = self._record.getSpatialData()
        except:
            # no spatial data in the xml - fall through to the dummy, null coords below
            spatialData = None
            logging.warning("No spatial data found for record")

        # check if we have any spatiotemporal data to add; escape if not
        if not timeData and not spatialData:
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # setup dummy, null data if required
        if not timeData:
            timeData = [ TimeRange('null', 'null') ]

        if not spatialData:
            spatialData = [ Coords('null', 'null', 'null', 'null') ]

        # if both arrays of data are the same length, assume they map together
        i = 0
        if len(timeData) == len(spatialData):
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for timeRange in timeData:
                self._insertSpatioTemporalRow(spatialData[i], timeRange)
                i += 1

        # otherwise, map everything to everything
        else:
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for timeRange in timeData:
                for coords in spatialData:
                    self._insertSpatioTemporalRow(coords, timeRange)
        logging.info("Spatiotemporal data added to DB record")

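    # Illustrative example (hypothetical sizes): with 2 time ranges and 2 coordinate
    # boxes the lists are paired element-by-element, producing 2 rows; with 2 time
    # ranges and 3 boxes the lengths differ, so every box is combined with every time
    # range, producing 6 rows.
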
    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB
        '''
        logging.info("Inserting new original document in Postgres DB")
        sqlCmd = "SELECT create_document('" + self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            self._record.dataset_name + "', '" + self._record.datacentre_name + "', '" + self._record.dataset_lastEdit + "');"

        '''logging.info("SELECT create_document('" + self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            self._record.dataset_name + "', '" + self._record.datacentre_name + "', '" + self._record.dataset_lastEdit + "');" )'''

        id = db_funcs.runSQLCommand(self._connection, sqlCmd)
        if len(id) == 0:
            raise ValueError, 'No PK ID returned from INSERT to DB'

        self._record.db_id = id[0][0]
        logging.info("Original document inserted in Postgres DB")

    def deleteOriginalRecord(self):
        '''
        Delete the original metadata doc from the postgres DB
        '''
        logging.info("Deleting original document from Postgres DB")

        sqlCmd = "SELECT delete_document('" + str(self._record.db_id) + "');"

        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Original document deleted from Postgres DB")

    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB
        '''
        logging.info("Updating original document in Postgres DB")
        sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \
            self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            str(self._record.scn) + "', '" + self._record.dataset_name + "', '" + \
            self._record.datacentre_name + "', '" + self._record.dataset_lastEdit + "');"

        db_funcs.runSQLCommand(self._connection, sqlCmd)

        # increment scn - saves doing an additional db call
        self._record.scn += 1
        logging.info("Original document updated in Postgres DB")

    def _insertMetadataRecords(self):
        '''
        Insert the metadata docs into the postgres DB
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_document_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                docType + "', '" + doc + "', current_timestamp, 1);"

            db_funcs.runSQLCommand(self._connection, sqlCmd)

        logging.info("Transformed records created in DB")

    def _updateMetadataRecords(self):
        '''
        Update the metadata docs in the postgres DB
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \
                "', update_date = current_timestamp WHERE original_document_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"

            db_funcs.runSQLCommand(self._connection, sqlCmd)

        logging.info("Transformed records updated in DB")

    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record

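# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). It exercises the
# "deleting" path supported by the constructor: looking up a previously
# ingested document by filename and removing it. The filename and the stub
# record object below are hypothetical - in the real ingest pipeline a full
# PostgresRecord is supplied instead - and db_funcs.db_connect() is assumed
# to pick up its connection settings from the ingest configuration.
# ---------------------------------------------------------------------------
if __name__ == "__main__":

    class StubRecord:
        '''Hypothetical minimal record holding just the fields the lookup uses'''
        filename = "example_document.xml"
        db_id = None

    connection = db_funcs.db_connect()
    dao = PostgresDAO("deleting", connection=connection)

    stub = StubRecord()
    dao.setRecord(stub)

    # fills in stub.db_id if the filename is already known to the DB
    dao.getRecordID_using_OriginalDocumentFilename()
    if stub.db_id:
        dao.deleteOriginalRecord()
    else:
        logging.info("No document found for filename - nothing to delete")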