source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 4854

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@4854
Revision 4854, 13.0 KB checked in by cbyrom, 11 years ago (diff)

Add new ingest script - to allow ingest of DIF docs from eXist hosted
atom feed. NB, this required restructure of original OAI harvester
to allow re-use of shared code - by abstracting this out into new class,
abstractdocumentingester.

Add new documentation and tidy up codebase removing dependencies where possible to simplify things.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3Class representing the data access object wrapping the Postgres DB
4C Byrom Apr 08
5'''
6import sys, os, logging
7from SpatioTemporalData import *
8from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
9
class PostgresDAO:
    '''
    Data access object wrapping the Postgres discovery DB - handles the
    creation, update and deletion of the original metadata document, its
    transformed representations and the associated spatiotemporal data.
    C Byrom Apr 08
    '''

    def __init__(self, record, pgClient = None):
        '''
        Constructor - to initialise the DAL and do some initial setting up
        @param record: the PostgresRecord object to add or update in the DB
        @keyword pgClient: a postgresClient object to use to connect to postgres
        - NB, if not set, will set one up using the client's default config file
        @raise ValueError: if no record is specified
        '''
        if not record:
            # NB, raise rather than sys.exit() - a library class should never
            # terminate the hosting process
            raise ValueError("USAGE: argument 1 = PostgresRecord object to process")
        logging.info("INFO: Creating/updating DB entry for record, %s", record.discovery_id)

        # setup a connection to the db - if none specified
        if pgClient is None:
            # fix: the original passed an undefined name, 'configFile', here
            # (a NameError at runtime); fall back to the client's default
            # config file instead - TODO confirm against PostgresClient API
            self.pgc = pgc(configFile = pgc.DEFAULT_CONFIG_FILE)
        else:
            self.pgc = pgClient

        self._record = record


    def _escapeLiteral(self, value):
        '''
        Double up any single quotes in value so it can be safely embedded in a
        SQL string literal - NB, parameterised queries would be preferable, but
        the PostgresClient interface used here only accepts complete SQL strings
        @param value: value to escape
        @return: escaped string form of value
        '''
        return str(value).replace("'", "''")


    def getRecordID(self):
        '''
        Looks up a record in the DB and returns its DB ID, if it exists, otherwise
        returns '-1'
        @return: id of record, if it exists, '-1' if it doesn't
        '''
        logging.info("Looking up record, %s in DB", self._record.discovery_id)
        if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is %s", self._record.db_id)
            return self._record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            self._escapeLiteral(self._record.discovery_id) + "';"
        dbId = self.pgc.runSQLCommand(sql)
        if dbId:
            self._record.db_id = dbId[0][0]
            return self._record.db_id
        # fix: the original fell off the end of the method here, implicitly
        # returning None instead of the documented -1
        return -1


    def createOrUpdateRecord(self):
        '''
        Looks up a record in the DB; if it finds it, update it, otherwise create it
        @return result: True if record created/updated, False otherwise
        '''
        self.getRecordID()
        # db_id is set by getRecordID() only when the record already exists
        if self._record.db_id:
            return self.updateRecord()
        else:
            return self.createRecord()


    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record created, False otherwise
        '''
        logging.info("Creating a new record in the DB for the metadata document")
        # firstly store the actual documents contained by the record - i.e. the original doc +
        # the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()
        logging.info("New record created")
        return True


    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record updated, False otherwise (i.e. the
        ingested document was identical to the stored one)
        '''
        logging.info("Record already existing in DB - performing updates")
        result = False
        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if self._checkIsUpdatedRecord():

            # firstly, update the original record
            self._updateOriginalRecord()

            # now update the actual documents contained by the record - i.e. the original doc +
            # the various transforms required
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing spatiotemporal
            # data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")
            self._deleteSpatioTemporalData()
            self._insertSpatioTemporalData()
            result = True

        logging.info("Finish processing document...")
        return result


    def _checkIsUpdatedRecord(self):
        '''
        Looks up an existing record and checks it is not identical to the new record; if it is,
        increment the harvest_count value and don't process again
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")

        sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
            str(self._record.db_id) + " AND original_document = '" + \
            self._escapeLiteral(self._record.originalFormat) + "';"
        results = self.pgc.runSQLCommand(sql)

        # NB, if the document is not identical, the sql command will not find anything
        if not results:
            logging.info("Ingested document is different to that in the current DB")
            # get the current SCN
            # NOTE(review): assumes the row for db_id still exists - the record
            # was just looked up, so an empty result here would indicate a
            # concurrent delete
            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
                    str(self._record.db_id) + ";"
            results = self.pgc.runSQLCommand(sql)
            self._record.scn = results[0][0]
            return True

        count = results[0][0]

        # get current system change number
        scn = results[0][1]
        self._record.scn = scn
        logging.info("Ingested document is identical to document currently in DB - " + \
                     "incrementing harvest_count")
        count += 1
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \
            " WHERE original_document_id = " + str(self._record.db_id)
        self.pgc.runSQLCommand(sql)
        return False


    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is set to cascade from the linking table so only need to
        delete entries from there
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_document_id = " + \
            str(self._record.db_id) + ";"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")


    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single row record in the postgres DB based on the
        input data
        @param coords: a Coords object representing NSWE coords
        @param timeRange: a TimeRange object representing a start/end date
        '''
        logging.info("Adding spatiotemporal row to DB")

        # NB, no quote escaping here - the values are numeric coords or the
        # literal string 'null', and escaping would break the null fix-up below
        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \
            str(coords.north) + "', '" + str(coords.south) + "', '" + str(coords.west) + "', '" + \
            str(coords.east) + "', '" + timeRange.start + "', '" + timeRange.end + "');"

        # fix any null strings
        sqlCmd = sqlCmd.replace("'null'", "null")

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal row added successfully")


    def _insertSpatioTemporalData(self):
        '''
        Create a record in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object
        '''
        logging.info("Adding spatiotemporal data to DB record")

        # Work out the relationship between the spatial and temporal data and handle appropriately

        # error here! SJD - added a try/except to set timedata to null for instances where no temporal data in xml
        # - should be caught elsewhere, but easiest fudge is to put it in here. 23/09/08 SJD.
        # NB, narrowed from a bare except so that e.g. KeyboardInterrupt is not swallowed
        try:
            timeData = self._record.getTemporalData()
        except Exception:
            timeData = [ TimeRange('null', 'null') ]

        spatialData = self._record.getSpatialData()

        # check if we have any spatiotemporal data to add; escape if not
        if not timeData and not spatialData:
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # setup dummy, null data if required
        if not timeData:
            timeData = [ TimeRange('null', 'null') ]

        if not spatialData:
            spatialData = [ Coords('null', 'null', 'null', 'null') ]

        # if both arrays of data are the same length, assume they map together
        if len(timeData) == len(spatialData):
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for coords, timeRange in zip(spatialData, timeData):
                self._insertSpatioTemporalRow(coords, timeRange)

        # otherwise, map everything to everything
        else:
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for timeRange in timeData:
                for coords in spatialData:
                    self._insertSpatioTemporalRow(coords, timeRange)
        logging.info("Spatiotemporal data added to DB record")


    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB
        @raise ValueError: if the DB does not return a primary key for the new row
        '''
        logging.info("Inserting new original document in Postgres DB")
        sqlCmd = "SELECT create_document('" + self._escapeLiteral(self._record.shortFilename) + "', '" + \
            self._escapeLiteral(self._record.discovery_id) + "', '" + \
            self._escapeLiteral(self._record.docType) + "', '" + \
            self._escapeLiteral(self._record.originalFormat) + "', '" + \
            self._escapeLiteral(self._record.getAuthorsInfo()) + "', '" + \
            self._escapeLiteral(self._record.getParametersInfo()) + "', '" + \
            self._escapeLiteral(self._record.getScopeInfo()) + "');"

        # NB, renamed from 'id' to avoid shadowing the builtin
        newId = self.pgc.runSQLCommand(sqlCmd)
        if not newId:
            raise ValueError('No PK ID returned from INSERT to DB')

        self._record.db_id = newId[0][0]
        logging.info("Original document inserted in Postgres DB")


    def deleteOriginalRecord(self):
        '''
        Delete the original metadata doc from the postgres DB
        - NB, associated data is removed by the delete_document stored procedure
        '''
        logging.info("Deleting original document from Postgres DB")
        sqlCmd = "SELECT delete_document('" + str(self._record.db_id) + "');"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Original document deleted from Postgres DB")


    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB
        '''
        logging.info("Updating original document in Postgres DB")
        sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \
            self._escapeLiteral(self._record.shortFilename) + "', '" + \
            self._escapeLiteral(self._record.discovery_id) + "', '" + \
            self._escapeLiteral(self._record.docType) + "', '" + \
            self._escapeLiteral(self._record.originalFormat) + "', '" + \
            self._escapeLiteral(self._record.getAuthorsInfo()) + "', '" + \
            self._escapeLiteral(self._record.getParametersInfo()) + "', '" + \
            self._escapeLiteral(self._record.getScopeInfo()) + "', '" + str(self._record.scn) + "');"
        self.pgc.runSQLCommand(sqlCmd)

        # increment scn - saves doing an additional db call
        self._record.scn += 1
        logging.info("Original document updated in Postgres DB")


    def _insertMetadataRecords(self):
        '''
        Insert the metadata docs into the postgres DB
        - NB, requires the original record to have been inserted first so that
        its DB ID is available
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_document_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                self._escapeLiteral(docType) + "', '" + self._escapeLiteral(doc) + \
                "', current_timestamp, 1);"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records created in DB")


    def _updateMetadataRecords(self):
        '''
        Update the metadata docs into the postgres DB
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + \
                self._escapeLiteral(doc) + \
                "', update_date = current_timestamp WHERE original_document_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + \
                self._escapeLiteral(docType) + "';"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records updated in DB")


    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record

Note: See TracBrowser for help on using the repository browser.