source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 5580

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@5580
Revision 5580, 15.9 KB checked in by sdonegan, 10 years ago (diff)
  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3Class representing the data access object wrapping the Postgres DB
4C Byrom Apr 08
5'''
6import sys, os, logging
7from SpatioTemporalData import *
8from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
9
class PostgresDAO:
    '''
    Data access object that creates, updates and deletes the DB rows belonging
    to a single PostgresRecord (original document, transformed documents and
    spatiotemporal data).

    NOTE(review): every SQL statement in this class is assembled by string
    concatenation. Presumably the PostgresRecord escapes its values before
    they get here - confirm, and prefer parameterised queries if the
    postgres client supports them.
    '''

    def __init__(self, record, pgClient = None):
        '''
        Constructor - to initialise the DAL and do some initial setting up
        @param record: the PostgresRecord object to add or update in the DB
        @keyword pgClient: a postgresClient object to use to connect to postgres
        - NB, if not set, will try to set one up using the client's default
        config file
        '''
        if record == "":
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")

        logging.info("INFO: Creating/updating DB entry for record, %s", record.discovery_id)

        # setup a connection to the db - if none specified
        if pgClient is None:
            # The original passed an undefined name, 'configFile', positionally
            # which raised a NameError as soon as this branch was taken.
            # NOTE(review): assumes PostgresClient accepts a 'configFile'
            # keyword - confirm against the client class.
            self.pgc = pgc(configFile = pgc.DEFAULT_CONFIG_FILE)
        else:
            self.pgc = pgClient

        self._record = record


    def _buildFunctionCall(self, funcName, args):
        '''
        Build the SQL needed to call a stored function with quoted arguments
        @param funcName: name of the DB function to call
        @param args: sequence of strings; each is wrapped in single quotes
        @return: SQL of the form "SELECT funcName('a', 'b', ...);"
        '''
        return "SELECT " + funcName + "('" + "', '".join(args) + "');"


    def _selectByFilename(self, column):
        '''
        Select one column of ORIGINAL_DOCUMENT for the record's filename
        @param column: name of the column to retrieve
        @return: whatever the postgres client returns for the query
        '''
        sql = "SELECT " + column + " FROM ORIGINAL_DOCUMENT where " + \
            "original_document_filename = '" + self._record.filename + "';"
        return self.pgc.runSQLCommand(sql)


    def getRecordID(self):
        '''
        Looks up the record in the DB by discovery ID and stores the DB ID
        found on the record (record.db_id).  A positive db_id already held by
        the record is returned without querying the DB again.
        @return: DB id of the record, or None if no matching row was found
        '''
        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
        cachedId = self._record.db_id
        if cachedId is not None and cachedId > 0:
            logging.info("Already looked up record - ID is " + str(cachedId))
            return cachedId

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            self._record.discovery_id + "';"
        rows = self.pgc.runSQLCommand(sql)
        if not rows:
            return None

        # previously only the cached path returned a value; return the
        # freshly looked up ID as well so both paths behave the same
        self._record.db_id = rows[0][0]
        return self._record.db_id


    def getRecordID_using_OriginalDocumentFilename(self):
        '''
        Looks up the record in the DB by original document filename and stores
        the DB ID found on the record (record.db_id).  Unlike getRecordID, the
        DB is always queried.
        @return: DB id of the record, or None if no matching row was found
        '''
        logging.info("Looking up original_document_id for filename: " + self._record.filename + " in DB")

        rows = self._selectByFilename("original_document_id")
        if not rows:
            return None

        self._record.db_id = rows[0][0]
        return self._record.db_id


    def getDiscoveryID_using_OriginalDocumentFilename(self):
        '''
        Looks up the discovery ID in the DB by original document filename and
        stores it on the record (record.discovery_id).
        @return: discovery id of the record, or None if no matching row was found
        '''
        logging.info("Looking up discovery_of for filename: " + self._record.filename + " in DB")

        rows = self._selectByFilename("discovery_id")
        if not rows:
            return None

        self._record.discovery_id = rows[0][0]
        return self._record.discovery_id


    def getTemporalDataId(self):
        '''
        Placeholder for looking up the temporal data id of the record.

        NB, this was never implemented: the original body only logged and
        built a (copy-pasted) query string that was never executed.  No DB
        access is done and nothing is returned.
        '''
        logging.info("Looking up temporal_data_id for filename: " + self._record.filename + " in DB")


    def createOrUpdateRecord(self):
        '''
        Looks up a record in the DB; if it finds it, update it, otherwise create it
        @return: 0 if nothing was done (e.g. the document was unchanged),
        1 if the record was updated, 2 if the record was created
        '''
        self.getRecordID()

        if self._record.db_id:
            if self.updateRecord():
                return 1
        elif self.createRecord():
            return 2

        return 0


    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object
        @return: True once the record has been created; failures surface as
        exceptions from the individual inserts
        '''
        logging.info("Creating a new record in the DB for the metada document")
        # firstly store the actual documents contained by the record - i.e. the
        # original doc + the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()
        logging.info("New record created")
        return True


    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object
        @return: True if the record was updated, False if the ingested document
        was identical to the stored one and so nothing was changed
        '''
        logging.info("Record already existing in DB - performing updates")
        # only do the work if the document is actually new - i.e. not just a
        # repeat of the same data
        hasChanged = self._checkIsUpdatedRecord()
        if hasChanged:
            # update the original record, then the transformed documents
            self._updateOriginalRecord()
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing
            # spatiotemporal data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")
            self._deleteSpatioTemporalData()
            self._insertSpatioTemporalData()

        logging.info("Finish processing document...")
        return bool(hasChanged)


    def _checkIsUpdatedRecord(self):
        '''
        Looks up an existing record and checks it is not identical to the new
        record; if it is, increment the harvest_count value and don't process
        again.  In both cases the current system change number is copied to
        record.scn.
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")
        dbId = str(self._record.db_id)

        sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
            dbId + " AND original_document = '" + self._record.originalFormat + "';"
        identicalRows = self.pgc.runSQLCommand(sql)

        # NB, if the document is not identical, the sql command will not find anything
        if not identicalRows:
            logging.info("Ingested document is different to that in the current DB")
            # get the current SCN
            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
                dbId + ";"
            scnRows = self.pgc.runSQLCommand(sql)
            self._record.scn = scnRows[0][0]
            return True

        harvestCount = identicalRows[0][0]
        self._record.scn = identicalRows[0][1]
        logging.info("Ingested document is identical to document currently in DB - " + \
                     "incrementing harvest_count")
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(harvestCount + 1) + \
            " WHERE original_document_id = " + dbId
        self.pgc.runSQLCommand(sql)
        return False


    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is set to cascade from the linking table so only need to
        delete entries from there
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_document_id = " + \
            str(self._record.db_id) + ";"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")


    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single row record in the postgres DB based on the
        input data
        @param coords: a Coords object representing NSWE coords
        @param timeRange: a TimeRange object representing a start/end date;
        start and end must already be strings
        '''
        logging.info("Adding spatiotemporal row to DB")

        sqlCmd = self._buildFunctionCall("add_spatiotemporal_row",
                                         [str(self._record.db_id),
                                          str(coords.north), str(coords.south),
                                          str(coords.west), str(coords.east),
                                          timeRange.start, timeRange.end])

        # fix any null strings - these must reach the DB as SQL null, not 'null'
        sqlCmd = sqlCmd.replace("'null'", "null")

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal row added successfully")


    def _insertSpatioTemporalData(self):
        '''
        Create rows in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object.  If there are as many time
        ranges as coordinate sets they are paired up one to one; otherwise
        every time range is combined with every coordinate set.
        '''
        logging.info("Adding spatiotemporal data to DB record")

        # SJD 23/09/08 - records with no temporal data in the xml make
        # getTemporalData() fail; fall back to a single null time range.
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt/SystemExit are not swallowed here.
        try:
            timeData = self._record.getTemporalData()
        except Exception:
            logging.warning("Could not retrieve temporal data for record - using null time range")
            timeData = [ TimeRange('null', 'null') ]

        spatialData = self._record.getSpatialData()

        # check if we have any spatiotemporal data to add; escape if not
        if not timeData and not spatialData:
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # setup dummy, null data if required
        if not timeData:
            timeData = [ TimeRange('null', 'null') ]

        if not spatialData:
            spatialData = [ Coords('null', 'null', 'null', 'null') ]

        if len(timeData) == len(spatialData):
            # if both arrays of data are the same length, assume they map together
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for coords, timeRange in zip(spatialData, timeData):
                self._insertSpatioTemporalRow(coords, timeRange)
        else:
            # otherwise, map everything to everything
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for timeRange in timeData:
                for coords in spatialData:
                    self._insertSpatioTemporalRow(coords, timeRange)

        logging.info("Spatiotemporal data added to DB record")


    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB and store the
        returned primary key on the record (record.db_id)
        @raise ValueError: if the DB call returns no primary key
        '''
        logging.info("Inserting new original document in Postgres DB")
        record = self._record
        sqlCmd = self._buildFunctionCall("create_document",
                                         [record.shortFilename, record.discovery_id,
                                          record.docType, record.originalFormat,
                                          record.getAuthorsInfo(),
                                          record.getParametersInfo(),
                                          record.getScopeInfo(),
                                          record.dataset_name, record.datacentre_name,
                                          record.dataset_lastEdit,
                                          record.datasetStartNom, record.datasetEndNom])

        # sort out any NULL values - must be passed unquoted
        sqlCmd = sqlCmd.replace("'NULL'","NULL")

        rows = self.pgc.runSQLCommand(sqlCmd)
        # truthiness check also covers a client that returns None
        if not rows:
            raise ValueError('No PK ID returned from INSERT to DB')

        record.db_id = rows[0][0]
        logging.info("Original document inserted in Postgres DB")


    def deleteOriginalRecord(self):
        '''
        Delete the original metadata doc from the postgres DB
        '''
        logging.info("Deleting original document from Postgres DB")
        sqlCmd = self._buildFunctionCall("delete_document", [str(self._record.db_id)])

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Original document deleted from Postgres DB")


    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB, then increment record.scn
        locally so it is not necessary to read it back from the DB
        '''
        logging.info("Updating original document in Postgres DB")
        record = self._record
        sqlCmd = self._buildFunctionCall("update_document",
                                         [str(record.db_id), record.shortFilename,
                                          record.discovery_id, record.docType,
                                          record.originalFormat,
                                          record.getAuthorsInfo(),
                                          record.getParametersInfo(),
                                          record.getScopeInfo(), str(record.scn),
                                          record.dataset_name, record.datacentre_name,
                                          record.dataset_lastEdit,
                                          record.datasetStartNom, record.datasetEndNom])

        # sort out any NULL values - must be passed unquoted
        sqlCmd = sqlCmd.replace("'NULL'","NULL")

        logging.info("SQl command:   " + sqlCmd)

        self.pgc.runSQLCommand(sqlCmd)

        # increment scn - saves doing an additional db call
        record.scn += 1

        logging.info("Original document updated in Postgres DB")


    def _insertMetadataRecords(self):
        '''
        Insert the transformed metadata docs into the postgres DB; does
        nothing if the original record has no DB ID yet
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_document_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                docType + "', '" + doc + "', current_timestamp, 1);"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records created in DB")


    def _updateMetadataRecords(self):
        '''
        Update the transformed metadata docs in the postgres DB; does nothing
        if the original record has no DB ID yet
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \
                "', update_date = current_timestamp WHERE original_document_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records updated in DB")


    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record
Note: See TracBrowser for help on using the repository browser.