source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 4224

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@4224
Revision 4224, 13.2 KB checked in by sdonegan, 11 years ago (diff)

Added extra try-catch to deal with missing temporal coverage in original DIFs

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3Class representing the data access object wrapping the Postgres DB
4C Byrom Apr 08
5'''
6import sys, os, logging
7import db_funcs
8from SpatioTemporalData import *
9
class PostgresDAO:
    '''
    Data access object wrapping the Postgres DB for a single PostgresRecord.

    The DAO holds one DB connection and one "current" record; the record can
    be swapped with setRecord() so the same DAO (and connection) can be
    re-used for many documents.

    NOTE(review): every statement issued by this class is assembled by string
    concatenation, so a quote character in any record field would break (or
    subvert) the SQL.  Presumably the PostgresRecord escapes its content
    before it reaches this class - verify against PostgresRecord, and prefer
    bound parameters if db_funcs.runSQLCommand can be extended to take them.
    '''

    def __init__(self, record, connection=None):
        '''
        Constructor - to initialise the DAL and do some initial setting up
        @param record: the PostgresRecord object to add or update in the DB
        @param connection: an existing DB connection to use; if None, a new
        one is opened via db_funcs.db_connect()
        '''
        if record == "":
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")

        logging.info("INFO: Creating/updating DB entry for record, %s",
                     record.discovery_id)

        # setup a connection to the db - if none specified
        if connection is None:
            connection = db_funcs.db_connect()
        self._connection = connection
        self._record = record

    def _runSql(self, sql):
        '''
        Run a single SQL statement on this DAO's connection
        @param sql: the complete SQL statement to execute
        @return: whatever db_funcs.runSQLCommand returns for the statement
        '''
        return db_funcs.runSQLCommand(self._connection, sql)

    @staticmethod
    def _quoteUnlessNull(value):
        '''
        Render a date string as a SQL argument
        @param value: the date string, or the literal string "null"
        @return: the bare word null for "null", otherwise the value wrapped in
        single quotes
        '''
        if value == "null":
            return value
        return "'" + value + "'"

    def _documentFields(self):
        '''
        Collect the record fields shared by the create_document and
        update_document calls, in the argument order both calls use
        @return: list of the seven string fields
        '''
        record = self._record
        return [record.shortFilename,
                record.discovery_id,
                record.docType,
                record.originalFormat,
                record.getAuthorsInfo(),
                record.getParametersInfo(),
                record.getScopeInfo()]

    def getRecordID(self):
        '''
        Looks up the current record in the DB by its discovery ID and caches
        the DB ID on the record (record.db_id) when found
        @return: DB ID of the record if it exists, None if it doesn't
        '''
        record = self._record
        logging.info("Looking up record, %s in DB", record.discovery_id)
        if record.db_id is not None and record.db_id > 0:
            # ID was found by an earlier lookup - avoid a second DB round trip
            logging.info("Already looked up record - ID is %s", record.db_id)
            return record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            record.discovery_id + "';"
        rows = self._runSql(sql)
        if rows:
            record.db_id = rows[0][0]
            return record.db_id
        return None

    def createOrUpdateRecord(self):
        '''
        Looks up a record in the DB; if it finds it, update it, otherwise create it
        @return result: True if record created/updated, false if otherwise
        '''
        self.getRecordID()
        if self._record.db_id:
            return self.updateRecord()
        return self.createRecord()

    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object
        @return result: True once the record has been created
        '''
        logging.info("Creating a new record in the DB for the metada document")
        # firstly store the actual documents contained by the record - i.e. the
        # original doc + the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()
        logging.info("New record created")
        return True

    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record updated, False if the incoming document
        was identical to the stored one (only harvest_count is bumped then)
        '''
        logging.info("Record already existing in DB - performing updates")
        result = False
        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if self._checkIsUpdatedRecord():
            # update the original record, then the documents derived from it
            self._updateOriginalRecord()
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing spatiotemporal
            # data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")
            self._deleteSpatioTemporalData()
            self._insertSpatioTemporalData()
            result = True

        logging.info("Finish processing document...")
        return result

    def _checkIsUpdatedRecord(self):
        '''
        Looks up an existing record and checks it is not identical to the new record; if it is
        increment the harvest_count value and don't process again.  Either way
        the record's scn attribute is set to the value currently in the DB.
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")
        record = self._record

        sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
            str(record.db_id) + " AND original_document = '" + record.originalFormat + "';"
        results = self._runSql(sql)

        # NB, if the document is not identical, the sql command will not find anything
        if not results:
            logging.info("Ingested document is different to that in the current DB")
            # get the current SCN
            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
                str(record.db_id) + ";"
            results = self._runSql(sql)
            record.scn = results[0][0]
            return True

        count, scn = results[0][0], results[0][1]

        # remember the current system change number
        record.scn = scn
        logging.info("Ingested document is identical to document currently in DB - " +
                     "incrementing harvest_count")
        count += 1
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \
            " WHERE original_document_id = " + str(record.db_id)
        self._runSql(sql)
        return False

    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is expected to cascade from the linking table (this
        is a property of the DB schema, not of this code) so only entries
        there are deleted
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_document_id = " + \
            str(self._record.db_id) + ";"

        self._runSql(sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")

    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single row record in the postgres DB based on the
        input data
        @param coords: a Coords object representing NSWE coords
        @param timeRange: a TimeRange object representing a start/end date
        '''
        logging.info("Adding spatiotemporal row to DB")

        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \
            str(coords.north) + "', '" + str(coords.south) + "', '" + str(coords.west) + "', '" + \
            str(coords.east) + "', "

        # cope with null dates appropriately - a null date must reach the DB
        # as the bare word null, not as the quoted string 'null'
        sqlCmd += self._quoteUnlessNull(timeRange.start) + ", "
        sqlCmd += self._quoteUnlessNull(timeRange.end)
        sqlCmd += ");"

        self._runSql(sqlCmd)
        logging.info("Spatiotemporal row added successfully")

    def _insertSpatioTemporalData(self):
        '''
        Create rows in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object.  If there are as many time
        ranges as coordinate sets they are paired one to one; otherwise every
        time range is stored against every coordinate set.
        '''
        logging.info("Adding spatiotemporal data to DB record")

        # Some original documents have no temporal coverage and the record
        # raises when asked for it; fall back to a null time range in that
        # case.  Only ordinary errors are caught, so Ctrl-C / sys.exit still
        # propagate.
        try:
            timeData = self._record.getTemporalData()
        except Exception:
            logging.warning("Could not extract temporal data for record - using null time range",
                            exc_info=True)
            timeData = [TimeRange('null', 'null')]

        spatialData = self._record.getSpatialData()

        # check if we have any spatiotemporal data to add; escape if not
        if not timeData and not spatialData:
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # setup dummy, null data if required
        if not timeData:
            timeData = [TimeRange('null', 'null')]

        if not spatialData:
            spatialData = [Coords('null', 'null', 'null', 'null')]

        if len(timeData) == len(spatialData):
            # if both arrays of data are the same length, assume they map together
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for coords, timeRange in zip(spatialData, timeData):
                self._insertSpatioTemporalRow(coords, timeRange)
        else:
            # otherwise, map everything to everything
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for timeRange in timeData:
                for coords in spatialData:
                    self._insertSpatioTemporalRow(coords, timeRange)
        logging.info("Spatiotemporal data added to DB record")

    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB and store the
        returned primary key on the record (record.db_id)
        @raise ValueError: if the DB call returns no rows, i.e. no PK ID
        '''
        logging.info("Inserting new original document in Postgres DB")
        sqlCmd = "SELECT create_document('" + "', '".join(self._documentFields()) + "');"

        rows = self._runSql(sqlCmd)
        if not rows:
            raise ValueError('No PK ID returned from INSERT to DB')

        self._record.db_id = rows[0][0]
        logging.info("Original document inserted in Postgres DB")

    def deleteOriginalRecord(self):
        '''
        Delete the original metadata doc from the postgres DB
        '''
        logging.info("Deleting original document from Postgres DB")
        sqlCmd = "SELECT delete_document('" + str(self._record.db_id) + "');"

        self._runSql(sqlCmd)
        logging.info("Original document deleted from Postgres DB")

    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB, passing the record's
        current scn, then bump the record's scn locally
        '''
        logging.info("Updating original document in Postgres DB")
        sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \
            "', '".join(self._documentFields()) + "', '" + str(self._record.scn) + "');"
        self._runSql(sqlCmd)

        # increment scn - saves doing an additional db call
        self._record.scn += 1
        logging.info("Original document updated in Postgres DB")

    def _insertMetadataRecords(self):
        '''
        Insert the metadata docs into the postgres DB - one
        TRANSFORMED_DOCUMENT row per (format, document) pair returned by the
        record's getAllDocs().  Does nothing if the record has no DB ID yet.
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB",
                     self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_document_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                docType + "', '" + doc + "', current_timestamp, 1);"

            self._runSql(sqlCmd)

        logging.info("Transformed records created in DB")

    def _updateMetadataRecords(self):
        '''
        Update the metadata docs in the postgres DB - one UPDATE per
        (format, document) pair returned by the record's getAllDocs().
        Does nothing if the record has no DB ID yet.
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB",
                     self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \
                "', update_date = current_timestamp WHERE original_document_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"

            self._runSql(sqlCmd)

        logging.info("Transformed records updated in DB")

    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record
327
Note: See TracBrowser for help on using the repository browser.