source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 3972

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@3972
Revision 3972, 12.8 KB checked in by cbyrom, 13 years ago (diff)

Use the short filename in the postgres DB for storing the original
document filename.
Add fix to allow proper handling of scope fields as a ts_vector.
Add TODO comments to highlight areas of concern + update docs.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3Class representing the data access object wrapping the Postgres DB
4C Byrom Apr 08
5'''
6import sys, os, logging
7import db_funcs
8from SpatioTemporalData import *
9
class PostgresDAO:
    '''
    Data access object wrapping the Postgres DB used to store discovery records.
    Operates on one PostgresRecord at a time (use setRecord to re-use the DAO)
    and issues all of its SQL through db_funcs.runSQLCommand.

    NOTE(review): every statement below is built by string concatenation, and
    record content (documents, authors, parameters, scope) comes from harvested
    metadata documents.  Any unescaped single quote in that content will break
    - or inject into - the SQL.  It is not visible from this file whether
    PostgresRecord escapes these values beforehand; confirm this, and move to
    parameterised queries if db_funcs can support them.
    '''

    def __init__(self, record, connection=None):
        '''
        Constructor - to initialise the DAL and do some initial setting up
        @param record: the PostgresRecord object to add or update in the DB
        @param connection: DB connection to use; if None, a new connection is
        opened via db_funcs.db_connect()
        '''
        # A missing record is treated as a usage error and terminates the
        # process; kept for backwards compatibility with the original behaviour
        if record is None or record == "":
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
        logging.info("INFO: Creating/updating DB entry for record, %s", record.discovery_id)

        # setup a connection to the db - if none specified
        if connection is None:
            connection = db_funcs.db_connect()
        self._connection = connection
        self._record = record


    def getRecordID(self):
        '''
        Looks up a record in the DB and returns its DB ID, if it exists, otherwise
        returns -1.  A found ID is cached on the record (record.db_id), so later
        calls do not query the DB again.
        @return: id of record, if it exists, -1 if it doesn't
        '''
        logging.info("Looking up record, %s in DB", self._record.discovery_id)
        if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is %s", self._record.db_id)
            return self._record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            self._record.discovery_id + "';"
        rows = db_funcs.runSQLCommand(self._connection, sql)
        if not rows:
            # record.db_id is deliberately left untouched so that
            # createOrUpdateRecord still sees the record as not yet stored
            return -1

        self._record.db_id = rows[0][0]
        return self._record.db_id


    def createOrUpdateRecord(self):
        '''
        Looks up a record in the DB; if it finds it, update it, otherwise create it
        @return result: True if record created/updated, false if otherwise
        '''
        self.getRecordID()
        if self._record.db_id:
            return self.updateRecord()
        return self.createRecord()


    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record created, false if otherwise
        '''
        logging.info("Creating a new record in the DB for the metada document")
        # firstly store the actual documents contained by the record - i.e. the original doc +
        # the various transforms required; the original must go first as its insert
        # sets record.db_id, which the transformed docs reference
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()
        logging.info("New record created")
        return True


    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record updated, False if the incoming document is
        identical to the stored one (only its harvest_count is incremented then)
        '''
        logging.info("Record already existing in DB - performing updates")
        result = False
        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if self._checkIsUpdatedRecord():

            # firstly, update the original record
            self._updateOriginalRecord()

            # now update the actual documents contained by the record - i.e. the original doc +
            # the various transforms required
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing spatiotemporal
            # data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")
            self._deleteSpatioTemporalData()
            self._insertSpatioTemporalData()
            result = True

        logging.info("Finish processing document...")
        return result


    def _checkIsUpdatedRecord(self):
        '''
        Looks up an existing record and checks it is not identical to the new record; if it is
        increment the harvest_count value and don't process again.  In both cases the
        record's current system change number (scn) is loaded onto record.scn.
        @return: True if doc contains changes, False otherwise
        @raise ValueError: if no DB entry exists for record.db_id
        '''
        logging.info("Checking the updated document actually contains changes")

        sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
            str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';"
        results = db_funcs.runSQLCommand(self._connection, sql)

        # NB, if the document is not identical, the sql command will not find anything
        if not results:
            logging.info("Ingested document is different to that in the current DB")
            # get the current SCN - needed by the subsequent update_document call
            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
                    str(self._record.db_id) + ";"
            results = db_funcs.runSQLCommand(self._connection, sql)
            if not results:
                raise ValueError("No DB entry found for original document with ID, %s"
                                 % self._record.db_id)
            self._record.scn = results[0][0]
            return True

        count = results[0][0]
        # get current system change number
        self._record.scn = results[0][1]
        logging.info("Ingested document is identical to document currently in DB - " + \
                     "incrementing harvest_count")
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count + 1) + \
            " WHERE original_document_id = " + str(self._record.db_id)
        db_funcs.runSQLCommand(self._connection, sql)
        return False


    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is set to cascade from the linking table so only need to
        delete entries from there
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_document_id = " + \
            str(self._record.db_id) + ";"

        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")


    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single row record in the postgres DB based on the
        input data
        @param coords: a Coords object representing NSWE coords
        @param timeRange: a TimeRange object representing a start/end date
        '''
        def _dateValue(value):
            # the 'null' sentinel is passed unquoted so the DB receives a SQL
            # NULL rather than the string 'null'
            if value == "null":
                return value
            return "'" + value + "'"

        logging.info("Adding spatiotemporal row to DB")
        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \
            str(coords.north) + "', '" + str(coords.south) + "', '" + str(coords.west) + "', '" + \
            str(coords.east) + "', " + _dateValue(timeRange.start) + ", " + \
            _dateValue(timeRange.end) + ");"

        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Spatiotemporal row added successfully")


    def _insertSpatioTemporalData(self):
        '''
        Create a record in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object.  If the spatial and temporal lists
        have the same length they are paired up one-to-one; otherwise every time
        range is combined with every set of coords.
        '''
        logging.info("Adding spatiotemporal data to DB record")

        # Work out the relationship between the spatial and temporal data and handle appropriately
        timeData = self._record.getTemporalData()
        spatialData = self._record.getSpatialData()

        # check if we have any spatiotemporal data to add; escape if not
        if not timeData and not spatialData:
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # setup dummy, null data if required
        if not timeData:
            timeData = [ TimeRange('null', 'null') ]

        if not spatialData:
            spatialData = [ Coords('null', 'null', 'null', 'null') ]

        # if both arrays of data are the same length, assume they map together
        if len(timeData) == len(spatialData):
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for coords, timeRange in zip(spatialData, timeData):
                self._insertSpatioTemporalRow(coords, timeRange)

        # otherwise, map everything to everything
        else:
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for timeRange in timeData:
                for coords in spatialData:
                    self._insertSpatioTemporalRow(coords, timeRange)
        logging.info("Spatiotemporal data added to DB record")


    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB and store the
        returned primary key on the record (record.db_id)
        @raise ValueError: if the DB returns no primary key
        '''
        logging.info("Inserting new original document in Postgres DB")
        sqlCmd = "SELECT create_document('" + self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "');"

        rows = db_funcs.runSQLCommand(self._connection, sqlCmd)
        # 'not rows' covers both an empty result and a None return
        if not rows:
            raise ValueError('No PK ID returned from INSERT to DB')

        self._record.db_id = rows[0][0]
        logging.info("Original document inserted in Postgres DB")


    def deleteOriginalRecord(self):
        '''
        Delete the original metadata doc from the postgres DB
        '''
        logging.info("Deleting original document from Postgres DB")
        sqlCmd = "SELECT delete_document('" + str(self._record.db_id) + "');"

        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Original document deleted from Postgres DB")


    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB; passes the record's current
        scn to update_document and then increments record.scn locally
        '''
        logging.info("Updating original document in Postgres DB")
        sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \
            self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            str(self._record.scn) + "');"
        db_funcs.runSQLCommand(self._connection, sqlCmd)

        # increment scn - saves doing an additional db call
        self._record.scn += 1
        logging.info("Original document updated in Postgres DB")


    def _insertMetadataRecords(self):
        '''
        Insert the metadata docs (the transformed documents returned by
        record.getAllDocs()) into the postgres DB, linked to record.db_id
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_document_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                docType + "', '" + doc + "', current_timestamp, 1);"

            db_funcs.runSQLCommand(self._connection, sqlCmd)

        logging.info("Transformed records created in DB")


    def _updateMetadataRecords(self):
        '''
        Update the metadata docs in the postgres DB - one UPDATE per
        (docType, doc) pair returned by record.getAllDocs()
        NOTE(review): only existing TRANSFORMED_DOCUMENT rows are updated; a
        docType with no existing row is not inserted here
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \
                "', update_date = current_timestamp WHERE original_document_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"

            db_funcs.runSQLCommand(self._connection, sqlCmd)

        logging.info("Transformed records updated in DB")


    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record

Note: See TracBrowser for help on using the repository browser.