source: TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py @ 6544

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py@6544
Revision 6544, 17.5 KB checked in by sdonegan, 11 years ago (diff)

Updated version of MEDIN ingest - will ingest NEODC DIFs, convert to ISO then load ISO into DB - all original NDG3 functionality inc configurable ingest now included.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3Class representing the data access object wrapping the Postgres DB
4C Byrom Apr 08
5'''
6import sys, os, logging, datetime
7from SpatioTemporalData import *
8from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
9
class PostgresDAO:
    '''
    Data access object wrapping the Postgres discovery DB.

    Handles creation, update and deletion of rows in the ORIGINAL_DOCUMENT,
    TRANSFORMED_DOCUMENT and SPATIAL_TEMPORAL_DATA tables for a single
    PostgresRecord object.

    NOTE(review): all SQL in this class is assembled by string concatenation
    from harvested metadata values, so it is vulnerable to SQL injection and
    will fail on any value containing a single quote.  Switch to
    parameterised queries if/when the postgres client supports them.
    '''

    def __init__(self, record, isoDataModel, transformationDir, pgClient = None):
        '''
        Constructor - to initialise the DAL and do some initial setting up
        @param record: the PostgresRecord object to add or update in the DB
        @param isoDataModel: ISO data model object associated with the record
        @param transformationDir: directory holding the transformed (discovery) docs
        @keyword pgClient: a postgresClient object to use to connect to postgres
        - NB, if not set, will try to set one up by looking for a pg.config file
        locally
        '''
        if record == "":
            sys.exit("USAGE: argument 1 = PostgresRecord object to process")
        else:
            logging.info("INFO: Creating/updating DB entry for record, %s" %record.discovery_id)

        # setup a connection to the db - if none specified
        if pgClient is None:
            # bug fix: 'configFile' was an undefined name here (NameError);
            # pass the default config file location as a keyword instead
            self.pgc = pgc(configFile = pgc.DEFAULT_CONFIG_FILE)
        else:
            self.pgc = pgClient

        self._record = record
        self.isoDataModel = isoDataModel
        self.discovery_dir = transformationDir


    def getRecordID(self):
        '''
        Look up the record in the DB by discovery id and cache its DB ID on
        the record object (self._record.db_id) if found.
        @return: the cached db id if the record has already been looked up,
        otherwise None - NB, when the record exists in the DB the id is stored
        on self._record.db_id as a side effect rather than returned
        '''
        logging.info("Looking up record, " + self._record.discovery_id + " in DB")
        if self._record.db_id is not None and self._record.db_id > 0:
            logging.info("Already looked up record - ID is " + str(self._record.db_id))
            return self._record.db_id

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \
            self._record.discovery_id + "';"
        dbId = self.pgc.runSQLCommand(sql)
        if dbId:
            self._record.db_id = dbId[0][0]


    def getRecordID_using_OriginalDocumentFilename(self):
        '''
        Look up the record in the DB by its original document filename and
        cache its DB ID on the record object (self._record.db_id) if found.
        '''
        logging.info("Looking up original_document_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.db_id = dbId[0][0]


    def getDiscoveryID_using_OriginalDocumentFilename(self):
        '''
        Look up the record in the DB by its original document filename and
        cache its discovery id on the record object
        (self._record.discovery_id) if found.
        '''
        # bug fix: log message previously read "discovery_of"
        logging.info("Looking up discovery_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"

        dbId = self.pgc.runSQLCommand(sql)

        if dbId:
            self._record.discovery_id = dbId[0][0]


    def getTemporalDataId(self):
        '''
        Look up the temporal data id using the original document id.

        NOTE(review): this method is incomplete - it builds a query (which
        selects discovery_id, not a temporal data id) but never executes it,
        so calling it currently has no effect.  Left as a no-op to preserve
        existing behaviour; complete or remove it.
        '''
        logging.info("Looking up temporal_data_id for filename: " + self._record.filename + " in DB")

        sql = "SELECT discovery_id FROM ORIGINAL_DOCUMENT where original_document_filename = '" + self._record.filename + "';"


    def createOrUpdateRecord(self):
        '''
        Look up the record in the DB; if found, update it, otherwise create it
        @return returnCode: 0 if the operation failed, 1 if an existing record
        was updated, 2 if a new record was created
        '''
        self.getRecordID()
        returnCode = 0  # 0 if failed, 1 if update, 2 if create

        if self._record.db_id:
            if self.updateRecord():
                returnCode = 1
        else:
            # create the record!
            if self.createRecord():
                returnCode = 2

        return returnCode


    def createRecord(self):
        '''
        Create a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record created, false if otherwise
        '''
        logging.info("Creating a new record in the DB for the metadata document")
        # firstly store the actual documents contained by the record - i.e. the original doc +
        # the various transforms required
        self._insertOriginalRecord()
        self._insertMetadataRecords()

        # Now add the spatiotemporal data
        self._insertSpatioTemporalData()
        logging.info("New record created")
        return True


    def updateRecord(self):
        '''
        Update a record in the postgres DB based on the specified PostgresRecord object
        @return result: True if record updated, false if otherwise
        '''
        logging.info("Record already existing in DB - performing updates")
        result = False

        # firstly, check the document is actually new - i.e. not just a repeat of the same data
        if self._checkIsUpdatedRecord():

            # firstly, update the original record
            self._updateOriginalRecord()

            # now update the actual documents contained by the record - i.e. the original doc +
            # the various transforms required
            self._updateMetadataRecords()

            # If doing an update of an existing record, clear out existing spatiotemporal
            # data, rather than updating it, to keep things simple
            logging.info("Clearing out existing data for record - to allow clean update")

            self._deleteSpatioTemporalData()
            self._insertSpatioTemporalData()
            result = True

        logging.info("Finish processing document...")

        return result


    def _checkIsUpdatedRecord(self):
        '''
        Look up the existing record and check it is not identical to the new
        record; if it is identical, increment the harvest_count value and
        don't process again
        @return: True if doc contains changes, False otherwise
        '''
        logging.info("Checking the updated document actually contains changes")

        sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
            str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';"
        results = self.pgc.runSQLCommand(sql)

        # NB, if the document is not identical, the sql command will not find anything
        if not results:
            logging.info("Ingested document is different to that in the current DB")
            # get the current SCN so the subsequent update can bump it
            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \
                    str(self._record.db_id) + ";"
            results = self.pgc.runSQLCommand(sql)
            self._record.scn = results[0][0]
            return True

        count = results[0][0]

        # get current system change number
        scn = results[0][1]
        self._record.scn = scn
        logging.info("Ingested document is identical to document currently in DB - " + \
                     "incrementing harvest_count")
        count += 1
        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \
            " WHERE original_document_id = " + str(self._record.db_id)
        self.pgc.runSQLCommand(sql)
        return False


    def _deleteSpatioTemporalData(self):
        '''
        Delete all existing spatiotemporal data for the current record
        - NB, the delete is set to cascade from the linking table so only need to
        delete entries from there
        '''
        logging.info("Deleting existing spatiotemporal data for record")
        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_document_id = " + \
            str(self._record.db_id) + ";"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal data deleted successfully")


    def _insertSpatioTemporalRow(self, coords, timeRange):
        '''
        Create a single row record in the postgres DB based on the
        input data.  Updated to operate with dictionary methods for coords and
        timeRange as part of MEDIN ISO upgrade
        @param coords: dict with 'north'/'south'/'west'/'east' keys
        @param timeRange: dict with 'start'/'end' keys
        '''
        logging.info("Adding spatiotemporal row to DB")

        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \
            str(coords['north']) + "', '" + str(coords['south']) + "', '" + str(coords['west']) + "', '" + \
            str(coords['east']) + "', '" + timeRange['start'] + "', '" + timeRange['end'] + "');"

        # fix any null strings - the stored proc expects SQL NULL, not 'null'
        sqlCmd = sqlCmd.replace("'null'", "null")

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Spatiotemporal row added successfully")


    def _insertSpatioTemporalData(self):
        '''
        Create a record in the postgres DB based on the spatiotemporal data
        specified in the PostgresRecord object

        Note updated to work with ISO metadata object where coords and times
        already defined as individual items.
        '''
        logging.info("Adding spatiotemporal data to DB record")

        # Work out the relationship between the spatial and temporal data and handle appropriately
        timeRanges = self._record.parseTemporalInfo(self._record.datasetTemporalData)
        spatialCoords = self._record.parseSpatialInfo(self._record.datasetSpatialData)

        if (len(timeRanges) == 0) and (len(spatialCoords) == 0):
            logging.info("No spatiotemporal data found for record - skipping")
            return

        # if both arrays of data are the same length, assume they map together
        if len(timeRanges) == len(spatialCoords):
            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one")
            for coords, times in zip(spatialCoords, timeRanges):
                self._insertSpatioTemporalRow(coords, times)

        # otherwise, map everything to everything
        else:
            logging.info("Spatial and temporal data are of different sizes; map everything to everything")
            for times in timeRanges:
                for coords in spatialCoords:
                    self._insertSpatioTemporalRow(coords, times)

        logging.info("Spatiotemporal data added to DB record")


    def _insertOriginalRecord(self):
        '''
        Insert the original metadata doc into the postgres DB
        @raise ValueError: if the create_document stored procedure returns no
        primary key id
        '''
        logging.info("Inserting new original document in Postgres DB")

        sqlCmd = "SELECT create_document('" + self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + \
            self._record.dataset_name + "', '" + self._record.datacentre_name + "', '" + \
            self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + self._record.datasetEndNom + "');"

        # sort out any nulls..
        sqlCmd = sqlCmd.replace("'NULL'","NULL")

        # sort out any Nones
        sqlCmd = sqlCmd.replace("'None'","NULL")

        # renamed from 'id' to avoid shadowing the builtin
        pkId = self.pgc.runSQLCommand(sqlCmd)
        if len(pkId) == 0:
            # bug fix: was Python-2-only 'raise ValueError, ...' syntax
            raise ValueError('No PK ID returned from INSERT to DB')

        self._record.db_id = pkId[0][0]

        logging.info("Original document inserted in Postgres DB")


    def deleteOriginalRecord(self):
        '''
        Delete the original metadata doc from the postgres DB
        '''
        logging.info("Deleting original document from Postgres DB")
        sqlCmd = "SELECT delete_document('" + str(self._record.db_id) + "');"

        self.pgc.runSQLCommand(sqlCmd)
        logging.info("Original document deleted from Postgres DB")


    def _updateOriginalRecord(self):
        '''
        Update the original doc in the postgres DB
        '''
        logging.info("Updating original document in Postgres DB")
        sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \
            self._record.shortFilename + "', '" + \
            self._record.discovery_id + "', '" + self._record.docType + "', '" + \
            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \
            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + str(self._record.scn) + "', '" + self._record.dataset_name + "', '" + self._record.datacentre_name + "', '" + self._record.dataset_lastEdit + "', '" + self._record.datasetStartNom + "', '" + self._record.datasetEndNom + "');"

        # sort out any NULL values
        sqlCmd = sqlCmd.replace("'NULL'","NULL")

        # sort out any Nones
        sqlCmd = sqlCmd.replace("'None'","NULL")

        logging.info("Submitting SQL command")

        self.pgc.runSQLCommand(sqlCmd)

        # increment scn - saves doing an additional db call
        self._record.scn += 1

        logging.info("Original document updated in Postgres DB")


    def _insertMetadataRecords(self):
        '''
        Insert the metadata docs into the postgres DB
        '''
        logging.info("Inserting transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)

        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot add associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs(self.discovery_dir):

            # if docType is original input xml then put that in rather than some lossy transformed guff.
            if docType == self._record.docType:
                doc = self._record.originalXMLdoc

            sqlCmd = "INSERT INTO TRANSFORMED_DOCUMENT (transformed_document_id, " \
                "original_document_id, transformed_format, " \
                "transformed_document, create_date, scn) VALUES (" \
                "DEFAULT, '" + str(self._record.db_id) + "', '" + \
                docType + "', '" + doc + "', current_timestamp, 1);"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records created in DB")


    def _updateMetadataRecords(self):
        '''
        Update the metadata docs into the postgres DB
        '''
        logging.info("Updating transformed documents for original document, %s, in Postgres DB", self._record.shortFilename)
        if self._record.db_id is None:
            logging.info("No DB ID for the original record exists; cannot update associated transformed docs")
            return

        for docType, doc in self._record.getAllDocs(self.discovery_dir):

            # if docType is original input xml then put that in rather than some lossy transformed guff.
            if docType == self._record.docType:
                # bug fix: was 'self.originalXMLdoc' (AttributeError) - the
                # attribute lives on the record, cf. _insertMetadataRecords
                doc = self._record.originalXMLdoc

            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \
                "', update_date = current_timestamp WHERE original_document_id = " + \
                str(self._record.db_id) + " AND transformed_format = '" + docType + "';"

            self.pgc.runSQLCommand(sqlCmd)

        logging.info("Transformed records updated in DB")


    def setRecord(self, record):
        '''
        Set the record to use with the DAL - to allow object re-use
        @param record: PostgresRecord to use with the DAL
        '''
        self._record = record
437
Note: See TracBrowser for help on using the repository browser.