source: TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 6368

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@6368
Revision 6368, 16.1 KB checked in by sdonegan, 11 years ago

..and synching this as well

  • Property svn:executable set to *
#!/usr/bin/env python
'''
Class representing the data access object wrapping the Postgres DB
C Byrom Apr 08
'''
import sys, os, logging
from SpatioTemporalData import *
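# NB, the wildcard import above supplies the TimeRange and Coords helper classes
# used further down when building the spatiotemporal rows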
from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc

class PostgresDAO:

    def __init__(self, record, isoDataModel, pgClient = None):
        '''
        Constructor - to initialise the DAL and do some initial setting up
        @param record: the PostgresRecord object to add or update in the DB
        @param isoDataModel: the ISO data model associated with the record
        @keyword pgClient: a postgresClient object to use to connect to postgres
        - NB, if not set, will try to set one up by looking for a pg.config file
        locally
        '''
        if not record:
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
        else:
            logging.info("Creating/updating DB entry for record, %s" % record.discovery_id)

        # setup a connection to the db - if none specified, fall back to the default config file
        if pgClient is None:
            self.pgc = pgc(configFile = pgc.DEFAULT_CONFIG_FILE)
        else:
            self.pgc = pgClient

        self._record = record
        self.isoDataModel = isoDataModel

    def getRecordID(self):
        '''
        Looks up a record in the DB by its discovery id and, if found, stores the
        DB ID on the record (self._record.db_id)
        @return: id of record, if it has already been looked up; None otherwise
        '''
        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
        if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            self._record.discovery_id + "';"
        dbId = self.pgc.runSQLCommand(sql)
        if dbId:
            self._record.db_id = dbId[0][0]

    def getRecordID_using_OriginalDocumentFilename(self):
        '''
        Looks up a record in the DB by its original document filename and, if found,
        stores the DB ID on the record (self._record.db_id)
        '''
        logging.info("Looking up original_document_id for filename: " + self._record.filename + " in DB")

        '''if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id'''

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.db_id = dbId[0][0]

    def getDiscoveryID_using_OriginalDocumentFilename(self):
        '''
        Looks up a record in the DB by its original document filename and, if found,
        stores the discovery ID on the record (self._record.discovery_id)
        '''
        logging.info("Looking up discovery_id for filename: " + self._record.filename + " in DB")

        '''if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id'''

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.discovery_id = dbId[0][0]

    def getTemporalDataId(self):
        '''
        Looks up the temporal data id using the original document id
        - NB, currently a stub: the query below only selects the discovery_id
        and is never executed
        '''
        logging.info("Looking up temporal_data_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

    def createOrUpdateRecord(self):
        '''
        Looks up a record in the DB; if it finds it, update it, otherwise create it
        @return returnCode: 0 if the operation failed, 1 if the record was updated,
        2 if it was created
        '''
        self.getRecordID()
        returnCode = 0 # 0 if failed, 1 if update, 2 if create

        if self._record.db_id:
            if self.updateRecord():
                returnCode = 1
        else:
            if self.createRecord():
                returnCode = 2

        return returnCode

    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record created, False otherwise
        '''
        logging.info("Creating a new record in the DB for the metadata document")
        # firstly store the actual documents contained by the record - i.e. the original doc +
        # the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()
        logging.info("New record created")
        return True

    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record updated, False otherwise
        '''
        logging.info("Record already exists in DB - performing updates")
        result = False
        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if self._checkIsUpdatedRecord():

            # firstly, update the original record
            self._updateOriginalRecord()

            # now update the actual documents contained by the record - i.e. the original doc +
            # the various transforms required
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing spatiotemporal
            # data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")
            self._deleteSpatioTemporalData()
            self._insertSpatioTemporalData()
            result = True

        logging.info("Finished processing document...")
        return result

    def _checkIsUpdatedRecord(self):
        '''
        Looks up an existing record and checks it is not identical to the new record; if it is,
        increment the harvest_count value and don't process again
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")

        sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
            str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';"
        results = self.pgc.runSQLCommand(sql)

        # NB, if the document is not identical, the sql command will not find anything
        if not results:
            logging.info("Ingested document is different to that in the current DB")
            # get the current SCN
            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
                    str(self._record.db_id) + ";"
            results = self.pgc.runSQLCommand(sql)
            self._record.scn = results[0][0]
            return True

        count = results[0][0]

        # get current system change number
        scn = results[0][1]
        self._record.scn = scn
        logging.info("Ingested document is identical to document currently in DB - " + \
                     "incrementing harvest_count")
        count += 1
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \
            " WHERE original_document_id = " + str(self._record.db_id)
        self.pgc.runSQLCommand(sql)
        return False

    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is set to cascade from the linking table so only need to
        delete entries from there
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_document_id = " + \
            str(self._record.db_id) + ";"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")

    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single row record in the postgres DB based on the
        input data
        @param coords: a Coords object representing NSWE coords
        @param timeRange: a TimeRange object representing a start/end date
        '''
        logging.info("Adding spatiotemporal row to DB")

        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \
            str(coords.north) + "', '" + str(coords.south) + "', '" + str(coords.west) + "', '" + \
            str(coords.east) + "', '" + timeRange.start + "', '" + timeRange.end + "');"

        # fix any null strings
        sqlCmd = sqlCmd.replace("'null'", "null")

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal row added successfully")

    def _insertSpatioTemporalData(self):
        '''
        Create a record in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object
        '''
        logging.info("Adding spatiotemporal data to DB record")

        # Work out the relationship between the spatial and temporal data and handle appropriately

        # error here! SJD - added a try/except to set timeData to null for instances where
        # there is no temporal data in the xml - should be caught elsewhere, but easiest
        # fudge is to put it in here. 23/09/08 SJD.
        try:
            timeData = self._record.getTemporalData()
        except Exception:
            timeData = [ TimeRange('null', 'null') ]

        spatialData = self._record.getSpatialData()

        # check if we have any spatiotemporal data to add; escape if not
        if not timeData and not spatialData:
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # setup dummy, null data if required
        if not timeData:
            timeData = [ TimeRange('null', 'null') ]

        if not spatialData:
            spatialData = [ Coords('null', 'null', 'null', 'null') ]

        # if both arrays of data are the same length, assume they map together
        i = 0
        if len(timeData) == len(spatialData):
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for timeRange in timeData:
                self._insertSpatioTemporalRow(spatialData[i], timeRange)
                i += 1

        # otherwise, map everything to everything
        else:
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for timeRange in timeData:
                for coords in spatialData:
                    self._insertSpatioTemporalRow(coords, timeRange)
        logging.info("Spatiotemporal data added to DB record")

    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB
        '''
        logging.info("Inserting new original document in Postgres DB")

        sqlCmd = "SELECT create_document('" + self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            self._record.dataset_name + "', '" + self._record.datacentre_name + "', '" + \
            self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + \
            self._record.datasetEndNom + "');"

        # sort out any NULL values
        sqlCmd = sqlCmd.replace("'NULL'", "NULL")

        id = self.pgc.runSQLCommand(sqlCmd)
        if len(id) == 0:
            raise ValueError('No PK ID returned from INSERT to DB')

        self._record.db_id = id[0][0]
        logging.info("Original document inserted in Postgres DB")

    def deleteOriginalRecord(self):
        '''
        Delete the original metadata doc from the postgres DB
        '''
        logging.info("Deleting original document from Postgres DB")
        sqlCmd = "SELECT delete_document('" + str(self._record.db_id) + "');"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Original document deleted from Postgres DB")

    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB
        '''
        logging.info("Updating original document in Postgres DB")
        sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \
            self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            str(self._record.scn) + "', '" + self._record.dataset_name + "', '" + \
            self._record.datacentre_name + "', '" + self._record.dataset_lastEdit + "', '" + \
            self._record.datasetStartNom + "', '" + self._record.datasetEndNom + "');"

        # sort out any NULL values
        sqlCmd = sqlCmd.replace("'NULL'", "NULL")

        logging.info("SQL command:   " + sqlCmd)

        self.pgc.runSQLCommand(sqlCmd)

        # increment scn - saves doing an additional db call
        self._record.scn += 1

        logging.info("Original document updated in Postgres DB")

    def _insertMetadataRecords(self):
        '''
        Insert the metadata docs into the postgres DB
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_document_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                docType + "', '" + doc + "', current_timestamp, 1);"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records created in DB")

    def _updateMetadataRecords(self):
        '''
        Update the metadata docs in the postgres DB
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \
                "', update_date = current_timestamp WHERE original_document_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records updated in DB")

    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record

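# --------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the original module).
# It shows how this DAO is typically driven once the ingest pipeline has built
# a PostgresRecord; the import path and record construction below are
# hypothetical placeholders - only the PostgresDAO calls reflect the class
# defined above.
#
#   from PostgresRecord import PostgresRecord   # hypothetical import path
#
#   record = PostgresRecord(...)                 # built elsewhere by the OAIBatch ingest code
#   dao = PostgresDAO(record, isoDataModel)
#   status = dao.createOrUpdateRecord()          # 0 = failed, 1 = updated, 2 = created
#   if status == 0:
#       logging.error("Failed to ingest record %s" % record.discovery_id)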