source: TI01-discovery/tags/stable-TI01-ingestAutomation_Proglue_upgradesAndReporting/temp/OAIBatch/oai_document_ingester.py @ 5040

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/tags/stable-TI01-ingestAutomation_Proglue_upgradesAndReporting/temp/OAIBatch/oai_document_ingester.py@5040
Revision 5040, 17.2 KB checked in by sdonegan, 11 years ago (diff)

Debug new ingest classes - previous commit had problems with mdip records.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11#from DIF import DIF
12#from MDIP import MDIP
13from DatasetBasicParameters import DatasetBasicParameters
14import ndgUtils
15from ndgUtils.ndgXqueries import ndgXqueries
16from FileUtilities import FileUtilities
17from PostgresRecord import PostgresRecord
18from PostgresDAO import PostgresDAO
19import datetime,time
20import db_funcs
21 
22class oai_document_ingester:
23        '''
24        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
25        - including running the various transforms and parsings to get all doc types and spatiotemporal
26        data in the correct form in the DB
27        '''
28       
29       
30        def addFileToPostgresDB(self, filename,basicParameters):
31                '''
32                Add a file to the postgres DB - extracting and storing all the required
33                data in the process
34                @param filename: full path of file to add to postgres DB
35                '''
36               
37                #filename = self.discDir + actualFilename
38               
39                if not os.path.isfile(filename):
40                        logging.info("Skipping, %s - not a valid file" %filename)
41                        return
42               
43                logging.info("Adding file, " + filename + ", to postgres DB")
44               
45                numSlash = len(filename.split('/'))
46                shortFilename = filename.split('/')[numSlash - 1]
47               
48                # first of all create a PostgresRecord - this object represents all the data required
49                # for a DB entry
50                dao = None
51                try:
52                                               
53                        discoveryID = basicParameters.datasetID
54                        datasetName = basicParameters.datasetName
55                        datacentreName = basicParameters.datacentreName
56                        datasetLastEditUpdateDate = basicParameters.metadataCreationDate
57                       
58                       
59                        record = PostgresRecord(filename, self._NDG_dataProvider, \
60                                                            self._datacentre_groups, self._datacentre_namespace, \
61                                                            discoveryID,datasetName,datacentreName,datasetLastEditUpdateDate, \
62                                                            self._xq, self._datacentre_format)
63       
64                       
65                        # Now create the data access object to interface to the DB
66                        dao = PostgresDAO(record, self._dbConnection)
67                       
68                        # Finally, write the new record
69                        if dao.createOrUpdateRecord():                         
70                                self._no_files_ingested += 1
71                               
72                       
73                except:
74                       
75                        #if error encountered, add to failure lisr
76                        logging.error("Could not update: " + filename)
77                       
78                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
79                        self.updateFailList.append(originalRecordFilename)
80                       
81                        logging.error("Exception thrown - detail: ")
82                        logging.error(sys.exc_info())
83                       
84                        if dao:
85                                logging.info("Removing record and its associated info from DB")
86                                logging.info("- to allow clean ingestion on rerun")
87                                try:
88                                        dao.deleteOriginalRecord()
89                                except:
90                                        logging.error("Problem encountered when removing record: ")
91                                        logging.error(sys.exc_info())
92                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
93
94                        self._no_problem_files += 1
95                        logging.info("Continue processing other files")
96       
97       
98        def getConfigDetails(self, datacentre):
99                '''
100                Get the harvested records directory and groups for this datacentre from the
101                datacentre specific config file.  The harvested records directory depends on the
102                datacentres OAI base url, the set and format. These have to be know up-front.
103                The groups denote which 'portal groups' they belong to - for limiting searches to
104                say NERC-only datacentres records.
105                Groups are added to the intermediate MOLES when it is created.
106                @param datacentre: datacentre to use when looking up config file
107                '''
108                self._datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties"
109                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
110               
111                # Check this file exists; if not, assume an invalid datacentre has been specified
112                if not os.path.isfile(self._datacentre_config_filename):
113                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
114                        "specified (%s) is invalid\n" %datacentre)
115                   
116                datacentre_config_file = open(self._datacentre_config_filename, "r")
117               
118                for line in datacentre_config_file.readlines():
119                    words  = string.split(line)
120                    if len(words) == 0:
121                        continue
122                    if words[0] == 'host_path':
123                        self._harvest_home = string.rstrip(words[1])
124                    if words[0] == 'groups':
125                        self._datacentre_groups = words[1:]
126                    if words[0] == 'format':                   
127                        self._datacentre_format = words[1]
128                    if words[0] == 'namespace':
129                        self._datacentre_namespace = words[1]
130                    if words[0] == 'self._NDG_dataProvider':
131                        self._NDG_dataProvider = True
132               
133                datacentre_config_file.close()
134               
135                if self._harvest_home == "":
136                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename)
137               
138                logging.info("harvested records are in " + self._harvest_home)
139               
140                if self._datacentre_groups == "":
141                    logging.info("No groups/keywords set for datacentre " + datacentre)
142                else:
143                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
144               
145                if self._datacentre_format == "":
146                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename)
147               
148                logging.info("format being harvested: " + self._datacentre_format)
149               
150                if self._datacentre_namespace == "":
151                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename)
152               
153                logging.info("datacentre namespace: " + self._datacentre_namespace)
154               
155                if self._NDG_dataProvider:
156                        logging.info("Datacentre classified as an NDG data provider")
157                else:
158                        logging.info("Datacentre is not classified as an NDG data provider")
159                logging.info(self.lineSeparator)
160
161               
162        def _getDBConnection(self):
163                '''
164                Get the default DB connection - by reading in data from the db config file
165                '''
166                logging.info("Setting up connection to postgres DB")
167                dbinfo_file=open('ingest.config', "r")
168                dbinfo = dbinfo_file.read().split()
169                if len(dbinfo) < 4:
170                        raise ValueError, 'Incorrect data in config file'
171               
172                # if port specified in file, use this, otherwise use default
173                if len(dbinfo) > 4:
174                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3], dbinfo[4])
175                else:
176                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3])
177                logging.info("Postgres DB connection now set up")
178
179       
180        def usage(self):
181                '''
182                Display input params for the script
183                '''
184                print "Usage: python oai_document_ingester.py [OPTION] <datacentre> <individual update file>"
185                print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:"
186                print " -v - verbose mode for output logging"
187                print " -d - debug mode for output logging"
188                print " -i - specify individual file to upload rather than batch processing as defined in properties file.  \n      (NOTE: script still uses properties file for other parameters)\n"
189                sys.exit(2)
190
191               
192        def __init__(self, datacentre=None, indFileToIngest=None):
193                '''
194                Main entry point for script
195                '''
196                self.lineSeparator = "-----------------------------"
197                print self.lineSeparator
198                print "RUNNING: oai_document_ingester.py"
199               
200                # check for verbose option
201                try:
202                    opts, args = getopt.getopt(sys.argv[1:], "vdi")
203                   
204                except getopt.GetoptError, err:
205                    # print help information and exit:
206                    print str(err) # will print something like "option -a not recognized"
207                   
208                loggingLevel = logging.WARNING
209                indFile = False
210                for o, a in opts:
211                    if o == "-v":
212                        print " - Verbose mode ON"
213                        loggingLevel = logging.INFO
214                    elif o == "-d":
215                        print " - Debug mode ON"
216                        loggingLevel = logging.DEBUG
217                    elif o == "-i":
218                        indFile = True
219                       
220                        #check second arguement is present
221                        if len(sys.argv) < 4:
222                                print " - could not find individual path to file/ specified data centre!\n\n"
223                                self.usage()
224                       
225                        print " - Use INDIVIDUAL file: " + indFileToIngest + " to load"
226               
227                 
228                print self.lineSeparator
229                logging.basicConfig(level=loggingLevel,
230                                            format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
231               
232                if datacentre is None:
233                        self.usage()
234                       
235                #create list to to hold files ingest failed on.
236                self.updateFailList = []
237               
238                # create file utils object to do various file related stuff
239                fileUtils = FileUtilities()
240               
241                numfilesproc = 0
242                self._no_files_ingested = 0
243                self._no_problem_files = 0
244                self._base_dir = "/usr/local/WSClientsIngestUpdate/stable-TI01-ingestAutomation_Proglue_upgradesAndReporting/temp/OAIBatch/" #os.getcwd() + "/" # this is the base dir that the script is ran from
245                #self._base_dir = "/disks/aer1/users/sdonegan/PROJECTS/ndgWorkspace/discoveryIngestIgnisPRODUCTION/OAIBatch/"
246                data_dir = self._base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs
247               
248                #Change os directory to that with the harvested documents in it.
249                os.chdir(self._base_dir)
250               
251                # - to run on Windows under cygwin, use the following
252                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
253               
254                # set the global variables to retrieve from the config file
255                self._harvest_home = ""
256                self._datacentre_groups = ""
257                self._datacentre_format = ""
258                self._datacentre_namespace = ""
259                self._NDG_dataProvider = False
260               
261                self.getConfigDetails(datacentre)
262               
263                # The directory to put things for a tape backup (should already exist)
264                backupdir = '/disks/glue1/oaiBackup/'
265               
266                # the following dirs define where the specific documents should go
267                originals_dir = data_dir + "/oai/originals/"
268                discovery_dir = data_dir + "/discovery/"
269               
270               
271                # check harvest dir exists and that there are any records to harvest?
272                if not indFile:
273                        if not os.path.exists(self._harvest_home):
274                                logging.warn("Harvest directory for datacentre %s (%s) could not be found - exiting" \
275                                                 %(datacentre, self._harvest_home))
276                                return
277                        elif len(os.listdir(self._harvest_home)) == 0:
278                                logging.warn("Nothing to harvest this time from %s" %datacentre)
279                                return
280                       
281                        # Create/clear the 'in' directory pristine copy of the discovery records
282                        fileUtils.setUpDir(originals_dir)
283                        commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir
284                        logging.info("Executing : " + commandline)
285                        status = os.system(commandline)
286                                               
287                else:
288                        #must be looking for an individual file to upload
289                        if not os.path.exists(indFileToIngest):
290                                logging .warn("Specified file does not exist")
291                                return
292                       
293                        # Create/clear the 'in' directory pristine copy of the discovery records
294                        fileUtils.setUpDir(originals_dir)
295                        commandline = "cp " + indFileToIngest + " " + originals_dir
296                        logging.info("Executing : " + commandline)
297                        status = os.system(commandline)
298
299                #did transfer command work?
300                if status !=0:
301                        sys.exit("Failed at making pristine copy stage")
302               
303                # Create/clear the directory for the 'out' processed copy of the discovery records.
304                fileUtils.setUpDir(discovery_dir)
305                   
306                #Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
307                # - also replace any namespace declarations with a standard one which we know works in NDG
308                # NB, this copies files from the original dir to the discovery dir             
309                self.inputFileOrigFinal = {}
310               
311                logging.info(self.lineSeparator)
312                logging.info("Renaming files:")
313                               
314                for filename in os.listdir(originals_dir):
315                        if filename.endswith('.xml'):                           
316                                original_filename = originals_dir + filename
317                               
318                                #Use new class to get basic parameters from input xml doc to pass around (supplants getID method)
319                                basicParameters=DatasetBasicParameters(original_filename,self._datacentre_format)
320                                                                       
321                                try:
322                                        ident=basicParameters.datasetID #self.getID(original_filename)
323                                       
324                                except Exception, detail:
325                                        logging.error("Could not retrieve ID from file, %s" %filename)
326                                        logging.error("Detail: %s" %detail)
327                                        logging.info("Continue with next file")
328                                        continue
329                               
330                                if self._NDG_dataProvider:
331                                        new_filename = discovery_dir + ident.replace(":","__")+".xml"
332                                else:
333                                                ident = ident.replace(":","-")
334                                                ident = ident.replace("/","-")
335                                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
336                                                new_filename_short = self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
337                                                logging.info("original file = " + original_filename)
338                                                logging.info("newfile = " + new_filename)
339                                               
340                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
341                                                #this links a derived filename in processing dir with original filename
342                                                #get basic filename from the path+filename passed to this function to use as key               
343                                                self.inputFileOrigFinal[new_filename_short]=filename
344                                                                                                               
345                                # now correct any namespace issues
346                                try:
347                                    SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
348                                except Exception, detail:
349                                        logging.error("SchemaNameSpace failed on file %s" %original_filename)
350                                        logging.error("Detail: %s" %detail)
351                                        logging.info("Continue with next file")
352                                        continue
353                                numfilesproc += 1
354                        else:
355                                logging.warning('File %s is not xml format. Not processed'  %(filename))
356                               
357                       
358               
359                logging.info(self.lineSeparator)
360               
361                # now set up the required XQueries
362                # - NB, extract the xquery libraries locally for easy reference
363                self._xq=ndgXqueries()
364                for libFile in self._xq.xqlib:
365                        fileUtils.createFile(libFile, self._xq.xqlib[libFile])
366               
367                # Process the resulting files and put the data into the postgres DB
368                # - firstly set up a db connection to use
369                self._dbConnection = None
370                self._getDBConnection()
371               
372                #add this to self so can produce list of failed files
373                #self.discDir = discovery_dir
374               
375                filenames = os.listdir(discovery_dir)
376                for filename in filenames:
377                        self.addFileToPostgresDB(discovery_dir + filename,basicParameters)
378               
379                #Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
380                backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M")
381               
382                this_backupdir = backupdir_base + "_originals/"
383                #fileUtils.makeBackUp(originals_dir, this_backupdir)
384               
385                #Clear out the original harvest records area and discovery dir
386                fileUtils.cleanDir(originals_dir)
387                #fileUtils.cleanDir(discovery_dir)
388               
389                #create a summary file for each data centre ingest
390                data_dir = self._base_dir + "data/"
391                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
392                recOpFile = open(recOpFileName,'w')
393               
394                logging.info("oai_document_ingest processing complete:")
395               
396                recOpFile.write("Ingest report for data centre: " + datacentre + "\n")
397                recOpFile.write("Ingest date: " + str(datetime.datetime.now()) + "\n")
398                recOpFile.write("Original metadata directory: " + self._harvest_home + "\n\n")
399                recOpFile.write("PROCESSED " + str(numfilesproc) + "\n")
400                recOpFile.write("INGESTED " + str(self._no_files_ingested)  + "\n")
401               
402                if self._no_problem_files == 0:
403                        logging.info("All files successfully processed - cleaning harvest directory")
404                        #fileUtils.cleanDir(self._harvest_home)
405                else:
406                        logging.error("Problems experienced with %s files" %self._no_problem_files)
407                        logging.error("- harvest directory will not be cleared until these have been fixed and the script has been reran")
408                       
409                        logging.info("INFO: Could not ingest the following files into the Discovery Database: ")
410                       
411                        recOpFile.write("PROBLEM_NUM "  + str(self._no_problem_files)  + "\n")
412                       
413                        for badFile in self.updateFailList:
414                                logging.info("INFO: Could not ingest = %s" %badFile)
415                                recOpFile.write("PROBLEM_FILE " + badFile + "\n")                                       
416               
417                logging.info(self.lineSeparator)
418                logging.info("INFO: Number of files processed = %s" %numfilesproc)
419                logging.info("INFO: Number of files ingested = %s" %self._no_files_ingested)
420                logging.info(self.lineSeparator)
421               
422                recOpFile.close()
423               
424                #if run on single file report info to screen
425                if indFile:
426                        print self.lineSeparator
427                        if self._no_problem_files == 0:
428                                print "File successfully ingested at " + str(datetime.datetime.now())
429                               
430                        else:   
431                                for badFile in self.updateFailList:
432                                        print "Could not ingest: " + badFile
433                        print self.lineSeparator
434               
435                else:           
436                        print "\nScript finished running."
437               
438         
439if __name__=="__main__":
440       
441        opts, args = getopt.getopt(sys.argv[1:], '-vdi')
442       
443        optList=[]
444        for i in opts:
445                optList.append(i[0])
446               
447        if len(args) < 1:
448                oai_document_ingester()
449   
450        if '-i' not in optList:         
451                oai_document_ingester(args[0])         
452        else:
453                #send both arguements needed for individual files
454                oai_document_ingester(args[0], args[1])
Note: See TracBrowser for help on using the repository browser.