source: TI01-discovery/tags/stable-TI01-ingestAutomation_Proglue_upgradesAndReporting/temp/OAIBatch/oai_document_ingester.py @ 5027

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/tags/stable-TI01-ingestAutomation_Proglue_upgradesAndReporting/temp/OAIBatch/oai_document_ingester.py@5027
Revision 5027, 18.0 KB checked in by sdonegan, 11 years ago (diff)

Added extra fields to ingest into original document table to aid in search result ordering

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from DIF import DIF
12from MDIP import MDIP
13from DatasetBasicParameters import DatasetBasicParameters
14import ndgUtils
15from ndgUtils.ndgXqueries import ndgXqueries
16from FileUtilities import FileUtilities
17from PostgresRecord import PostgresRecord
18from PostgresDAO import PostgresDAO
19import datetime,time
20import db_funcs
21 
22class oai_document_ingester:
23        '''
24        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
25        - including running the various transforms and parsings to get all doc types and spatiotemporal
26        data in the correct form in the DB
27        '''
28
29        def getID(self, filename):
30                '''
31                Gets the identifier out of an input metadata xml record.
32                Copes with DIF and MDIP currently.
33                @param filename - name of document file being processed
34                @return: ID - id to use to refer to the document
35                '''
36                logging.info("Retrieving identifier for metadata record " + filename)
37                xml=file(filename).read()
38                if self._datacentre_format == "DIF":
39                    d=DIF(xml)
40                    ID=d.entryID
41                elif self._datacentre_format == "MDIP":
42                    d=MDIP(xml)
43                    ID=d.id
44                else:
45                    raise TypeError, "Only handles DIF or MDIP here."
46       
47                return ID
48               
49       
50        def addFileToPostgresDB(self, filename):
51                '''
52                Add a file to the postgres DB - extracting and storing all the required
53                data in the process
54                @param filename: full path of file to add to postgres DB
55                '''
56               
57                #filename = self.discDir + actualFilename
58               
59                if not os.path.isfile(filename):
60                        logging.info("Skipping, %s - not a valid file" %filename)
61                        return
62               
63                logging.info("Adding file, " + filename + ", to postgres DB")
64               
65                numSlash = len(filename.split('/'))
66                shortFilename = filename.split('/')[numSlash - 1]
67               
68                # first of all create a PostgresRecord - this object represents all the data required
69                # for a DB entry
70                dao = None
71                try:
72                       
73                        #Update: add some extra vas in for ingest to aid in search/present ordering of datasets
74                        basicParameters=DatasetBasicParameters(filename,self._datacentre_format)
75                        discoveryID = basicParameters.datasetID
76                        datasetName = basicParameters.datasetName
77                        datacentreName = basicParameters.datacentreName
78                        datasetLastEditUpdateDate = basicParameters.metadataCreationDate
79                       
80                        logging.info("gggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggg 1")
81                       
82                        record = PostgresRecord(filename, self._NDG_dataProvider, \
83                                                            self._datacentre_groups, self._datacentre_namespace, \
84                                                            discoveryID,datasetName,datacentreName,datasetLastEditUpdateDate, \
85                                                            self._xq, self._datacentre_format)
86       
87                        logging.info("gggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggg 2")                       
88                       
89                        # Now create the data access object to interface to the DB
90                        dao = PostgresDAO(record, self._dbConnection)
91                        logging.info("gggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggg 3")                       
92                       
93                        # Finally, write the new record
94                        if dao.createOrUpdateRecord():                         
95                                self._no_files_ingested += 1
96                               
97                       
98                except:
99                       
100                        #if error encountered, add to failure lisr
101                        logging.error("Could not update: " + filename)
102                       
103                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
104                        self.updateFailList.append(originalRecordFilename)
105                       
106                        logging.error("Exception thrown - detail: ")
107                        logging.error(sys.exc_info())
108                       
109                        if dao:
110                                logging.info("Removing record and its associated info from DB")
111                                logging.info("- to allow clean ingestion on rerun")
112                                try:
113                                        dao.deleteOriginalRecord()
114                                except:
115                                        logging.error("Problem encountered when removing record: ")
116                                        logging.error(sys.exc_info())
117                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
118
119                        self._no_problem_files += 1
120                        logging.info("Continue processing other files")
121       
122       
123        def getConfigDetails(self, datacentre):
124                '''
125                Get the harvested records directory and groups for this datacentre from the
126                datacentre specific config file.  The harvested records directory depends on the
127                datacentres OAI base url, the set and format. These have to be know up-front.
128                The groups denote which 'portal groups' they belong to - for limiting searches to
129                say NERC-only datacentres records.
130                Groups are added to the intermediate MOLES when it is created.
131                @param datacentre: datacentre to use when looking up config file
132                '''
133                self._datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties"
134                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
135               
136                # Check this file exists; if not, assume an invalid datacentre has been specified
137                if not os.path.isfile(self._datacentre_config_filename):
138                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
139                        "specified (%s) is invalid\n" %datacentre)
140                   
141                datacentre_config_file = open(self._datacentre_config_filename, "r")
142               
143                for line in datacentre_config_file.readlines():
144                    words  = string.split(line)
145                    if len(words) == 0:
146                        continue
147                    if words[0] == 'host_path':
148                        self._harvest_home = string.rstrip(words[1])
149                    if words[0] == 'groups':
150                        self._datacentre_groups = words[1:]
151                    if words[0] == 'format':
152                        self._datacentre_format = words[1]
153                    if words[0] == 'namespace':
154                        self._datacentre_namespace = words[1]
155                    if words[0] == 'self._NDG_dataProvider':
156                        self._NDG_dataProvider = True
157               
158                datacentre_config_file.close()
159               
160                if self._harvest_home == "":
161                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename)
162               
163                logging.info("harvested records are in " + self._harvest_home)
164               
165                if self._datacentre_groups == "":
166                    logging.info("No groups/keywords set for datacentre " + datacentre)
167                else:
168                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
169               
170                if self._datacentre_format == "":
171                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename)
172               
173                logging.info("format being harvested: " + self._datacentre_format)
174               
175                if self._datacentre_namespace == "":
176                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename)
177               
178                logging.info("datacentre namespace: " + self._datacentre_namespace)
179               
180                if self._NDG_dataProvider:
181                        logging.info("Datacentre classified as an NDG data provider")
182                else:
183                        logging.info("Datacentre is not classified as an NDG data provider")
184                logging.info(self.lineSeparator)
185
186               
187        def _getDBConnection(self):
188                '''
189                Get the default DB connection - by reading in data from the db config file
190                '''
191                logging.info("Setting up connection to postgres DB")
192                dbinfo_file=open('ingest.config', "r")
193                dbinfo = dbinfo_file.read().split()
194                if len(dbinfo) < 4:
195                        raise ValueError, 'Incorrect data in config file'
196               
197                # if port specified in file, use this, otherwise use default
198                if len(dbinfo) > 4:
199                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3], dbinfo[4])
200                else:
201                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3])
202                logging.info("Postgres DB connection now set up")
203
204       
205        def usage(self):
206                '''
207                Display input params for the script
208                '''
209                print "Usage: python oai_document_ingester.py [OPTION] <datacentre> <individual update file>"
210                print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:"
211                print " -v - verbose mode for output logging"
212                print " -d - debug mode for output logging"
213                print " -i - specify individual file to upload rather than batch processing as defined in properties file.  \n      (NOTE: script still uses properties file for other parameters)\n"
214                sys.exit(2)
215
216               
217        def __init__(self, datacentre=None, indFileToIngest=None):
218                '''
219                Main entry point for script
220                '''
221                self.lineSeparator = "-----------------------------"
222                print self.lineSeparator
223                print "RUNNING: oai_document_ingester.py"
224               
225                # check for verbose option
226                try:
227                    opts, args = getopt.getopt(sys.argv[1:], "vdi")
228                   
229                except getopt.GetoptError, err:
230                    # print help information and exit:
231                    print str(err) # will print something like "option -a not recognized"
232                   
233                loggingLevel = logging.WARNING
234                indFile = False
235                for o, a in opts:
236                    if o == "-v":
237                        print " - Verbose mode ON"
238                        loggingLevel = logging.INFO
239                    elif o == "-d":
240                        print " - Debug mode ON"
241                        loggingLevel = logging.DEBUG
242                    elif o == "-i":
243                        indFile = True
244                       
245                        #check second arguement is present
246                        if len(sys.argv) < 4:
247                                print " - could not find individual path to file/ specified data centre!\n\n"
248                                self.usage()
249                       
250                        print " - Use INDIVIDUAL file: " + indFileToIngest + " to load"
251               
252                 
253                print self.lineSeparator
254                logging.basicConfig(level=loggingLevel,
255                                            format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
256               
257                if datacentre is None:
258                        self.usage()
259                       
260                #create list to to hold files ingest failed on.
261                self.updateFailList = []
262               
263                # create file utils object to do various file related stuff
264                fileUtils = FileUtilities()
265               
266                numfilesproc = 0
267                self._no_files_ingested = 0
268                self._no_problem_files = 0
269                #self._base_dir = "/usr/local/WSClientsIngestUpdate/stable-TI01-ingestAutomation_Proglue_upgradesAndReporting/temp/OAIBatch/" #os.getcwd() + "/" # this is the base dir that the script is ran from
270                self._base_dir = "/disks/aer1/users/sdonegan/PROJECTS/ndgWorkspace/discoveryIngestIgnisPRODUCTION/OAIBatch/"
271                data_dir = self._base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs
272               
273                #Change os directory to that with the harvested documents in it.
274                os.chdir(self._base_dir)
275               
276                # - to run on Windows under cygwin, use the following
277                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
278               
279                # set the global variables to retrieve from the config file
280                self._harvest_home = ""
281                self._datacentre_groups = ""
282                self._datacentre_format = ""
283                self._datacentre_namespace = ""
284                self._NDG_dataProvider = False
285               
286                self.getConfigDetails(datacentre)
287               
288                # The directory to put things for a tape backup (should already exist)
289                backupdir = '/disks/glue1/oaiBackup/'
290               
291                # the following dirs define where the specific documents should go
292                originals_dir = data_dir + "/oai/originals/"
293                discovery_dir = data_dir + "/discovery/"
294               
295               
296                # check harvest dir exists and that there are any records to harvest?
297                if not indFile:
298                        if not os.path.exists(self._harvest_home):
299                                logging.warn("Harvest directory for datacentre %s (%s) could not be found - exiting" \
300                                                 %(datacentre, self._harvest_home))
301                                return
302                        elif len(os.listdir(self._harvest_home)) == 0:
303                                logging.warn("Nothing to harvest this time from %s" %datacentre)
304                                return
305                       
306                        # Create/clear the 'in' directory pristine copy of the discovery records
307                        fileUtils.setUpDir(originals_dir)
308                        commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir
309                        logging.info("Executing : " + commandline)
310                        status = os.system(commandline)
311                                               
312                else:
313                        #must be looking for an individual file to upload
314                        if not os.path.exists(indFileToIngest):
315                                logging .warn("Specified file does not exist")
316                                return
317                       
318                        # Create/clear the 'in' directory pristine copy of the discovery records
319                        fileUtils.setUpDir(originals_dir)
320                        commandline = "cp " + indFileToIngest + " " + originals_dir
321                        logging.info("Executing : " + commandline)
322                        status = os.system(commandline)
323
324                #did transfer command work?
325                if status !=0:
326                        sys.exit("Failed at making pristine copy stage")
327               
328                # Create/clear the directory for the 'out' processed copy of the discovery records.
329                fileUtils.setUpDir(discovery_dir)
330                   
331                #Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
332                # - also replace any namespace declarations with a standard one which we know works in NDG
333                # NB, this copies files from the original dir to the discovery dir             
334                self.inputFileOrigFinal = {}
335               
336                logging.info(self.lineSeparator)
337                logging.info("Renaming files:")
338       
339                for filename in os.listdir(originals_dir):
340                        if filename.endswith('.xml'):                           
341                                original_filename = originals_dir + filename
342                                                               
343                                try:
344                                        ident=self.getID(original_filename)
345                                       
346                                except Exception, detail:
347                                        logging.error("Could not retrieve ID from file, %s" %filename)
348                                        logging.error("Detail: %s" %detail)
349                                        logging.info("Continue with next file")
350                                        continue
351                               
352                                if self._NDG_dataProvider:
353                                        new_filename = discovery_dir + ident.replace(":","__")+".xml"
354                                else:
355                                                ident = ident.replace(":","-")
356                                                ident = ident.replace("/","-")
357                                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
358                                                new_filename_short = self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
359                                                logging.info("original file = " + original_filename)
360                                                logging.info("newfile = " + new_filename)
361                                               
362                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
363                                                #this links a derived filename in processing dir with original filename
364                                                #get basic filename from the path+filename passed to this function to use as key               
365                                                self.inputFileOrigFinal[new_filename_short]=filename
366                                                                                                               
367                                # now correct any namespace issues
368                                try:
369                                    SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
370                                except Exception, detail:
371                                        logging.error("SchemaNameSpace failed on file %s" %original_filename)
372                                        logging.error("Detail: %s" %detail)
373                                        logging.info("Continue with next file")
374                                        continue
375                                numfilesproc += 1
376                        else:
377                                logging.warning('File %s is not xml format. Not processed'  %(filename))
378                               
379                       
380               
381                logging.info(self.lineSeparator)
382               
383                # now set up the required XQueries
384                # - NB, extract the xquery libraries locally for easy reference
385                self._xq=ndgXqueries()
386                for libFile in self._xq.xqlib:
387                        fileUtils.createFile(libFile, self._xq.xqlib[libFile])
388               
389                # Process the resulting files and put the data into the postgres DB
390                # - firstly set up a db connection to use
391                self._dbConnection = None
392                self._getDBConnection()
393               
394                #add this to self so can produce list of failed files
395                #self.discDir = discovery_dir
396               
397                filenames = os.listdir(discovery_dir)
398                for filename in filenames:
399                        self.addFileToPostgresDB(discovery_dir + filename)
400               
401                #Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
402                backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M")
403               
404                this_backupdir = backupdir_base + "_originals/"
405                #fileUtils.makeBackUp(originals_dir, this_backupdir)
406               
407                #Clear out the original harvest records area and discovery dir
408                fileUtils.cleanDir(originals_dir)
409                #fileUtils.cleanDir(discovery_dir)
410               
411                #create a summary file for each data centre ingest
412                data_dir = self._base_dir + "data/"
413                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
414                recOpFile = open(recOpFileName,'w')
415               
416                logging.info("oai_document_ingest processing complete:")
417               
418                recOpFile.write("Ingest report for data centre: " + datacentre + "\n")
419                recOpFile.write("Ingest date: " + str(datetime.datetime.now()) + "\n")
420                recOpFile.write("Original metadata directory: " + self._harvest_home + "\n\n")
421                recOpFile.write("PROCESSED " + str(numfilesproc) + "\n")
422                recOpFile.write("INGESTED " + str(self._no_files_ingested)  + "\n")
423               
424                if self._no_problem_files == 0:
425                        logging.info("All files successfully processed - cleaning harvest directory")
426                        #fileUtils.cleanDir(self._harvest_home)
427                else:
428                        logging.error("Problems experienced with %s files" %self._no_problem_files)
429                        logging.error("- harvest directory will not be cleared until these have been fixed and the script has been reran")
430                       
431                        logging.info("INFO: Could not ingest the following files into the Discovery Database: ")
432                       
433                        recOpFile.write("PROBLEM_NUM "  + str(self._no_problem_files)  + "\n")
434                       
435                        for badFile in self.updateFailList:
436                                logging.info("INFO: Could not ingest = %s" %badFile)
437                                recOpFile.write("PROBLEM_FILE " + badFile + "\n")                                       
438               
439                logging.info(self.lineSeparator)
440                logging.info("INFO: Number of files processed = %s" %numfilesproc)
441                logging.info("INFO: Number of files ingested = %s" %self._no_files_ingested)
442                logging.info(self.lineSeparator)
443               
444                recOpFile.close()
445               
446                #if run on single file report info to screen
447                if indFile:
448                        print self.lineSeparator
449                        if self._no_problem_files == 0:
450                                print "File successfully ingested at " + str(datetime.datetime.now())
451                               
452                        else:   
453                                for badFile in self.updateFailList:
454                                        print "Could not ingest: " + badFile
455                        print self.lineSeparator
456               
457                else:           
458                        print "\nScript finished running."
459               
460         
461if __name__=="__main__":
462       
463        opts, args = getopt.getopt(sys.argv[1:], '-vdi')
464       
465        optList=[]
466        for i in opts:
467                optList.append(i[0])
468               
469        if len(args) < 1:
470                oai_document_ingester()
471   
472        if '-i' not in optList:         
473                oai_document_ingester(args[0])         
474        else:
475                #send both arguements needed for individual files
476                oai_document_ingester(args[0], args[1])
Note: See TracBrowser for help on using the repository browser.