source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 3839

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@3839
Revision 3839, 15.9 KB checked in by cbyrom, 12 years ago (diff)

Add script to run ingest for all available config files.
Make oai_ingest_new2 a proper object.
Adjust db_funcs - now pass in details to set up database connection

  • although defaults available, if not done.

Simplify coord parsing in PostgresRecord by using a reusable function.
+ various tidy ups and fixes.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3Class representing the data access object wrapping the Postgres DB
4C Byrom Apr 08
5'''
6import sys, os, logging
7import molesReadWrite as MRW
8import db_funcs
9
10class PostgresDAO:
11   
12    def __init__(self, record, connection=None):
13        '''
14        Constructor - to initialise the DAL and do some initial setting up
15        @param record: the PostgresRecord object to add or update in the DB
16        '''
17        if record == "":
18            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
19        else:
20            print "INFO: Creating/updating DB entry for record, %s" %record.discovery_id
21
22        # setup a connection to the db - if none specified
23        if connection is None:
24            connection = db_funcs.db_connect()
25        self._connection = connection
26        self._record = record
27        self.id = None
28       
29
30    def commavaluesfromsublist(self, list):
31        '''
32        Creates an comma-separated string of values from a list of sublists, taking
33        the first element of each sublist
34        '''
35        csvString = ""
36        for item in list:
37            if item:
38                if len(csvString) > 0:
39                    csvString += ","   
40                csvString += "%s" % item[0]
41           
42        return csvString
43
44           
45    def getRecordID(self):
46        '''
47        Looks up a record in the DB and returns its DB ID, if it exists, otherwise
48        returns '-1'
49        @return: id of record, if it exists, '-1' if it doesn't
50        '''
51        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
52        if self.id is not None and self.id > 0:
53            logging.info("Already looked up record - ID is " + self.id)
54            return self.id
55       
56        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + self._record.discovery_id + "';"
57        self.id = db_funcs.runSQLCommand(self._connection, sql)
58        print "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXKKKK", self.id
59       
60
61    def createOrUpdateRecord(self):
62        '''
63        Looks up a record in the DB; if it finds it, update it, otherwise create it
64        '''
65        if self.getRecordID() > 0:
66            self.createRecord()
67        else:
68            self.updateRecord()
69
70           
71    def createRecord(self):
72        '''
73        Create a record in the postgres DB based on the specified PostgresRecord object
74        '''
75        logging.info("Creating a new record in the DB for the metada document")
76        # firstly store the actual documents contained by the record - i.e. the original doc +
77        # the various transforms required
78        self._insertOriginalRecord()
79        self._insertMetadataRecords()
80       
81        # Now add the spatiotemporal data
82        self._insertSpatioTemporalData()
83        logging.info("New record created")
84           
85
86    def updateRecord(self):
87        '''
88        Update a record in the postgres DB based on the specified PostgresRecord object
89        '''
90        logging.info("Record already existing in DB - performing updates")
91        # firstly, check the document is actually new - i.e. not just a repeat of the same data
92        if _checkIsUpdatedRecord():
93       
94            # firstly update the actual documents contained by the record - i.e. the original doc +
95            # the various transforms required
96            self._updateMetadataRecords(record)
97           
98            # Now update the spatiotemporal data
99            self._updateSpatioTemporalData(record)
100       
101       
102    def _checkIsUpdatedRecord(self):
103        '''
104        Looks up an existing record and checks it is not identical to the new record; if it is
105        incremement the harvest_count value and don't process again
106        @return: True if doc contains changes, False otherwise
107        '''
108        logging.info("Checking the updated document actually contains changes")
109        sql = "SELECT harvest_count FROM ORIGINAL_DOCUMENT where original_document_id = " + self.id + \
110            " AND original_document = " + self._record.originalFormat + "';"
111        count = db_funcs.runSQLCommand(self._connection, sql)
112       
113        # NB, if the document is not identical, the sql command will not find anything
114        if count > 0:
115            logging.info("Ingested document is identical to document currently in DB - incrementing harvest_count")
116            count += 1
117            sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + count + " WHERE original_document_id = " + self.id
118            count = db_funcs.runSQLCommand(self._connection, sql)
119            return False
120
121        logging.info("Ingested document is different to that in the current DB")
122        return True
123
124   
125    def _insertSpatioTemporalData(self):
126        '''
127        Create a record in the postgres DB based on the spatiotemporal data
128        specified in the PostgresRecord object
129        '''
130        logging.info("Adding spatiotemporal data to DB record")
131        # NB, build the query up in two parts so that we only include the correct data
132        sqlStart = ""
133        insert_sql="INSERT INTO "+location_table+" (geom) VALUES ( setsrid('BOX3D("+west+" "+south+","+east+" "+north+")'::box3d,4326) );"
134        sqlEnd = " VALUES (DEFAULT, '" + self._record.id
135        if (not self._record.hasNullCoords()):
136            sqlStart += "coordinates"
137            sqlEnd += "', sbox'((" + self._record.west + \
138                "d , " + self._record.south + "d), (" + self._record.east +"d , " + \
139                self._record.north + "d))' "
140       
141        if str(self._record.startdate)!='nostartdate':
142            if len(sqlStart) > 0:
143                sqlStart += ", "
144                sqlEnd += ", "
145            sqlStart += "startdate"
146            sqlEnd += "'" + self._record.startdate + "'"
147           
148        if str(self._record.enddate)!='noenddate':
149            if len(sqlStart) > 0:
150                sqlStart += ", "
151                sqlEnd += ", "
152            sqlStart += ", enddate"
153            sqlEnd += ", '" + self._record.enddate + "'"
154
155        if len(sqlStart) > 0:
156            sqlStart = ", " + sqlStart
157            sqlEnd = ", " + sqlEnd
158        sqlCmd = "INSERT INTO spatiotemp (id, original_doc_id" + sqlStart + ") " + sqlEnd + ");"
159        db_funcs.runSQLCommand(self._connection, sqlCmd)
160        logging.info("Spatiotemporal data added to DB record")
161           
162   
163    def _updateSpatioTemporalData(self):
164        '''
165        Updates a record in the postgres DB based on the spatiotemporal data
166        specified in the PostgresRecord object
167        '''
168        # NB, build the query up in two parts so that we only include the correct data
169        logging.info("Updating spatiotemporal data to DB record")
170        sqlStart = ""
171        sqlEnd = " WHERE original_doc_id='" + self._record.id + "';" 
172        if (not self._record.hasNullCoords()):
173            sqlStart += "coordinates = sbox'((" + self._record.west + "d , " + \
174                self._record.south + "d), (" + self._record.east + "d , " + \
175                self._record.north + "d))'"
176       
177        if str(self._record.startdate)!='nostartdate':
178            if len(sqlStart) > 0:
179                sqlStart += ", "
180            sqlStart += "startdate = '" + self._record.startdate + "'"
181           
182        if str(self._record.enddate)!='noenddate':
183            if len(sqlStart) > 0:
184                sqlStart += ", "
185            sqlStart += "enddate = '" + enddate + "'"
186
187        if len(sqlStart) > 0:
188            sqlStart += ", "
189        sqlCmd = "UPDATE spatiotemp SET (" + sqlStart + "update_time= now()) " + sqlEnd + ");"
190
191        db_funcs.runSQLCommand(self._connection, sqlCmd)
192        logging.info("Spatiotemporal data updated for DB record")
193   
194           
195    def _insertOriginalRecord(self):
196        '''
197        Insert the original metadata doc into the postgres DB
198        '''
199        logging.info("Inserting new original document in Postgres DB")
200        sqlCmd = "INSERT INTO ORIGINAL_DOCUMENT (original_document_id, original_document_name, original_format, " \
201            "original_document, ts_vector, create_date, harvest_count, scn) VALUES (" \
202            "DEFAULT, '" + self._record.filename + "', '" + self._record.docType + "', '" + self._record.originalFormat + \
203            "', to_tsvector('english', " + self._record.originalFormat + "), current_date, 1, 1);"
204           
205        id = db_funcs.runSQLCommand(self._connection, sqlCmd)
206        self.id = id
207        self._record.db_id = id
208        logging.info("Original document inserted in Postgres DB")
209       
210   
211    def _updateOriginalRecord(self):
212        '''
213        Update the original doc in the postgres DB
214        '''
215        logging.info("Updating original document in Postgres DB")
216        sqlCmd = "UPDATE ORIGINAL_DOCUMENT SET (original_document_name = '" + self._record.filename + "', " \
217            "original_format = '" + self._record.originalFormat + "', " \
218            "ts_vector = to_tsvector('english', " + self._record.originalFormat + "), " \
219            "update_date = current_date, " \
220            "harvest_count = 1)" \
221            " WHERE original_document_id = " + self._record.db_id + ";"
222   
223        db_funcs.runSQLCommand(self._connection, sqlCmd)
224        logging.info("Original document updated in Postgres DB")
225   
226           
227    def _insertMetadataRecords(self):
228        '''
229        Insert the metadata docs into the postgres DB
230        '''
231        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.filename)
232        if self._record.db_id is None:
233            print "No DB ID for the original record exists; cannot add associated transformed docs"
234            return
235       
236        for docType, doc in record.getAllDocs():
237            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
238                "original_record_id, transformed_format, " \
239                "transformed_document, create_date, scn) VALUES (" \
240                "DEFAULT, '" + self._record.db_id + "', '" + \
241                docType + "', '" + doc + "'), current_date, 1);"
242           
243            db_funcs.runSQLCommand(self._connection, sqlCmd)
244       
245        logging.info("Transformed records created in DB")
246   
247   
248    def _updateMetadataRecords(self):
249        '''
250        Update the metadata docs into the postgres DB
251        '''
252        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.filename)
253        if self._record.db_id is None:
254            print "No DB ID for the original record exists; cannot update associated transformed docs"
255            return
256       
257        for docType, doc in record.getAllDocs():
258            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET (transformed_document = '" + doc + \
259                "', update_date = current_date) WHERE original_record_id = " + \
260                self._record.db_id + " AND transformed_format = '" + docType + "';"
261
262            db_funcs.runSQLCommand(self._connection, sqlCmd)
263   
264        logging.info("Transformed records updated in DB")
265
266
267    def setRecord(self, record):
268        '''
269        Set the record to use with the DAL - to allow object re-use
270        @param record: PostgresRecord to use with the DAL
271        '''
272        self._record = record
273
274
275   
276    def link_spatial_and_temporal(Mid,west,south,east,north,startdate,enddate):
277        '''
278        Creates an entry in the item/time/location table linking an item, time and location.
279        '''
280        locationid = insert_spatial_coverage(Mid,west,south,east,north)
281        timeid = insert_temporal_coverage(Mid,startdate,enddate)
282        if (locationid > -1):
283            locationidstr = "%s" % locationid
284        else:
285            locationidstr = "NULL"
286   
287        if (timeid > -1):
288            timeidstr = "%s" % timeid
289        else:
290            timeidstr = "NULL"
291   
292        itemtimelocation_sql = "INSERT INTO "+item_table+" (itemid, locationid, timeid) values ('"+Mid+"', "+locationidstr+", "+timeidstr+" );"
293   
294        print "ItemTimeLocation:\t"+itemtimelocation_sql
295        cursor = connection.cursor()
296        try:
297            cursor.execute(itemtimelocation_sql)
298        except:
299            print "Error: database error %s %s" %(sys.exc_type, sys.exc_value)
300        connection.commit()
301   
302    def link_spatial(Mid,west,south,east,north):
303            '''Creates an entry in the item/time/location table linking an item and location only'''
304            locationid = insert_spatial_coverage(Mid,west,south,east,north)
305            if (locationid > -1):
306                    locationidstr = "%s" % locationid
307            else:
308                    locationidstr = "NULL"
309   
310            itemtimelocation_sql = "INSERT INTO "+item_table+" (itemid, locationid) values ('"+Mid+"', "+locationidstr+" );"
311   
312            print "ItemTimeLocation:\t"+itemtimelocation_sql
313            cursor = connection.cursor()
314            try:
315                cursor.execute(itemtimelocation_sql)
316            except:
317                print "Error: database error %s %s" %(sys.exc_type, sys.exc_value)
318            connection.commit()
319   
320    def link_temporal(Mid,startdate,enddate):
321            '''Creates an entry in the item/time/location table linking an item and time only'''
322            timeid = insert_temporal_coverage(Mid,startdate,enddate)
323            if (timeid > -1):
324                    timeidstr = "%s" % timeid
325            else:
326                    timeidstr = "NULL"
327   
328            itemtimelocation_sql = "INSERT INTO "+item_table+" (itemid, timeid) values ('"+Mid+"', "+timeidstr+" );"
329   
330            print "ItemTimeLocation:\t"+itemtimelocation_sql
331            cursor = connection.cursor()
332            try:
333                cursor.execute(itemtimelocation_sql)
334            except:
335                print "Error: database error %s %s" %(sys.exc_type, sys.exc_value)
336            connection.commit()
337   
338   
339    def insert_spatial_coverage(Mid,west,south,east,north):
340        '''
341        Attempts to insert spatial coverage into spatial table.
342        If successful, returns newly created location id, -1 otherwise
343        '''
344        returnid = -1
345        if str(west)!='null' and str(south)!='null' and str(east)!='null' and str(north)!='null':
346            insert_sql="INSERT INTO "+location_table+" (geom) VALUES ( setsrid('BOX3D("+west+" "+south+","+east+" "+north+")'::box3d,4326) );"
347            print insert_sql
348        cursor = connection.cursor()
349        try:
350            cursor.execute(insert_sql)
351            if (cursor.rowcount > 0):
352                getid_sql="select currval('"+location_id_seq+"');"
353                cursor.execute(getid_sql)
354                id = commavaluesfromsublist(cursor.fetchall())
355                return int(id)
356            else:
357                return -1
358        except:
359            print "Error: database error %s %s" %(sys.exc_type, sys.exc_value)
360        connection.commit()
361   
362    def insert_temporal_coverage(Mid,startdate,enddate):
363        '''Attempts to insert temporal coverage (only) into time table. If successful, returns timeid to be used in item table, -1 otherwise'''
364        returnid = -1
365        if (startdate=='nostartdate' and enddate=='noenddate'):
366            # Skip if both are missing, although it's OK if only 1 is given
367            return -1
368        else:
369            if (startdate=='nostartdate'): startdate='NULL'
370            if (enddate=='noenddate'): enddate='NULL'
371            insert_sql =  "INSERT INTO "+time_table+" (start_time, end_time) VALUES('"+startdate+"', '"+enddate+"');"
372            print insert_sql
373            cursor = connection.cursor()
374            try:
375                    cursor.execute(insert_sql)
376                    if (cursor.rowcount > 0):
377                            getid_sql="select currval('"+time_id_seq+"');"
378                            cursor.execute(getid_sql)
379                            id = commavaluesfromsublist(cursor.fetchall())
380                            return int(id)
381                    else:
382                            return -1
383            except:
384                    print "Error: database error %s %s" %(sys.exc_type, sys.exc_value)
385            connection.commit()
Note: See TracBrowser for help on using the repository browser.