source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 3853

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@3853
Revision 3853, 12.3 KB checked in by cbyrom, 13 years ago (diff)

Update python scripts to call the new stored procedures, to properly
handle SCN (system change number) values in the DB, and to provide status
on the update/create of records - allowing a 'number of ingested records'
result to be displayed at the end. Also include a method to delete existing
spatiotemporal data when doing updates, and add additional logging.

  • Property svn:executable set to *
#!/usr/bin/env python
'''
Class representing the data access object wrapping the Postgres DB
C Byrom Apr 08
'''
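# DB objects referenced in this module:
#   ORIGINAL_DOCUMENT      - one row per harvested document; created/updated via the
#                            create_document and update_document stored procedures
#   TRANSFORMED_DOCUMENT   - the various transformed versions of each original document
#   SPATIAL_TEMPORAL_DATA  - linking table for bounding box/time range data, populated via
#                            add_spatiotemporal_row and cleared wholesale on update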
import sys, os, logging
import db_funcs
from SpatioTemporalData import *

class PostgresDAO:

    def __init__(self, record, connection=None):
        '''
        Constructor - to initialise the DAL and do some initial setting up
        @param record: the PostgresRecord object to add or update in the DB
        '''
        if record == "":
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
        else:
            logging.info("Creating/updating DB entry for record, %s" % record.discovery_id)

        # set up a connection to the db - if none specified
        if connection is None:
            connection = db_funcs.db_connect()
        self._connection = connection
        self._record = record


    def commavaluesfromsublist(self, list):
        '''
        Creates a comma-separated string of values from a list of sublists, taking
        the first element of each sublist
        '''
        csvString = ""
        for item in list:
            if item:
                if len(csvString) > 0:
                    csvString += ","
                csvString += "%s" % item[0]

        return csvString


    def getRecordID(self):
        '''
        Looks up a record in the DB by discovery ID and, if found, caches the DB ID
        on the record object
        @return: id of record if it has already been looked up, None otherwise
        '''
        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
        if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            self._record.discovery_id + "';"
        dbId = db_funcs.runSQLCommand(self._connection, sql)
        if dbId:
            self._record.db_id = dbId[0][0]


    def createOrUpdateRecord(self):
        '''
        Looks up a record in the DB; if it finds it, update it, otherwise create it
        @return result: True if record created/updated, False otherwise
        '''
        self.getRecordID()
        if self._record.db_id:
            return self.updateRecord()
        else:
            return self.createRecord()


    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record created, False otherwise
        '''
        logging.info("Creating a new record in the DB for the metadata document")
        # firstly store the actual documents contained by the record - i.e. the original doc +
        # the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()
        logging.info("New record created")
        return True


    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record updated, False otherwise
        '''
        logging.info("Record already exists in DB - performing updates")
        result = False
        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if self._checkIsUpdatedRecord():

            # firstly update the transformed documents contained by the record
            # - i.e. the various transforms of the original doc
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing spatiotemporal
            # data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")
            self._deleteSpatioTemporalData()
            self._insertSpatioTemporalData()
            result = True

        logging.info("Finished processing document...")
        return result


    def _checkIsUpdatedRecord(self):
        '''
        Looks up an existing record and checks it is not identical to the new record; if it is
        identical, increment the harvest_count value and don't process it again
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")

        sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
            str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';"
        results = db_funcs.runSQLCommand(self._connection, sql)

        # NB, if the document is not identical, the sql command will not find anything
        if not results:
            logging.info("Ingested document is different from the one currently in the DB")
            return True

        count = results[0][0]

        # get current system change number
        scn = results[0][1]
        self._record.scn = scn
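        # (kept on the record so _updateOriginalRecord can pass the expected SCN to the
        # update_document stored procedure)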
        logging.info("Ingested document is identical to document currently in DB - " + \
                     "incrementing harvest_count")
        count += 1
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \
            " WHERE original_document_id = " + str(self._record.db_id)
        db_funcs.runSQLCommand(self._connection, sql)
        return False


    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is set to cascade from the linking table so only need to
        delete entries from there
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_record_id = " + \
            str(self._record.db_id) + ";"

        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")


    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single row record in the postgres DB based on the
        input data
        @param coords: a Coords object representing NSWE coords
        @param timeRange: a TimeRange object representing a start/end date
        '''
        logging.info("Adding spatiotemporal row to DB")
        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \
            str(coords.north) + "', '" + str(coords.south) + "', '" + str(coords.west) + "', '" + \
            str(coords.east) + "', "

        # cope with null dates appropriately
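        # (the literal "null" is appended unquoted so the stored procedure receives a SQL
        # NULL rather than the string 'null')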
        if timeRange.start == "null":
            sqlCmd += timeRange.start + ", "
        else:
            sqlCmd += "'" + timeRange.start + "', "

        if timeRange.end == "null":
            sqlCmd += timeRange.end
        else:
            sqlCmd += "'" + timeRange.end + "'"

        sqlCmd += ");"

        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Spatiotemporal row added successfully")


    def _insertSpatioTemporalData(self):
        '''
        Create a record in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object
        '''
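        # The spatial and temporal lists are paired one to one when they are the same
        # length; otherwise every coordinate box is combined with every time range
        # (e.g. 2 time ranges and 3 boxes give 6 rows)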
        logging.info("Adding spatiotemporal data to DB record")

        # Work out the relationship between the spatial and temporal data and handle appropriately
        timeData = self._record.stData.getTemporalData()
        spatialData = self._record.stData.getSpatialData()

        # check if we have any spatiotemporal data to add; escape if not
        if not timeData and not spatialData:
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # setup dummy, null data if required
        if not timeData:
            timeData = [ TimeRange('null', 'null') ]

        if not spatialData:
            spatialData = [ Coords('null', 'null', 'null', 'null') ]

        # if both arrays of data are the same length, assume they map together
        if len(timeData) == len(spatialData):
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for coords, timeRange in zip(spatialData, timeData):
                self._insertSpatioTemporalRow(coords, timeRange)

        # otherwise, map everything to everything
        else:
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for timeRange in timeData:
                for coords in spatialData:
                    self._insertSpatioTemporalRow(coords, timeRange)
        logging.info("Spatiotemporal data added to DB record")


    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB
        '''
        logging.info("Inserting new original document in Postgres DB")
        sqlCmd = "SELECT create_document('" + self._record.filename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "');"

        dbId = db_funcs.runSQLCommand(self._connection, sqlCmd)
        if len(dbId) == 0:
            raise ValueError('No PK ID returned from INSERT to DB')

        self._record.db_id = dbId[0][0]
        logging.info("Original document inserted in Postgres DB")


    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB
        '''
        logging.info("Updating original document in Postgres DB")
        sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \
            self._record.filename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + str(self._record.scn) + "');"

        db_funcs.runSQLCommand(self._connection, sqlCmd)

        # increment scn - saves doing an additional db call
        self._record.scn += 1
        logging.info("Original document updated in Postgres DB")


    def _insertMetadataRecords(self):
        '''
        Insert the metadata docs into the postgres DB
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.filename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_record_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                docType + "', '" + doc + "', current_timestamp, 1);"

            db_funcs.runSQLCommand(self._connection, sqlCmd)

        logging.info("Transformed records created in DB")


    def _updateMetadataRecords(self):
        '''
        Update the metadata docs in the postgres DB
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.filename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs():
            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \
                "', update_date = current_timestamp WHERE original_record_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"

            db_funcs.runSQLCommand(self._connection, sqlCmd)

        logging.info("Transformed records updated in DB")


    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record
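
For reference, a minimal sketch of how this DAO might be driven from an ingest script. This is hypothetical driver code, not part of the file above; it assumes a list of PostgresRecord-like objects exposing the attributes read by the class (discovery_id, filename, docType, originalFormat, stData, getAllDocs) and the db_funcs.db_connect() helper used in the constructor:

    import logging
    import db_funcs
    from PostgresDAO import PostgresDAO

    def ingest(records, connection=None):
        '''Create/update each record and count how many were actually ingested.'''
        if connection is None:
            connection = db_funcs.db_connect()
        dao = None
        ingestedCount = 0
        for record in records:
            if dao is None:
                dao = PostgresDAO(record, connection)   # first record initialises the DAO
            else:
                dao.setRecord(record)                   # re-use the DAO for later records
            if dao.createOrUpdateRecord():              # True if created or genuinely changed
                ingestedCount += 1
        logging.info("Ingested %d records", ingestedCount)
        return ingestedCount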