Changeset 3853


Ignore:
Timestamp:
07/05/08 10:20:37 (11 years ago)
Author:
cbyrom
Message:

Update the Python scripts to call the new stored procedures and to properly
handle SCN (system change number) values in the DB. Report the status of
each record create/update so that the 'number of ingested records' result
can be displayed at the end. Also include a method to delete existing
spatiotemporal data when doing updates, and add additional logging.

Location:
TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py

    r3846 r3853  
    66import sys, os, logging 
    77import db_funcs 
     8from SpatioTemporalData import * 
    89 
    910class PostgresDAO: 
     
    6263        ''' 
    6364        Looks up a record in the DB; if it finds it, update it, otherwise create it 
     65        @return result: True if record created/updated, false if otherwise 
    6466        ''' 
    6567        self.getRecordID() 
    6668        if self._record.db_id: 
    67             self.updateRecord() 
    68         else: 
    69             self.createRecord() 
     69            return self.updateRecord() 
     70        else: 
     71            return self.createRecord() 
     72             
    7073 
    7174             
     
    7376        ''' 
    7477        Create a record in the postgres DB based on the specified PostgresRecord object 
     78        @return result: True if record created, false if otherwise 
    7579        ''' 
    7680        logging.info("Creating a new record in the DB for the metada document") 
     
    8387        self._insertSpatioTemporalData() 
    8488        logging.info("New record created") 
     89        return True 
    8590             
    8691 
     
    8893        ''' 
    8994        Update a record in the postgres DB based on the specified PostgresRecord object 
     95        @return result: True if record updated, false if otherwise 
    9096        ''' 
    9197        logging.info("Record already existing in DB - performing updates") 
     98        result = False 
    9299        # firstly, check the document is actually new - i.e. not just a repeat of the same data 
    93100        if self._checkIsUpdatedRecord(): 
     
    97104            self._updateMetadataRecords() 
    98105 
    99             # Now update the spatiotemporal data 
    100             self._updateSpatioTemporalData() 
     106            # If doing an update of an existing record, clear out existing spatiotemporal 
     107            # data, rather than updating it, to keep things simple 
     108            logging.info("Clearing out existing data for record - to allow clean update") 
     109            self._deleteSpatioTemporalData() 
     110            self._insertSpatioTemporalData() 
     111            result = True 
    101112 
    102113        logging.info("Finish processing document...") 
     114        return result 
    103115         
    104116         
     
    111123        logging.info("Checking the updated document actually contains changes") 
    112124 
    113         sql = "SELECT harvest_count FROM ORIGINAL_DOCUMENT where original_document_id = " + \ 
     125        sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \ 
    114126            str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';" 
    115         count = db_funcs.runSQLCommand(self._connection, sql) 
     127        results = db_funcs.runSQLCommand(self._connection, sql) 
    116128 
    117129        # NB, if the document is not identical, the sql command will not find anything 
    118         if not count: 
     130        if not results: 
    119131            logging.info("Ingested document is different to that in the current DB") 
    120132            return True 
    121133             
    122         count = count[0][0] 
     134        count = results[0][0] 
     135 
     136        # get current system change number 
     137        scn = results[0][1] 
     138        self._record.scn = scn 
    123139        logging.info("Ingested document is identical to document currently in DB - " + \ 
    124140                     "incrementing harvest_count") 
     
    130146 
    131147 
     148    def _deleteSpatioTemporalData(self): 
     149        ''' 
     150        Delete all existing spatiotemporal data for the current record 
     151        - NB, the delete is set to cascade from the linking table so only need to 
     152        delete entries from there 
     153        ''' 
     154        logging.info("Deleting existing spatiotemporal data for record") 
     155        sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_record_id = " + \ 
     156            str(self._record.db_id) + ";"       
     157 
     158        db_funcs.runSQLCommand(self._connection, sqlCmd) 
     159        logging.info("Spatiotemporal data deleted successfully") 
     160 
     161 
     162    def _insertSpatioTemporalRow(self, coords, timeRange): 
     163        ''' 
     164        Create a single row record in the postgres DB based on the 
     165        input data 
     166        @param coords: a Coords object representing NSWE coords 
     167        @param timeRange: a TimeRange object representing a start/end date  
     168        ''' 
     169        logging.info("Adding spatiotemporal row to DB") 
     170        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \ 
     171            str(coords.north) + "', '" + str(coords.south) + "', '" + str(coords.west) + "', '" + \ 
     172            str(coords.east) + "', " 
     173         
     174        # cope with null dates appropriately 
     175        if timeRange.start == "null": 
     176            sqlCmd += timeRange.start + ", " 
     177        else: 
     178            sqlCmd += "'" + timeRange.start + "', " 
     179             
     180        if timeRange.end == "null": 
     181            sqlCmd += timeRange.end 
     182        else: 
     183            sqlCmd += "'" + timeRange.end + "'" 
     184         
     185        sqlCmd += ");"       
     186 
     187        db_funcs.runSQLCommand(self._connection, sqlCmd) 
     188        logging.info("Spatiotemporal row added successfully") 
     189         
    132190     
    133191    def _insertSpatioTemporalData(self): 
     
    137195        ''' 
    138196        logging.info("Adding spatiotemporal data to DB record") 
    139         # NB, build the query up in two parts so that we only include the correct data 
    140         sqlStart = "" 
    141         insert_sql="INSERT INTO "+location_table+" (geom) VALUES ( setsrid('BOX3D("+west+" "+south+","+east+" "+north+")'::box3d,4326) );" 
    142         sqlEnd = " VALUES (DEFAULT, '" + str(self._record.db_id)  
    143         if (not self._record.hasNullCoords()): 
    144             sqlStart += "coordinates" 
    145             sqlEnd += "', sbox'((" + self._record.west + \ 
    146                 "d , " + self._record.south + "d), (" + self._record.east +"d , " + \ 
    147                 self._record.north + "d))' " 
    148          
    149         if str(self._record.startdate)!='nostartdate': 
    150             if len(sqlStart) > 0: 
    151                 sqlStart += ", " 
    152                 sqlEnd += ", " 
    153             sqlStart += "startdate" 
    154             sqlEnd += "'" + self._record.startdate + "'" 
    155              
    156         if str(self._record.enddate)!='noenddate': 
    157             if len(sqlStart) > 0: 
    158                 sqlStart += ", " 
    159                 sqlEnd += ", " 
    160             sqlStart += ", enddate" 
    161             sqlEnd += ", '" + self._record.enddate + "'" 
    162  
    163         if len(sqlStart) > 0: 
    164             sqlStart = ", " + sqlStart 
    165             sqlEnd = ", " + sqlEnd 
    166              
    167         sqlCmd = "INSERT INTO spatiotemp (id, original_doc_id" + sqlStart + ") " + sqlEnd + ");" 
     197         
     198        # Work out the relationship between the spatial and temporal data and handle appropriately 
     199        timeData = self._record.stData.getTemporalData() 
     200        spatialData = self._record.stData.getSpatialData() 
     201         
     202        # check if we have any spatiotemporal data to add; escape if not 
     203        if not timeData and not spatialData: 
     204            logging.info("No spatiotemporal data found for record - skipping") 
     205            return 
     206         
     207        # setup dummy, null data if required 
     208        if not timeData: 
     209            timeData = [ TimeRange('null', 'null') ] 
     210         
     211        if not spatialData: 
     212            spatialData = [ Coords('null', 'null', 'null', 'null') ] 
     213         
     214        # if both arrays of data are the same length, assume they map together 
     215        i = 0 
     216        if len(timeData) == len(spatialData): 
     217            logging.info("Spatial data is the same size as temporal data; assuming these map together one to one") 
     218            for timeRange in timeData: 
     219                self._insertSpatioTemporalRow(spatialData[i], timeRange) 
     220                i += 1 
     221 
     222        # otherwise, map everything to everything 
     223        else: 
     224            logging.info("Spatial and temporal data are of different sizes; map everything to everything") 
     225            for timeRange in timeData: 
     226                for coords in spatialData: 
     227                    self._insertSpatioTemporalRow(coords, timeRange) 
     228        logging.info("Spatiotemporal data added to DB record") 
     229             
     230     
     231    def _insertOriginalRecord(self): 
     232        ''' 
     233        Insert the original metadata doc into the postgres DB 
     234        ''' 
     235        logging.info("Inserting new original document in Postgres DB") 
     236        sqlCmd = "SELECT create_document('" + self._record.filename + "', '" + \ 
     237            self._record.discovery_id + "', '" + self._record.docType + "', '" + \ 
     238            self._record.originalFormat + "');"       
     239 
     240        id = db_funcs.runSQLCommand(self._connection, sqlCmd) 
     241        if len(id) == 0: 
     242            raise ValueError, 'No PK ID returned from INSERT to DB' 
     243         
     244        self._record.db_id = id[0][0]  
     245        logging.info("Original document inserted in Postgres DB") 
     246         
     247     
     248    def _updateOriginalRecord(self): 
     249        ''' 
     250        Update the original doc in the postgres DB 
     251        ''' 
     252        logging.info("Updating original document in Postgres DB") 
     253        sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \ 
     254            self._record.filename + "', '" + \ 
     255            self._record.discovery_id + "', '" + self._record.docType + "', '" + \ 
     256            self._record.originalFormat + "', '" + str(self._record.scn) + "');" 
     257 
     258        id = db_funcs.runSQLCommand(self._connection, sqlCmd) 
    168259        db_funcs.runSQLCommand(self._connection, sqlCmd) 
    169         logging.info("Spatiotemporal data added to DB record") 
    170              
    171      
    172     def _updateSpatioTemporalData(self): 
    173         ''' 
    174         Updates a record in the postgres DB based on the spatiotemporal data  
    175         specified in the PostgresRecord object 
    176         ''' 
    177         # NB, build the query up in two parts so that we only include the correct data 
    178         logging.info("Updating spatiotemporal data to DB record") 
    179         sqlStart = "" 
    180         sqlEnd = " WHERE original_doc_id='" + str(self._record.db_id) + "';"  
    181         if (not self._record.hasNullCoords()): 
    182             sqlStart += "coordinates = sbox'((" + self._record.west + "d , " + \ 
    183                 self._record.south + "d), (" + self._record.east + "d , " + \ 
    184                 self._record.north + "d))'" 
    185          
    186         if str(self._record.startdate)!='nostartdate': 
    187             if len(sqlStart) > 0: 
    188                 sqlStart += ", " 
    189             sqlStart += "startdate = '" + self._record.startdate + "'" 
    190              
    191         if str(self._record.enddate)!='noenddate': 
    192             if len(sqlStart) > 0: 
    193                 sqlStart += ", " 
    194             sqlStart += "enddate = '" + enddate + "'" 
    195  
    196         if len(sqlStart) > 0: 
    197             sqlStart += ", " 
    198         sqlCmd = "UPDATE spatiotemp SET (" + sqlStart + "update_time= now()) " + sqlEnd + ");" 
    199  
    200         db_funcs.runSQLCommand(self._connection, sqlCmd) 
    201         logging.info("Spatiotemporal data updated for DB record") 
    202      
    203              
    204     def _insertOriginalRecord(self): 
    205         ''' 
    206         Insert the original metadata doc into the postgres DB 
    207         ''' 
    208         logging.info("Inserting new original document in Postgres DB") 
    209         sqlCmd = "INSERT INTO ORIGINAL_DOCUMENT (original_document_id, original_document_filename, " + \ 
    210             "discovery_id, original_format, " + \ 
    211             "original_document, ts_vector, create_date, harvest_count, scn) VALUES (" + \ 
    212             "DEFAULT, '" + self._record.filename + "', '" + self._record.discovery_id + \ 
    213             "', '" + self._record.docType + "', '" + self._record.originalFormat + \ 
    214             "', to_tsvector('english', '" + self._record.originalFormat + "'), current_date, 1, 1);" 
    215              
    216         self._record.db_id = db_funcs.runSQLCommand(self._connection, sqlCmd) 
    217         logging.info("Original document inserted in Postgres DB") 
    218          
    219      
    220     def _updateOriginalRecord(self): 
    221         ''' 
    222         Update the original doc in the postgres DB 
    223         ''' 
    224         logging.info("Updating original document in Postgres DB") 
    225         sqlCmd = "UPDATE ORIGINAL_DOCUMENT SET (original_document_filename = '" + self._record.filename + \ 
    226             "', discovery_id = '" + self._record.discovery_id + "', " + \ 
    227             "original_format = '" + self._record.originalFormat + "', " + \ 
    228             "ts_vector = to_tsvector('english', '" + self._record.originalFormat + "'), " + \ 
    229             "update_date = current_date, harvest_count = 1)" + \ 
    230             " WHERE original_document_id = " + str(self._record.db_id) + ";" 
    231      
    232         db_funcs.runSQLCommand(self._connection, sqlCmd) 
     260         
     261        # increment scn - saves doing an additional db call 
     262        self._record.scn += 1 
    233263        logging.info("Original document updated in Postgres DB") 
    234264     
     
    248278                "transformed_document, create_date, scn) VALUES (" \ 
    249279                "DEFAULT, '" + str(self._record.db_id) + "', '" + \ 
    250                 docType + "', '" + doc + "'), current_date, 1);" 
     280                docType + "', '" + doc + "', current_timestamp, 1);" 
    251281             
    252282            db_funcs.runSQLCommand(self._connection, sqlCmd) 
     
    265295         
    266296        for docType, doc in self._record.getAllDocs(): 
    267             sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET (transformed_document = '" + doc + \ 
    268                 "', update_date = current_date) WHERE original_record_id = " + \ 
     297            sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \ 
     298                "', update_date = current_timestamp WHERE original_record_id = " + \ 
    269299                str(self._record.db_id) + " AND transformed_format = '" + docType + "';" 
    270300 
     
    281311        self._record = record 
    282312 
    283  
    284      
    285     def link_spatial_and_temporal(Mid,west,south,east,north,startdate,enddate): 
    286         ''' 
    287         Creates an entry in the item/time/location table linking an item, time and location. 
    288         ''' 
    289         locationid = insert_spatial_coverage(Mid,west,south,east,north) 
    290         timeid = insert_temporal_coverage(Mid,startdate,enddate) 
    291         if (locationid > -1): 
    292             locationidstr = "%s" % locationid 
    293         else: 
    294             locationidstr = "NULL" 
    295      
    296         if (timeid > -1): 
    297             timeidstr = "%s" % timeid 
    298         else: 
    299             timeidstr = "NULL" 
    300      
    301         itemtimelocation_sql = "INSERT INTO "+item_table+" (itemid, locationid, timeid) values ('"+Mid+"', "+locationidstr+", "+timeidstr+" );" 
    302      
    303         #print "ItemTimeLocation:\t"+itemtimelocation_sql 
    304         cursor = connection.cursor() 
    305         try: 
    306             cursor.execute(itemtimelocation_sql) 
    307         except: 
    308             print "Error: database error %s %s" %(sys.exc_type, sys.exc_value) 
    309         connection.commit() 
    310      
    311     def link_spatial(Mid,west,south,east,north): 
    312             '''Creates an entry in the item/time/location table linking an item and location only''' 
    313             locationid = insert_spatial_coverage(Mid,west,south,east,north) 
    314             if (locationid > -1): 
    315                     locationidstr = "%s" % locationid 
    316             else: 
    317                     locationidstr = "NULL" 
    318      
    319             itemtimelocation_sql = "INSERT INTO "+item_table+" (itemid, locationid) values ('"+Mid+"', "+locationidstr+" );" 
    320      
    321             #print "ItemTimeLocation:\t"+itemtimelocation_sql 
    322             cursor = connection.cursor() 
    323             try: 
    324                 cursor.execute(itemtimelocation_sql) 
    325             except: 
    326                 print "Error: database error %s %s" %(sys.exc_type, sys.exc_value) 
    327             connection.commit() 
    328      
    329     def link_temporal(Mid,startdate,enddate): 
    330             '''Creates an entry in the item/time/location table linking an item and time only''' 
    331             timeid = insert_temporal_coverage(Mid,startdate,enddate) 
    332             if (timeid > -1): 
    333                     timeidstr = "%s" % timeid 
    334             else: 
    335                     timeidstr = "NULL" 
    336      
    337             itemtimelocation_sql = "INSERT INTO "+item_table+" (itemid, timeid) values ('"+Mid+"', "+timeidstr+" );" 
    338      
    339             #print "ItemTimeLocation:\t"+itemtimelocation_sql 
    340             cursor = connection.cursor() 
    341             try: 
    342                 cursor.execute(itemtimelocation_sql) 
    343             except: 
    344                 print "Error: database error %s %s" %(sys.exc_type, sys.exc_value) 
    345             connection.commit() 
    346      
    347      
    348     def insert_spatial_coverage(Mid,west,south,east,north): 
    349         ''' 
    350         Attempts to insert spatial coverage into spatial table.  
    351         If successful, returns newly created location id, -1 otherwise 
    352         ''' 
    353         returnid = -1 
    354         if str(west)!='null' and str(south)!='null' and str(east)!='null' and str(north)!='null': 
    355             insert_sql="INSERT INTO "+location_table+" (geom) VALUES ( setsrid('BOX3D("+west+" "+south+","+east+" "+north+")'::box3d,4326) );" 
    356             print insert_sql 
    357         cursor = connection.cursor() 
    358         try: 
    359             cursor.execute(insert_sql) 
    360             if (cursor.rowcount > 0): 
    361                 getid_sql="select currval('"+location_id_seq+"');" 
    362                 cursor.execute(getid_sql) 
    363                 id = commavaluesfromsublist(cursor.fetchall()) 
    364                 return int(id) 
    365             else: 
    366                 return -1 
    367         except: 
    368             print "Error: database error %s %s" %(sys.exc_type, sys.exc_value) 
    369         connection.commit() 
    370  
    371      
    372     def insert_temporal_coverage(Mid,startdate,enddate): 
    373         '''Attempts to insert temporal coverage (only) into time table. If successful, returns timeid to be used in item table, -1 otherwise''' 
    374         returnid = -1 
    375         if (startdate=='nostartdate' and enddate=='noenddate'): 
    376             # Skip if both are missing, although it's OK if only 1 is given 
    377             return -1 
    378         else: 
    379             if (startdate=='nostartdate'): startdate='NULL' 
    380             if (enddate=='noenddate'): enddate='NULL' 
    381             insert_sql =  "INSERT INTO "+time_table+" (start_time, end_time) VALUES('"+startdate+"', '"+enddate+"');" 
    382             print insert_sql 
    383             cursor = connection.cursor() 
    384             try: 
    385                     cursor.execute(insert_sql) 
    386                     if (cursor.rowcount > 0): 
    387                             getid_sql="select currval('"+time_id_seq+"');" 
    388                             cursor.execute(getid_sql) 
    389                             id = commavaluesfromsublist(cursor.fetchall()) 
    390                             return int(id) 
    391                     else: 
    392                             return -1 
    393             except: 
    394                     print "Error: database error %s %s" %(sys.exc_type, sys.exc_value) 
    395             connection.commit() 
  • TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresRecord.py

    r3847 r3853  
    7070        self.mdipFormat = None 
    7171        self.iso19139Format = None 
     72        self.scn = 1    # system change number - keeps track of number of mods to a particular row 
    7273         
    7374        # do some initial setting up of record 
     
    320321                enddate= date.DateRangeEnd 
    321322                if startdate==None or startdate=='None': 
    322                     startdate="nostartdate" 
     323                    startdate="null" 
    323324                if enddate==None or enddate=='None': 
    324                     enddate="noenddate" 
     325                    enddate="null" 
    325326                     
    326327                self.stData.addTimeRange(startdate, enddate) 
  • TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new2.py

    r3845 r3853  
    9191                 
    9292                # Finally, write the new record 
    93                 dao.createOrUpdateRecord() 
     93                if dao.createOrUpdateRecord(): 
     94                        self._no_files_ingested += 1 
     95                         
    9496         
    9597         
     
    165167                dbinfo_file=open('ingest.config', "r") 
    166168                dbinfo = dbinfo_file.read().split() 
    167                 if len(dbinfo) != 4: 
     169                if len(dbinfo) < 4: 
    168170                        raise ValueError, 'Incorrect data in config file' 
    169171                 
    170                 self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3]) 
     172                # if port specified in file, use this, otherwise use default 
     173                if len(dbinfo) > 4: 
     174                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3], dbinfo[4]) 
     175                else: 
     176                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3]) 
    171177                logging.info("Postgres DB connection now set up") 
    172178 
     
    217223                status = 0 
    218224                numfilesproc = 0 
     225                self._no_files_ingested = 0 
    219226                self._base_dir = os.getcwd() + "/"# this is the base dir that the script is ran from 
    220227                         
     
    320327                 
    321328                print self.lineSeparator 
    322                 print "INFO: No. of files pre-processed = %s" %numfilesproc 
     329                print "INFO: Number of files processed = %s" %numfilesproc 
     330                print "INFO: Number of files ingested = %s" %self._no_files_ingested 
    323331                if status == 0: 
    324332                    print "INFO: Procedure oai_ingest.py completed" 
Note: See TracChangeset for help on using the changeset viewer.