Changeset 3853 for TI01-discovery
- Timestamp:
- 07/05/08 10:20:37 (13 years ago)
- Location:
- TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py
r3846 r3853 6 6 import sys, os, logging 7 7 import db_funcs 8 from SpatioTemporalData import * 8 9 9 10 class PostgresDAO: … … 62 63 ''' 63 64 Looks up a record in the DB; if it finds it, update it, otherwise create it 65 @return result: True if record created/updated, false if otherwise 64 66 ''' 65 67 self.getRecordID() 66 68 if self._record.db_id: 67 self.updateRecord() 68 else: 69 self.createRecord() 69 return self.updateRecord() 70 else: 71 return self.createRecord() 72 70 73 71 74 … … 73 76 ''' 74 77 Create a record in the postgres DB based on the specified PostgresRecord object 78 @return result: True if record created, false if otherwise 75 79 ''' 76 80 logging.info("Creating a new record in the DB for the metada document") … … 83 87 self._insertSpatioTemporalData() 84 88 logging.info("New record created") 89 return True 85 90 86 91 … … 88 93 ''' 89 94 Update a record in the postgres DB based on the specified PostgresRecord object 95 @return result: True if record updated, false if otherwise 90 96 ''' 91 97 logging.info("Record already existing in DB - performing updates") 98 result = False 92 99 # firstly, check the document is actually new - i.e. not just a repeat of the same data 93 100 if self._checkIsUpdatedRecord(): … … 97 104 self._updateMetadataRecords() 98 105 99 # Now update the spatiotemporal data 100 self._updateSpatioTemporalData() 106 # If doing an update of an existing record, clear out existing spatiotemporal 107 # data, rather than updating it, to keep things simple 108 logging.info("Clearing out existing data for record - to allow clean update") 109 self._deleteSpatioTemporalData() 110 self._insertSpatioTemporalData() 111 result = True 101 112 102 113 logging.info("Finish processing document...") 114 return result 103 115 104 116 … … 111 123 logging.info("Checking the updated document actually contains changes") 112 124 113 sql = "SELECT harvest_count FROM ORIGINAL_DOCUMENT where original_document_id = " + \125 sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \ 114 126 str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';" 115 count= db_funcs.runSQLCommand(self._connection, sql)127 results = db_funcs.runSQLCommand(self._connection, sql) 116 128 117 129 # NB, if the document is not identical, the sql command will not find anything 118 if not count:130 if not results: 119 131 logging.info("Ingested document is different to that in the current DB") 120 132 return True 121 133 122 count = count[0][0] 134 count = results[0][0] 135 136 # get current system change number 137 scn = results[0][1] 138 self._record.scn = scn 123 139 logging.info("Ingested document is identical to document currently in DB - " + \ 124 140 "incrementing harvest_count") … … 130 146 131 147 148 def _deleteSpatioTemporalData(self): 149 ''' 150 Delete all existing spatiotemporal data for the current record 151 - NB, the delete is set to cascade from the linking table so only need to 152 delete entries from there 153 ''' 154 logging.info("Deleting existing spatiotemporal data for record") 155 sqlCmd = "DELETE FROM SPATIAL_TEMPORAL_DATA WHERE original_record_id = " + \ 156 str(self._record.db_id) + ";" 157 158 db_funcs.runSQLCommand(self._connection, sqlCmd) 159 logging.info("Spatiotemporal data deleted successfully") 160 161 162 def _insertSpatioTemporalRow(self, coords, timeRange): 163 ''' 164 Create a single row record in the postgres DB based on the 165 input data 166 @param coords: a Coords object representing NSWE coords 167 @param timeRange: a TimeRange object representing a start/end date 168 ''' 169 logging.info("Adding spatiotemporal row to DB") 170 sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \ 171 str(coords.north) + "', '" + str(coords.south) + "', '" + str(coords.west) + "', '" + \ 172 str(coords.east) + "', " 173 174 # cope with null dates appropriately 175 if timeRange.start == "null": 176 sqlCmd += timeRange.start + ", " 177 else: 178 sqlCmd += "'" + timeRange.start + "', " 179 180 if timeRange.end == "null": 181 sqlCmd += timeRange.end 182 else: 183 sqlCmd += "'" + timeRange.end + "'" 184 185 sqlCmd += ");" 186 187 db_funcs.runSQLCommand(self._connection, sqlCmd) 188 logging.info("Spatiotemporal row added successfully") 189 132 190 133 191 def _insertSpatioTemporalData(self): … … 137 195 ''' 138 196 logging.info("Adding spatiotemporal data to DB record") 139 # NB, build the query up in two parts so that we only include the correct data 140 sqlStart = "" 141 insert_sql="INSERT INTO "+location_table+" (geom) VALUES ( setsrid('BOX3D("+west+" "+south+","+east+" "+north+")'::box3d,4326) );" 142 sqlEnd = " VALUES (DEFAULT, '" + str(self._record.db_id) 143 if (not self._record.hasNullCoords()): 144 sqlStart += "coordinates" 145 sqlEnd += "', sbox'((" + self._record.west + \ 146 "d , " + self._record.south + "d), (" + self._record.east +"d , " + \ 147 self._record.north + "d))' " 148 149 if str(self._record.startdate)!='nostartdate': 150 if len(sqlStart) > 0: 151 sqlStart += ", " 152 sqlEnd += ", " 153 sqlStart += "startdate" 154 sqlEnd += "'" + self._record.startdate + "'" 155 156 if str(self._record.enddate)!='noenddate': 157 if len(sqlStart) > 0: 158 sqlStart += ", " 159 sqlEnd += ", " 160 sqlStart += ", enddate" 161 sqlEnd += ", '" + self._record.enddate + "'" 162 163 if len(sqlStart) > 0: 164 sqlStart = ", " + sqlStart 165 sqlEnd = ", " + sqlEnd 166 167 sqlCmd = "INSERT INTO spatiotemp (id, original_doc_id" + sqlStart + ") " + sqlEnd + ");" 197 198 # Work out the relationship between the spatial and temporal data and handle appropriately 199 timeData = self._record.stData.getTemporalData() 200 spatialData = self._record.stData.getSpatialData() 201 202 # check if we have any spatiotemporal data to add; escape if not 203 if not timeData and not spatialData: 204 logging.info("No spatiotemporal data found for record - skipping") 205 return 206 207 # setup dummy, null data if required 208 if not timeData: 209 timeData = [ TimeRange('null', 'null') ] 210 211 if not spatialData: 212 spatialData = [ Coords('null', 'null', 'null', 'null') ] 213 214 # if both arrays of data are the same length, assume they map together 215 i = 0 216 if len(timeData) == len(spatialData): 217 logging.info("Spatial data is the same size as temporal data; assuming these map together one to one") 218 for timeRange in timeData: 219 self._insertSpatioTemporalRow(spatialData[i], timeRange) 220 i += 1 221 222 # otherwise, map everything to everything 223 else: 224 logging.info("Spatial and temporal data are of different sizes; map everything to everything") 225 for timeRange in timeData: 226 for coords in spatialData: 227 self._insertSpatioTemporalRow(coords, timeRange) 228 logging.info("Spatiotemporal data added to DB record") 229 230 231 def _insertOriginalRecord(self): 232 ''' 233 Insert the original metadata doc into the postgres DB 234 ''' 235 logging.info("Inserting new original document in Postgres DB") 236 sqlCmd = "SELECT create_document('" + self._record.filename + "', '" + \ 237 self._record.discovery_id + "', '" + self._record.docType + "', '" + \ 238 self._record.originalFormat + "');" 239 240 id = db_funcs.runSQLCommand(self._connection, sqlCmd) 241 if len(id) == 0: 242 raise ValueError, 'No PK ID returned from INSERT to DB' 243 244 self._record.db_id = id[0][0] 245 logging.info("Original document inserted in Postgres DB") 246 247 248 def _updateOriginalRecord(self): 249 ''' 250 Update the original doc in the postgres DB 251 ''' 252 logging.info("Updating original document in Postgres DB") 253 sqlCmd = "SELECT update_document('" + str(self._record.db_id) + "', '" + \ 254 self._record.filename + "', '" + \ 255 self._record.discovery_id + "', '" + self._record.docType + "', '" + \ 256 self._record.originalFormat + "', '" + str(self._record.scn) + "');" 257 258 id = db_funcs.runSQLCommand(self._connection, sqlCmd) 168 259 db_funcs.runSQLCommand(self._connection, sqlCmd) 169 logging.info("Spatiotemporal data added to DB record") 170 171 172 def _updateSpatioTemporalData(self): 173 ''' 174 Updates a record in the postgres DB based on the spatiotemporal data 175 specified in the PostgresRecord object 176 ''' 177 # NB, build the query up in two parts so that we only include the correct data 178 logging.info("Updating spatiotemporal data to DB record") 179 sqlStart = "" 180 sqlEnd = " WHERE original_doc_id='" + str(self._record.db_id) + "';" 181 if (not self._record.hasNullCoords()): 182 sqlStart += "coordinates = sbox'((" + self._record.west + "d , " + \ 183 self._record.south + "d), (" + self._record.east + "d , " + \ 184 self._record.north + "d))'" 185 186 if str(self._record.startdate)!='nostartdate': 187 if len(sqlStart) > 0: 188 sqlStart += ", " 189 sqlStart += "startdate = '" + self._record.startdate + "'" 190 191 if str(self._record.enddate)!='noenddate': 192 if len(sqlStart) > 0: 193 sqlStart += ", " 194 sqlStart += "enddate = '" + enddate + "'" 195 196 if len(sqlStart) > 0: 197 sqlStart += ", " 198 sqlCmd = "UPDATE spatiotemp SET (" + sqlStart + "update_time= now()) " + sqlEnd + ");" 199 200 db_funcs.runSQLCommand(self._connection, sqlCmd) 201 logging.info("Spatiotemporal data updated for DB record") 202 203 204 def _insertOriginalRecord(self): 205 ''' 206 Insert the original metadata doc into the postgres DB 207 ''' 208 logging.info("Inserting new original document in Postgres DB") 209 sqlCmd = "INSERT INTO ORIGINAL_DOCUMENT (original_document_id, original_document_filename, " + \ 210 "discovery_id, original_format, " + \ 211 "original_document, ts_vector, create_date, harvest_count, scn) VALUES (" + \ 212 "DEFAULT, '" + self._record.filename + "', '" + self._record.discovery_id + \ 213 "', '" + self._record.docType + "', '" + self._record.originalFormat + \ 214 "', to_tsvector('english', '" + self._record.originalFormat + "'), current_date, 1, 1);" 215 216 self._record.db_id = db_funcs.runSQLCommand(self._connection, sqlCmd) 217 logging.info("Original document inserted in Postgres DB") 218 219 220 def _updateOriginalRecord(self): 221 ''' 222 Update the original doc in the postgres DB 223 ''' 224 logging.info("Updating original document in Postgres DB") 225 sqlCmd = "UPDATE ORIGINAL_DOCUMENT SET (original_document_filename = '" + self._record.filename + \ 226 "', discovery_id = '" + self._record.discovery_id + "', " + \ 227 "original_format = '" + self._record.originalFormat + "', " + \ 228 "ts_vector = to_tsvector('english', '" + self._record.originalFormat + "'), " + \ 229 "update_date = current_date, harvest_count = 1)" + \ 230 " WHERE original_document_id = " + str(self._record.db_id) + ";" 231 232 db_funcs.runSQLCommand(self._connection, sqlCmd) 260 261 # increment scn - saves doing an additional db call 262 self._record.scn += 1 233 263 logging.info("Original document updated in Postgres DB") 234 264 … … 248 278 "transformed_document, create_date, scn) VALUES (" \ 249 279 "DEFAULT, '" + str(self._record.db_id) + "', '" + \ 250 docType + "', '" + doc + "' ), current_date, 1);"280 docType + "', '" + doc + "', current_timestamp, 1);" 251 281 252 282 db_funcs.runSQLCommand(self._connection, sqlCmd) … … 265 295 266 296 for docType, doc in self._record.getAllDocs(): 267 sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET (transformed_document = '" + doc + \268 "', update_date = current_ date)WHERE original_record_id = " + \297 sqlCmd = "UPDATE TRANSFORMED_DOCUMENT SET transformed_document = '" + doc + \ 298 "', update_date = current_timestamp WHERE original_record_id = " + \ 269 299 str(self._record.db_id) + " AND transformed_format = '" + docType + "';" 270 300 … … 281 311 self._record = record 282 312 283 284 285 def link_spatial_and_temporal(Mid,west,south,east,north,startdate,enddate):286 '''287 Creates an entry in the item/time/location table linking an item, time and location.288 '''289 locationid = insert_spatial_coverage(Mid,west,south,east,north)290 timeid = insert_temporal_coverage(Mid,startdate,enddate)291 if (locationid > -1):292 locationidstr = "%s" % locationid293 else:294 locationidstr = "NULL"295 296 if (timeid > -1):297 timeidstr = "%s" % timeid298 else:299 timeidstr = "NULL"300 301 itemtimelocation_sql = "INSERT INTO "+item_table+" (itemid, locationid, timeid) values ('"+Mid+"', "+locationidstr+", "+timeidstr+" );"302 303 #print "ItemTimeLocation:\t"+itemtimelocation_sql304 cursor = connection.cursor()305 try:306 cursor.execute(itemtimelocation_sql)307 except:308 print "Error: database error %s %s" %(sys.exc_type, sys.exc_value)309 connection.commit()310 311 def link_spatial(Mid,west,south,east,north):312 '''Creates an entry in the item/time/location table linking an item and location only'''313 locationid = insert_spatial_coverage(Mid,west,south,east,north)314 if (locationid > -1):315 locationidstr = "%s" % locationid316 else:317 locationidstr = "NULL"318 319 itemtimelocation_sql = "INSERT INTO "+item_table+" (itemid, locationid) values ('"+Mid+"', "+locationidstr+" );"320 321 #print "ItemTimeLocation:\t"+itemtimelocation_sql322 cursor = connection.cursor()323 try:324 cursor.execute(itemtimelocation_sql)325 except:326 print "Error: database error %s %s" %(sys.exc_type, sys.exc_value)327 connection.commit()328 329 def link_temporal(Mid,startdate,enddate):330 '''Creates an entry in the item/time/location table linking an item and time only'''331 timeid = insert_temporal_coverage(Mid,startdate,enddate)332 if (timeid > -1):333 timeidstr = "%s" % timeid334 else:335 timeidstr = "NULL"336 337 itemtimelocation_sql = "INSERT INTO "+item_table+" (itemid, timeid) values ('"+Mid+"', "+timeidstr+" );"338 339 #print "ItemTimeLocation:\t"+itemtimelocation_sql340 cursor = connection.cursor()341 try:342 cursor.execute(itemtimelocation_sql)343 except:344 print "Error: database error %s %s" %(sys.exc_type, sys.exc_value)345 connection.commit()346 347 348 def insert_spatial_coverage(Mid,west,south,east,north):349 '''350 Attempts to insert spatial coverage into spatial table.351 If successful, returns newly created location id, -1 otherwise352 '''353 returnid = -1354 if str(west)!='null' and str(south)!='null' and str(east)!='null' and str(north)!='null':355 insert_sql="INSERT INTO "+location_table+" (geom) VALUES ( setsrid('BOX3D("+west+" "+south+","+east+" "+north+")'::box3d,4326) );"356 print insert_sql357 cursor = connection.cursor()358 try:359 cursor.execute(insert_sql)360 if (cursor.rowcount > 0):361 getid_sql="select currval('"+location_id_seq+"');"362 cursor.execute(getid_sql)363 id = commavaluesfromsublist(cursor.fetchall())364 return int(id)365 else:366 return -1367 except:368 print "Error: database error %s %s" %(sys.exc_type, sys.exc_value)369 connection.commit()370 371 372 def insert_temporal_coverage(Mid,startdate,enddate):373 '''Attempts to insert temporal coverage (only) into time table. If successful, returns timeid to be used in item table, -1 otherwise'''374 returnid = -1375 if (startdate=='nostartdate' and enddate=='noenddate'):376 # Skip if both are missing, although it's OK if only 1 is given377 return -1378 else:379 if (startdate=='nostartdate'): startdate='NULL'380 if (enddate=='noenddate'): enddate='NULL'381 insert_sql = "INSERT INTO "+time_table+" (start_time, end_time) VALUES('"+startdate+"', '"+enddate+"');"382 print insert_sql383 cursor = connection.cursor()384 try:385 cursor.execute(insert_sql)386 if (cursor.rowcount > 0):387 getid_sql="select currval('"+time_id_seq+"');"388 cursor.execute(getid_sql)389 id = commavaluesfromsublist(cursor.fetchall())390 return int(id)391 else:392 return -1393 except:394 print "Error: database error %s %s" %(sys.exc_type, sys.exc_value)395 connection.commit() -
TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresRecord.py
r3847 r3853 70 70 self.mdipFormat = None 71 71 self.iso19139Format = None 72 self.scn = 1 # system change number - keeps track of number of mods to a particular row 72 73 73 74 # do some initial setting up of record … … 320 321 enddate= date.DateRangeEnd 321 322 if startdate==None or startdate=='None': 322 startdate="n ostartdate"323 startdate="null" 323 324 if enddate==None or enddate=='None': 324 enddate="n oenddate"325 enddate="null" 325 326 326 327 self.stData.addTimeRange(startdate, enddate) -
TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new2.py
r3845 r3853 91 91 92 92 # Finally, write the new record 93 dao.createOrUpdateRecord() 93 if dao.createOrUpdateRecord(): 94 self._no_files_ingested += 1 95 94 96 95 97 … … 165 167 dbinfo_file=open('ingest.config', "r") 166 168 dbinfo = dbinfo_file.read().split() 167 if len(dbinfo) !=4:169 if len(dbinfo) < 4: 168 170 raise ValueError, 'Incorrect data in config file' 169 171 170 self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3]) 172 # if port specified in file, use this, otherwise use default 173 if len(dbinfo) > 4: 174 self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3], dbinfo[4]) 175 else: 176 self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3]) 171 177 logging.info("Postgres DB connection now set up") 172 178 … … 217 223 status = 0 218 224 numfilesproc = 0 225 self._no_files_ingested = 0 219 226 self._base_dir = os.getcwd() + "/"# this is the base dir that the script is ran from 220 227 … … 320 327 321 328 print self.lineSeparator 322 print "INFO: No. of files pre-processed = %s" %numfilesproc 329 print "INFO: Number of files processed = %s" %numfilesproc 330 print "INFO: Number of files ingested = %s" %self._no_files_ingested 323 331 if status == 0: 324 332 print "INFO: Procedure oai_ingest.py completed"
Note: See TracChangeset
for help on using the changeset viewer.