source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 3814

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@3814
Revision 3814, 10.4 KB checked in by cbyrom, 12 years ago (diff)

Fill out functionality for DAO - including check for uniqueness of ingested records
+ improved logging + fixes to the SQL to match current data model.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3Class representing the data access object wrapping the Postgres DB
4C Byrom Apr 08
5'''
6import sys, os, logging
7import molesReadWrite as MRW
8import db_funcs
9
class PostgresDAO:
    '''
    Data access object used to create or update, in the Postgres DB, the rows
    belonging to one PostgresRecord: the original document, its transformed
    documents and its spatiotemporal data.

    All SQL is built as text and run via db_funcs.runSQLCommand(connection, sql).
    NOTE(review): the shape of the value returned by runSQLCommand is not visible
    from this module; results are therefore only ever read through _firstInt,
    which copes with a bare value, a row or a list of rows - confirm.
    '''

    def __init__(self, record):
        '''
        Constructor - to initialise the DAL and do some initial setting up
        @param record: the PostgresRecord object to add or update in the DB
        '''
        if record is None or record == "":
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")

        logging.info("Creating/updating DB entry for record, %s", record.id)

        # setup a connection to the db
        # NOTE(review): db_connect is neither defined nor imported in this module;
        # it is assumed to live in db_funcs, the only DB helper imported - confirm
        self._connection = db_funcs.db_connect()
        self._record = record
        # DB ID of the current record; None until it has been looked up
        self.id = None


    def _quote(self, value):
        '''
        Render a value as a Postgres escape-string literal, i.e. E'...'
        Backslashes and single quotes are doubled so that the text cannot
        terminate the literal early, whatever the server's
        standard_conforming_strings setting is
        NOTE(review): if PostgresRecord already escapes its documents, the text
        will be escaped twice - drop one of the two
        @param value: value to embed in a SQL command; non-strings go through str()
        @return: the quoted, escaped SQL literal
        '''
        if not hasattr(value, 'replace'):
            value = str(value)
        return "E'" + value.replace("\\", "\\\\").replace("'", "''") + "'"


    def _firstInt(self, result, default):
        '''
        Pull the first value out of a query result and convert it to an int
        @param result: bare value, row (list/tuple) or list of rows; may be None
        @param default: value to return if there is no usable first value
        @return: first value as an int, or default
        '''
        value = result
        # unwrap e.g. [(7,)] -> (7,) -> 7; an empty level means nothing was found
        while isinstance(value, (list, tuple)):
            if len(value) == 0:
                return default
            value = value[0]
        try:
            return int(value)
        except (TypeError, ValueError):
            return default


    def getRecordID(self):
        '''
        Looks up a record in the DB and returns its DB ID, if it exists, otherwise
        returns -1.  A successful lookup is cached in self.id and copied to the
        record's db_id attribute
        @return: id of record, if it exists, -1 if it doesn't
        '''
        logging.info("Looking up record, %s, in DB", self._record.discovery_id)
        if self.id is not None and self.id > 0:
            logging.info("Already looked up record - ID is %s", self.id)
            return self.id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = " + \
            self._quote(self._record.discovery_id) + ";"
        self.id = self._firstInt(db_funcs.runSQLCommand(self._connection, sql), -1)
        if self.id > 0:
            # the insert/update helpers address the document via record.db_id
            self._record.db_id = self.id
        return self.id


    def createOrUpdateRecord(self):
        '''
        Looks up a record in the DB; if it finds it, update it, otherwise create it
        '''
        if self.getRecordID() > 0:
            self.updateRecord()
        else:
            self.createRecord()


    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object
        '''
        logging.info("Creating a new record in the DB for the metadata document")
        # firstly store the actual documents contained by the record - i.e. the original doc +
        # the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()
        logging.info("New record created")


    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object.
        Nothing is changed if the record cannot be found in the DB, or if the
        ingested document is identical to the stored one (in which case only
        the harvest count is incremented)
        '''
        # make sure self.id / record.db_id are set, even when called directly
        if self.getRecordID() <= 0:
            logging.error("Record, %s, not found in DB - cannot update it",
                          self._record.discovery_id)
            return

        logging.info("Record already existing in DB - performing updates")
        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if not self._checkIsUpdatedRecord():
            return

        # firstly update the actual documents contained by the record - i.e. the original doc +
        # the various transforms required
        self._updateOriginalRecord()
        self._updateMetadataRecords()

        # Now update the spatiotemporal data
        self._updateSpatioTemporalData()


    def _checkIsUpdatedRecord(self):
        '''
        Looks up an existing record and checks it is not identical to the new record; if it is
        increment the harvest_count value and don't process again
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")
        sql = "SELECT harvest_count FROM ORIGINAL_DOCUMENT where original_document_id = " + \
            self._quote(self.id) + " AND original_document = " + \
            self._quote(self._record.originalFormat) + ";"
        count = self._firstInt(db_funcs.runSQLCommand(self._connection, sql), 0)

        # NB, if the document is not identical, the sql command will not find anything
        if count > 0:
            logging.info("Ingested document is identical to document currently in DB - incrementing harvest_count")
            sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = %d WHERE original_document_id = %s;" \
                % (count + 1, self._quote(self.id))
            db_funcs.runSQLCommand(self._connection, sql)
            return False

        logging.info("Ingested document is different to that in the current DB")
        return True


    def _spatioTemporalFields(self):
        '''
        Work out which spatiotemporal columns the current record has data for, so
        that only the correct data is included in the insert/update commands
        @return: list of (column name, SQL literal) tuples, in the order
        coordinates, startdate, enddate; columns without data are left out
        '''
        record = self._record
        fields = []
        if not record.hasNullCoords():
            box = "((%sd , %sd), (%sd , %sd))" % \
                (record.west, record.south, record.east, record.north)
            fields.append(("coordinates", "sbox " + self._quote(box)))

        # 'nostartdate' / 'noenddate' are the markers used for a missing date
        if str(record.startdate) != 'nostartdate':
            fields.append(("startdate", self._quote(record.startdate)))

        if str(record.enddate) != 'noenddate':
            fields.append(("enddate", self._quote(record.enddate)))
        return fields


    def _insertSpatioTemporalData(self):
        '''
        Create a record in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object
        '''
        logging.info("Adding spatiotemporal data to DB record")
        # NOTE(review): original_doc_id is filled from record.id, as before; if it is
        # meant to reference ORIGINAL_DOCUMENT it should probably be record.db_id - confirm
        fields = [("id", "DEFAULT"), ("original_doc_id", self._quote(self._record.id))]
        fields += self._spatioTemporalFields()

        columns = ", ".join([name for name, value in fields])
        values = ", ".join([value for name, value in fields])
        sqlCmd = "INSERT INTO spatiotemp (" + columns + ") VALUES (" + values + ");"
        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Spatiotemporal data added to DB record")


    def _updateSpatioTemporalData(self):
        '''
        Updates a record in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object
        '''
        logging.info("Updating spatiotemporal data to DB record")
        assignments = [name + " = " + value for name, value in self._spatioTemporalFields()]
        # always stamp the row, even when there is no other data to change
        assignments.append("update_time = now()")

        sqlCmd = "UPDATE spatiotemp SET " + ", ".join(assignments) + \
            " WHERE original_doc_id = " + self._quote(self._record.id) + ";"
        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Spatiotemporal data updated for DB record")


    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB, then read the
        new row's ID back into self.id and the record's db_id
        '''
        logging.info("Inserting new original document in Postgres DB")
        document = self._quote(self._record.originalFormat)
        # discovery_id is stored too, since getRecordID looks documents up by it
        sqlCmd = "INSERT INTO ORIGINAL_DOCUMENT (original_document_id, original_document_name, " \
            "discovery_id, original_format, original_document, ts_vector, create_date, " \
            "harvest_count, scn) VALUES (DEFAULT, " + \
            self._quote(self._record.filename) + ", " + \
            self._quote(self._record.discovery_id) + ", " + \
            self._quote(self._record.docType) + ", " + \
            document + ", to_tsvector('english', " + document + "), current_date, 1, 1);"
        db_funcs.runSQLCommand(self._connection, sqlCmd)

        # an INSERT is not known to hand back the generated ID, so look it up;
        # clear any cached values first to force a fresh query
        self.id = None
        self._record.db_id = None
        if self.getRecordID() > 0:
            logging.info("Original document inserted in Postgres DB")
        else:
            logging.error("Could not read back the DB ID of the inserted original document")


    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB; the harvest count is reset to 1
        '''
        logging.info("Updating original document in Postgres DB")
        document = self._quote(self._record.originalFormat)
        # column/value pairing mirrors the INSERT: docType -> original_format,
        # originalFormat -> original_document
        sqlCmd = "UPDATE ORIGINAL_DOCUMENT SET " \
            "original_document_name = " + self._quote(self._record.filename) + ", " \
            "original_format = " + self._quote(self._record.docType) + ", " \
            "original_document = " + document + ", " \
            "ts_vector = to_tsvector('english', " + document + "), " \
            "update_date = current_date, " \
            "harvest_count = 1" \
            " WHERE original_document_id = " + self._quote(self._record.db_id) + ";"

        db_funcs.runSQLCommand(self._connection, sqlCmd)
        logging.info("Original document updated in Postgres DB")


    def _insertMetadataRecords(self):
        '''
        Insert the metadata docs into the postgres DB; does nothing if the
        original document has no DB ID yet
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.filename)
        if getattr(self._record, 'db_id', None) is None:
            logging.error("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        originalID = self._quote(self._record.db_id)
        for docType, doc in self._record.getAllDocs():
            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_record_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, " + originalID + ", " + \
                self._quote(docType) + ", " + self._quote(doc) + ", current_date, 1);"

            db_funcs.runSQLCommand(self._connection, sqlCmd)

        logging.info("Transformed records created in DB")


    def _updateMetadataRecords(self):
        '''
        Update the metadata docs in the postgres DB; does nothing if the
        original document has no DB ID
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.filename)
        if getattr(self._record, 'db_id', None) is None:
            logging.error("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        originalID = self._quote(self._record.db_id)
        for docType, doc in self._record.getAllDocs():
            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = " + \
                self._quote(doc) + ", update_date = current_date" \
                " WHERE original_record_id = " + originalID + \
                " AND transformed_format = " + self._quote(docType) + ";"

            db_funcs.runSQLCommand(self._connection, sqlCmd)

        logging.info("Transformed records updated in DB")


    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record
        # the cached DB ID belongs to the previous record; force a new lookup
        self.id = None
Note: See TracBrowser for help on using the repository browser.