source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 3846

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@3846
Revision 3846, 16.3 KB checked in by cbyrom, 13 years ago (diff)

Adjust DAO and Record classes to throw errors rather than catching them

  • to allow processing of multiple files (wrapped by oai_ingest)

to continue more cleanly.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3Class representing the data access object wrapping the Postgres DB
4C Byrom Apr 08
5'''
6import sys, os, logging
7import db_funcs
8
9class PostgresDAO:
10   
11    def __init__(self, record, connection=None):
12        '''
13        Constructor - to initialise the DAL and do some initial setting up
14        @param record: the PostgresRecord object to add or update in the DB
15        '''
16        if record == "":
17            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
18        else:
19            logging.info("INFO: Creating/updating DB entry for record, %s" %record.discovery_id)
20
21        # setup a connection to the db - if none specified
22        if connection is None:
23            connection = db_funcs.db_connect()
24        self._connection = connection
25        self._record = record
26       
27
28    def commavaluesfromsublist(self, list):
29        '''
30        Creates an comma-separated string of values from a list of sublists, taking
31        the first element of each sublist
32        '''
33        csvString = ""
34        for item in list:
35            if item:
36                if len(csvString) > 0:
37                    csvString += ","   
38                csvString += "%s" % item[0]
39           
40        return csvString
41
42           
43    def getRecordID(self):
44        '''
45        Looks up a record in the DB and returns its DB ID, if it exists, otherwise
46        returns '-1'
47        @return: id of record, if it exists, '-1' if it doesn't
48        '''
49        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
50        if self._record.db_id is not None and self._record.db_id > 0:
51            logging.info("Already looked up record - ID is " + str(self._record.db_id))
52            return self._record.db_id
53       
54        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
55            self._record.discovery_id + "';"
56        dbId = db_funcs.runSQLCommand(self._connection, sql)
57        if dbId:
58            self._record.db_id = dbId[0][0]
59       
60
61    def createOrUpdateRecord(self):
62        '''
63        Looks up a record in the DB; if it finds it, update it, otherwise create it
64        '''
65        self.getRecordID()
66        if self._record.db_id:
67            self.updateRecord()
68        else:
69            self.createRecord()
70
71           
72    def createRecord(self):
73        '''
74        Create a record in the postgres DB based on the specified PostgresRecord object
75        '''
76        logging.info("Creating a new record in the DB for the metada document")
77        # firstly store the actual documents contained by the record - i.e. the original doc +
78        # the various transforms required
79        self._insertOriginalRecord()
80        self._insertMetadataRecords()
81       
82        # Now add the spatiotemporal data
83        self._insertSpatioTemporalData()
84        logging.info("New record created")
85           
86
87    def updateRecord(self):
88        '''
89        Update a record in the postgres DB based on the specified PostgresRecord object
90        '''
91        logging.info("Record already existing in DB - performing updates")
92        # firstly, check the document is actually new - i.e. not just a repeat of the same data
93        if self._checkIsUpdatedRecord():
94       
95            # firstly update the actual documents contained by the record - i.e. the original doc +
96            # the various transforms required
97            self._updateMetadataRecords()
98
99            # Now update the spatiotemporal data
100            self._updateSpatioTemporalData()
101
102        logging.info("Finish processing document...")
103       
104       
105    def _checkIsUpdatedRecord(self):
106        '''
107        Looks up an existing record and checks it is not identical to the new record; if it is
108        incremement the harvest_count value and don't process again
109        @return: True if doc contains changes, False otherwise
110        '''
111        logging.info("Checking the updated document actually contains changes")
112
113        sql = "SELECT harvest_count FROM ORIGINAL_DOCUMENT where original_document_id = " + \
114            str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';"
115        count = db_funcs.runSQLCommand(self._connection, sql)
116
117        # NB, if the document is not identical, the sql command will not find anything
118        if not count:
119            logging.info("Ingested document is different to that in the current DB")
120            return True
121           
122        count = count[0][0]
123        logging.info("Ingested document is identical to document currently in DB - " + \
124                     "incrementing harvest_count")
125        count += 1
126        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \
127            " WHERE original_document_id = " + str(self._record.db_id)
128        db_funcs.runSQLCommand(self._connection, sql)
129        return False
130
131
132   
133    def _insertSpatioTemporalData(self):
134        '''
135        Create a record in the postgres DB based on the spatiotemporal data
136        specified in the PostgresRecord object
137        '''
138        logging.info("Adding spatiotemporal data to DB record")
139        # NB, build the query up in two parts so that we only include the correct data
140        sqlStart = ""
141        insert_sql="INSERT INTO "+location_table+" (geom) VALUES ( setsrid('BOX3D("+west+" "+south+","+east+" "+north+")'::box3d,4326) );"
142        sqlEnd = " VALUES (DEFAULT, '" + str(self._record.db_id) 
143        if (not self._record.hasNullCoords()):
144            sqlStart += "coordinates"
145            sqlEnd += "', sbox'((" + self._record.west + \
146                "d , " + self._record.south + "d), (" + self._record.east +"d , " + \
147                self._record.north + "d))' "
148       
149        if str(self._record.startdate)!='nostartdate':
150            if len(sqlStart) > 0:
151                sqlStart += ", "
152                sqlEnd += ", "
153            sqlStart += "startdate"
154            sqlEnd += "'" + self._record.startdate + "'"
155           
156        if str(self._record.enddate)!='noenddate':
157            if len(sqlStart) > 0:
158                sqlStart += ", "
159                sqlEnd += ", "
160            sqlStart += ", enddate"
161            sqlEnd += ", '" + self._record.enddate + "'"
162
163        if len(sqlStart) > 0:
164            sqlStart = ", " + sqlStart
165            sqlEnd = ", " + sqlEnd
166           
167        sqlCmd = "INSERT INTO spatiotemp (id, original_doc_id" + sqlStart + ") " + sqlEnd + ");"
168        db_funcs.runSQLCommand(self._connection, sqlCmd)
169        logging.info("Spatiotemporal data added to DB record")
170           
171   
172    def _updateSpatioTemporalData(self):
173        '''
174        Updates a record in the postgres DB based on the spatiotemporal data
175        specified in the PostgresRecord object
176        '''
177        # NB, build the query up in two parts so that we only include the correct data
178        logging.info("Updating spatiotemporal data to DB record")
179        sqlStart = ""
180        sqlEnd = " WHERE original_doc_id='" + str(self._record.db_id) + "';" 
181        if (not self._record.hasNullCoords()):
182            sqlStart += "coordinates = sbox'((" + self._record.west + "d , " + \
183                self._record.south + "d), (" + self._record.east + "d , " + \
184                self._record.north + "d))'"
185       
186        if str(self._record.startdate)!='nostartdate':
187            if len(sqlStart) > 0:
188                sqlStart += ", "
189            sqlStart += "startdate = '" + self._record.startdate + "'"
190           
191        if str(self._record.enddate)!='noenddate':
192            if len(sqlStart) > 0:
193                sqlStart += ", "
194            sqlStart += "enddate = '" + enddate + "'"
195
196        if len(sqlStart) > 0:
197            sqlStart += ", "
198        sqlCmd = "UPDATE spatiotemp SET (" + sqlStart + "update_time= now()) " + sqlEnd + ");"
199
200        db_funcs.runSQLCommand(self._connection, sqlCmd)
201        logging.info("Spatiotemporal data updated for DB record")
202   
203           
204    def _insertOriginalRecord(self):
205        '''
206        Insert the original metadata doc into the postgres DB
207        '''
208        logging.info("Inserting new original document in Postgres DB")
209        sqlCmd = "INSERT INTO ORIGINAL_DOCUMENT (original_document_id, original_document_filename, " + \
210            "discovery_id, original_format, " + \
211            "original_document, ts_vector, create_date, harvest_count, scn) VALUES (" + \
212            "DEFAULT, '" + self._record.filename + "', '" + self._record.discovery_id + \
213            "', '" + self._record.docType + "', '" + self._record.originalFormat + \
214            "', to_tsvector('english', '" + self._record.originalFormat + "'), current_date, 1, 1);"
215           
216        self._record.db_id = db_funcs.runSQLCommand(self._connection, sqlCmd)
217        logging.info("Original document inserted in Postgres DB")
218       
219   
220    def _updateOriginalRecord(self):
221        '''
222        Update the original doc in the postgres DB
223        '''
224        logging.info("Updating original document in Postgres DB")
225        sqlCmd = "UPDATE ORIGINAL_DOCUMENT SET (original_document_filename = '" + self._record.filename + \
226            "', discovery_id = '" + self._record.discovery_id + "', " + \
227            "original_format = '" + self._record.originalFormat + "', " + \
228            "ts_vector = to_tsvector('english', '" + self._record.originalFormat + "'), " + \
229            "update_date = current_date, harvest_count = 1)" + \
230            " WHERE original_document_id = " + str(self._record.db_id) + ";"
231   
232        db_funcs.runSQLCommand(self._connection, sqlCmd)
233        logging.info("Original document updated in Postgres DB")
234   
235           
236    def _insertMetadataRecords(self):
237        '''
238        Insert the metadata docs into the postgres DB
239        '''
240        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.filename)
241        if self._record.db_id is None:
242            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
243            return
244       
245        for docType, doc in self._record.getAllDocs():
246            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
247                "original_record_id, transformed_format, " \
248                "transformed_document, create_date, scn) VALUES (" \
249                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
250                docType + "', '" + doc + "'), current_date, 1);"
251           
252            db_funcs.runSQLCommand(self._connection, sqlCmd)
253       
254        logging.info("Transformed records created in DB")
255   
256   
257    def _updateMetadataRecords(self):
258        '''
259        Update the metadata docs into the postgres DB
260        '''
261        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.filename)
262        if self._record.db_id is None:
263            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
264            return
265       
266        for docType, doc in self._record.getAllDocs():
267            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET (transformed_document = '" + doc + \
268                "', update_date = current_date) WHERE original_record_id = " + \
269                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"
270
271            db_funcs.runSQLCommand(self._connection, sqlCmd)
272   
273        logging.info("Transformed records updated in DB")
274
275
276    def setRecord(self, record):
277        '''
278        Set the record to use with the DAL - to allow object re-use
279        @param record: PostgresRecord to use with the DAL
280        '''
281        self._record = record
282
283
284   
285    def link_spatial_and_temporal(Mid,west,south,east,north,startdate,enddate):
286        '''
287        Creates an entry in the item/time/location table linking an item, time and location.
288        '''
289        locationid = insert_spatial_coverage(Mid,west,south,east,north)
290        timeid = insert_temporal_coverage(Mid,startdate,enddate)
291        if (locationid > -1):
292            locationidstr = "%s" % locationid
293        else:
294            locationidstr = "NULL"
295   
296        if (timeid > -1):
297            timeidstr = "%s" % timeid
298        else:
299            timeidstr = "NULL"
300   
301        itemtimelocation_sql = "INSERT INTO "+item_table+" (itemid, locationid, timeid) values ('"+Mid+"', "+locationidstr+", "+timeidstr+" );"
302   
303        #print "ItemTimeLocation:\t"+itemtimelocation_sql
304        cursor = connection.cursor()
305        try:
306            cursor.execute(itemtimelocation_sql)
307        except:
308            print "Error: database error %s %s" %(sys.exc_type, sys.exc_value)
309        connection.commit()
310   
311    def link_spatial(Mid,west,south,east,north):
312            '''Creates an entry in the item/time/location table linking an item and location only'''
313            locationid = insert_spatial_coverage(Mid,west,south,east,north)
314            if (locationid > -1):
315                    locationidstr = "%s" % locationid
316            else:
317                    locationidstr = "NULL"
318   
319            itemtimelocation_sql = "INSERT INTO "+item_table+" (itemid, locationid) values ('"+Mid+"', "+locationidstr+" );"
320   
321            #print "ItemTimeLocation:\t"+itemtimelocation_sql
322            cursor = connection.cursor()
323            try:
324                cursor.execute(itemtimelocation_sql)
325            except:
326                print "Error: database error %s %s" %(sys.exc_type, sys.exc_value)
327            connection.commit()
328   
329    def link_temporal(Mid,startdate,enddate):
330            '''Creates an entry in the item/time/location table linking an item and time only'''
331            timeid = insert_temporal_coverage(Mid,startdate,enddate)
332            if (timeid > -1):
333                    timeidstr = "%s" % timeid
334            else:
335                    timeidstr = "NULL"
336   
337            itemtimelocation_sql = "INSERT INTO "+item_table+" (itemid, timeid) values ('"+Mid+"', "+timeidstr+" );"
338   
339            #print "ItemTimeLocation:\t"+itemtimelocation_sql
340            cursor = connection.cursor()
341            try:
342                cursor.execute(itemtimelocation_sql)
343            except:
344                print "Error: database error %s %s" %(sys.exc_type, sys.exc_value)
345            connection.commit()
346   
347   
348    def insert_spatial_coverage(Mid,west,south,east,north):
349        '''
350        Attempts to insert spatial coverage into spatial table.
351        If successful, returns newly created location id, -1 otherwise
352        '''
353        returnid = -1
354        if str(west)!='null' and str(south)!='null' and str(east)!='null' and str(north)!='null':
355            insert_sql="INSERT INTO "+location_table+" (geom) VALUES ( setsrid('BOX3D("+west+" "+south+","+east+" "+north+")'::box3d,4326) );"
356            print insert_sql
357        cursor = connection.cursor()
358        try:
359            cursor.execute(insert_sql)
360            if (cursor.rowcount > 0):
361                getid_sql="select currval('"+location_id_seq+"');"
362                cursor.execute(getid_sql)
363                id = commavaluesfromsublist(cursor.fetchall())
364                return int(id)
365            else:
366                return -1
367        except:
368            print "Error: database error %s %s" %(sys.exc_type, sys.exc_value)
369        connection.commit()
370
371   
372    def insert_temporal_coverage(Mid,startdate,enddate):
373        '''Attempts to insert temporal coverage (only) into time table. If successful, returns timeid to be used in item table, -1 otherwise'''
374        returnid = -1
375        if (startdate=='nostartdate' and enddate=='noenddate'):
376            # Skip if both are missing, although it's OK if only 1 is given
377            return -1
378        else:
379            if (startdate=='nostartdate'): startdate='NULL'
380            if (enddate=='noenddate'): enddate='NULL'
381            insert_sql =  "INSERT INTO "+time_table+" (start_time, end_time) VALUES('"+startdate+"', '"+enddate+"');"
382            print insert_sql
383            cursor = connection.cursor()
384            try:
385                    cursor.execute(insert_sql)
386                    if (cursor.rowcount > 0):
387                            getid_sql="select currval('"+time_id_seq+"');"
388                            cursor.execute(getid_sql)
389                            id = commavaluesfromsublist(cursor.fetchall())
390                            return int(id)
391                    else:
392                            return -1
393            except:
394                    print "Error: database error %s %s" %(sys.exc_type, sys.exc_value)
395            connection.commit()
Note: See TracBrowser for help on using the repository browser.