source: TI01-discovery/tags/stable-TI01-ingestAutomation_Proglue_upgradesAndReporting/temp/OAIBatch/oai_document_ingester.py @ 4888

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/tags/stable-TI01-ingestAutomation_Proglue_upgradesAndReporting/temp/OAIBatch/oai_document_ingester.py@4888
Revision 4888, 15.5 KB checked in by sdonegan, 12 years ago (diff)

Create tagged release of stable developed version for ingestion and ingestion reporting. Note - this is pre- Calums restructuring of ndgUtils and update to use atom feed. Will need to merge the two at some stage.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from DIF import DIF
12from MDIP import MDIP
13import ndgUtils
14from ndgUtils.ndgXqueries import ndgXqueries
15from FileUtilities import FileUtilities
16from PostgresRecord import PostgresRecord
17from PostgresDAO import PostgresDAO
18from datetime import date
19import db_funcs
20
21class oai_document_ingester:
22        '''
23        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
24        - including running the various transforms and parsings to get all doc types and spatiotemporal
25        data in the correct form in the DB
26        '''
27
28        def getID(self, filename):
29                '''
30                Gets the identifier out of an input metadata xml record.
31                Copes with DIF and MDIP currently.
32                @param filename - name of document file being processed
33                @return: ID - id to use to refer to the document
34                '''
35                logging.info("Retrieving identifier for metadata record " + filename)
36                xml=file(filename).read()
37                if self._datacentre_format == "DIF":
38                    d=DIF(xml)
39                    ID=d.entryID
40                elif self._datacentre_format == "MDIP":
41                    d=MDIP(xml)
42                    ID=d.id
43                else:
44                    raise TypeError, "Only handles DIF or MDIP here."
45       
46                return ID
47       
48       
49        def addFileToPostgresDB(self, filename):
50                '''
51                Add a file to the postgres DB - extracting and storing all the required
52                data in the process
53                @param filename: full path of file to add to postgres DB
54                '''
55               
56                #filename = self.discDir + actualFilename
57               
58                if not os.path.isfile(filename):
59                        logging.info("Skipping, %s - not a valid file" %filename)
60                        return
61               
62                logging.info("Adding file, " + filename + ", to postgres DB")
63               
64                numSlash = len(filename.split('/'))
65                shortFilename = filename.split('/')[numSlash - 1]
66               
67                # first of all create a PostgresRecord - this object represents all the data required
68                # for a DB entry
69                dao = None
70                try:
71                       
72                        discoveryID = self.getID(filename)
73                       
74                        record = PostgresRecord(filename, self._NDG_dataProvider, \
75                                                            self._datacentre_groups, self._datacentre_namespace, \
76                                                            discoveryID, self._xq, self._datacentre_format)
77       
78                       
79                        # Now create the data access object to interface to the DB
80                        dao = PostgresDAO(record, self._dbConnection)
81                       
82                        # Finally, write the new record
83                        if dao.createOrUpdateRecord():                         
84                                self._no_files_ingested += 1
85                               
86                       
87                except:
88                       
89                        #if error encountered, add to failure lisr
90                        logging.error("Could not update: " + filename)
91                       
92                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
93                        self.updateFailList.append(originalRecordFilename)
94                       
95                        logging.error("Exception thrown - detail: ")
96                        logging.error(sys.exc_info())
97                       
98                        if dao:
99                                logging.info("Removing record and its associated info from DB")
100                                logging.info("- to allow clean ingestion on rerun")
101                                try:
102                                        dao.deleteOriginalRecord()
103                                except:
104                                        logging.error("Problem encountered when removing record: ")
105                                        logging.error(sys.exc_info())
106                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
107
108                        self._no_problem_files += 1
109                        logging.info("Continue processing other files")
110       
111       
112        def getConfigDetails(self, datacentre):
113                '''
114                Get the harvested records directory and groups for this datacentre from the
115                datacentre specific config file.  The harvested records directory depends on the
116                datacentres OAI base url, the set and format. These have to be know up-front.
117                The groups denote which 'portal groups' they belong to - for limiting searches to
118                say NERC-only datacentres records.
119                Groups are added to the intermediate MOLES when it is created.
120                @param datacentre: datacentre to use when looking up config file
121                '''
122                self._datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties"
123                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
124               
125                # Check this file exists; if not, assume an invalid datacentre has been specified
126                if not os.path.isfile(self._datacentre_config_filename):
127                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
128                        "specified (%s) is invalid\n" %datacentre)
129                   
130                datacentre_config_file = open(self._datacentre_config_filename, "r")
131               
132                for line in datacentre_config_file.readlines():
133                    words  = string.split(line)
134                    if len(words) == 0:
135                        continue
136                    if words[0] == 'host_path':
137                        self._harvest_home = string.rstrip(words[1])
138                    if words[0] == 'groups':
139                        self._datacentre_groups = words[1:]
140                    if words[0] == 'format':
141                        self._datacentre_format = words[1]
142                    if words[0] == 'namespace':
143                        self._datacentre_namespace = words[1]
144                    if words[0] == 'self._NDG_dataProvider':
145                        self._NDG_dataProvider = True
146               
147                datacentre_config_file.close()
148               
149                if self._harvest_home == "":
150                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename)
151               
152                logging.info("harvested records are in " + self._harvest_home)
153               
154                if self._datacentre_groups == "":
155                    logging.info("No groups/keywords set for datacentre " + datacentre)
156                else:
157                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
158               
159                if self._datacentre_format == "":
160                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename)
161               
162                logging.info("format being harvested: " + self._datacentre_format)
163               
164                if self._datacentre_namespace == "":
165                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename)
166               
167                logging.info("datacentre namespace: " + self._datacentre_namespace)
168               
169                if self._NDG_dataProvider:
170                        logging.info("Datacentre classified as an NDG data provider")
171                else:
172                        logging.info("Datacentre is not classified as an NDG data provider")
173                logging.info(self.lineSeparator)
174
175               
176        def _getDBConnection(self):
177                '''
178                Get the default DB connection - by reading in data from the db config file
179                '''
180                logging.info("Setting up connection to postgres DB")
181                dbinfo_file=open('ingest.config', "r")
182                dbinfo = dbinfo_file.read().split()
183                if len(dbinfo) < 4:
184                        raise ValueError, 'Incorrect data in config file'
185               
186                # if port specified in file, use this, otherwise use default
187                if len(dbinfo) > 4:
188                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3], dbinfo[4])
189                else:
190                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3])
191                logging.info("Postgres DB connection now set up")
192
193       
194        def usage(self):
195                '''
196                Display input params for the script
197                '''
198                print "Usage: python oai_document_ingester.py [OPTION] <datacentre>"
199                print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:"
200                print " -v - verbose mode for output logging"
201                print " -d - debug mode for output logging"
202                sys.exit(2)
203
204               
205        def __init__(self, datacentre=None):
206                '''
207                Main entry point for script
208                '''
209                self.lineSeparator = "-----------------------------"
210                print self.lineSeparator
211                print "RUNNING: oai_document_ingester.py"
212               
213                # check for verbose option
214                try:
215                    opts, args = getopt.getopt(sys.argv[1:], "vd")
216                except getopt.GetoptError, err:
217                    # print help information and exit:
218                    print str(err) # will print something like "option -a not recognized"
219                   
220                loggingLevel = logging.WARNING
221                for o, a in opts:
222                    if o == "-v":
223                        print " - Verbose mode ON"
224                        loggingLevel = logging.INFO
225                    elif o == "-d":
226                        print " - Debug mode ON"
227                        loggingLevel = logging.DEBUG
228               
229                print self.lineSeparator
230                logging.basicConfig(level=loggingLevel,
231                                            format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
232               
233                if datacentre is None:
234                        self.usage()
235                       
236                #create list to to hold files ingest failed on.
237                self.updateFailList = []
238               
239                # create file utils object to do various file related stuff
240                fileUtils = FileUtilities()
241               
242                numfilesproc = 0
243                self._no_files_ingested = 0
244                self._no_problem_files = 0
245                self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from
246                       
247                data_dir = self._base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs
248               
249                #Change os directory to that with the harvested documents in it.
250                os.chdir(self._base_dir)
251               
252                # - to run on Windows under cygwin, use the following
253                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
254               
255                # set the global variables to retrieve from the config file
256                self._harvest_home = ""
257                self._datacentre_groups = ""
258                self._datacentre_format = ""
259                self._datacentre_namespace = ""
260                self._NDG_dataProvider = False
261                self.getConfigDetails(datacentre)
262               
263                # check harvest dir exists and that there are any records to harvest?
264                if not os.path.exists(self._harvest_home):
265                        logging.info("Harvest directory for datacentre %s (%s) could not be found - exiting" \
266                                                 %(datacentre, self._harvest_home))
267                        return
268                elif len(os.listdir(self._harvest_home)) == 0:
269                        logging.info("Nothing to harvest this time from %s" %datacentre)
270                        return
271               
272                # The directory to put things for a tape backup (should already exist)
273                backupdir = '/disks/glue1/oaiBackup/'
274               
275                # the following dirs define where the specific documents should go
276                originals_dir = data_dir + "/oai/originals/"
277                discovery_dir = data_dir + "/discovery/"
278               
279                # Create/clear the 'in' directory pristine copy of the discovery records
280                fileUtils.setUpDir(originals_dir)
281                commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir
282                logging.info("Executing : " + commandline)
283                status = os.system(commandline)
284
285                if status !=0:
286                    sys.exit("Failed at making pristine copy stage")
287               
288                # Create/clear the directory for the 'out' processed copy of the discovery records.
289                fileUtils.setUpDir(discovery_dir)
290                   
291                #Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
292                # - also replace any namespace declarations with a standard one which we know works in NDG
293                # NB, this copies files from the original dir to the discovery dir             
294                self.inputFileOrigFinal = {}
295               
296                logging.info(self.lineSeparator)
297                logging.info("Renaming files:")
298       
299                for filename in os.listdir(originals_dir):
300                        if filename.endswith('.xml'):                           
301                                original_filename = originals_dir + filename
302                                                               
303                                try:
304                                        ident=self.getID(original_filename)
305                                       
306                                except Exception, detail:
307                                        logging.error("Could not retrieve ID from file, %s" %filename)
308                                        logging.error("Detail: %s" %detail)
309                                        logging.info("Continue with next file")
310                                        continue
311                               
312                                if self._NDG_dataProvider:
313                                        new_filename = discovery_dir + ident.replace(":","__")+".xml"
314                                else:
315                                                ident = ident.replace(":","-")
316                                                ident = ident.replace("/","-")
317                                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
318                                                new_filename_short = self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
319                                                logging.info("original file = " + original_filename)
320                                                logging.info("newfile = " + new_filename)
321                                               
322                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
323                                                #this links a derived filename in processing dir with original filename
324                                                #get basic filename from the path+filename passed to this function to use as key               
325                                                self.inputFileOrigFinal[new_filename_short]=filename
326                                                                                                               
327                                # now correct any namespace issues
328                                try:
329                                    SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
330                                except Exception, detail:
331                                        logging.error("SchemaNameSpace failed on file %s" %original_filename)
332                                        logging.error("Detail: %s" %detail)
333                                        logging.info("Continue with next file")
334                                        continue
335                                numfilesproc += 1
336                        else:
337                                logging.warning('File %s is not xml format. Not processed'  %(filename))
338                               
339                       
340               
341                logging.info(self.lineSeparator)
342               
343                # now set up the required XQueries
344                # - NB, extract the xquery libraries locally for easy reference
345                self._xq=ndgXqueries()
346                for libFile in self._xq.xqlib:
347                        fileUtils.createFile(libFile, self._xq.xqlib[libFile])
348               
349                # Process the resulting files and put the data into the postgres DB
350                # - firstly set up a db connection to use
351                self._dbConnection = None
352                self._getDBConnection()
353               
354                #add this to self so can produce list of failed files
355                #self.discDir = discovery_dir
356               
357                filenames = os.listdir(discovery_dir)
358                for filename in filenames:
359                        self.addFileToPostgresDB(discovery_dir + filename)
360               
361                #Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
362                backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M")
363               
364                this_backupdir = backupdir_base + "_originals/"
365                #fileUtils.makeBackUp(originals_dir, this_backupdir)
366               
367                #Clear out the original harvest records area and discovery dir
368                fileUtils.cleanDir(originals_dir)
369                #fileUtils.cleanDir(discovery_dir)
370               
371                #create a summary file for each data centre ingest
372                data_dir = self._base_dir + "data/"
373                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
374                recOpFile = open(recOpFileName,'w')
375               
376                logging.info("oai_document_ingest processing complete:")
377               
378                recOpFile.write("Ingest report for data centre: " + datacentre + "\n")
379                recOpFile.write("Ingest date: " + str(date.today()) + "\n")
380                recOpFile.write("Original metadata directory: " + self._harvest_home + "\n\n")
381                recOpFile.write("PROCESSED " + str(numfilesproc) + "\n")
382                recOpFile.write("INGESTED " + str(self._no_files_ingested)  + "\n")
383               
384                if self._no_problem_files == 0:
385                        logging.info("All files successfully processed - cleaning harvest directory")
386                        #fileUtils.cleanDir(self._harvest_home)
387                else:
388                        logging.error("Problems experienced with %s files" %self._no_problem_files)
389                        logging.error("- harvest directory will not be cleared until these have been fixed and the script has been reran")
390                       
391                        logging.info("INFO: Could not ingest the following files into the Discovery Database: ")
392                       
393                        recOpFile.write("PROBLEM_NUM "  + str(self._no_problem_files)  + "\n")
394                       
395                        for badFile in self.updateFailList:
396                                logging.info("INFO: Could not ingest = %s" %badFile)
397                                recOpFile.write("PROBLEM_FILE " + badFile + "\n")                                       
398               
399                logging.info(self.lineSeparator)
400                logging.info("INFO: Number of files processed = %s" %numfilesproc)
401                logging.info("INFO: Number of files ingested = %s" %self._no_files_ingested)
402                logging.info(self.lineSeparator)
403               
404                recOpFile.close()
405                               
406               
407                print "\nScript finished running."
408       
409if __name__=="__main__":
410        opts, args = getopt.getopt(sys.argv[1:], '-vd')
411        if len(args) < 1:
412                oai_document_ingester()
413       
414        oai_document_ingester(args[0])
Note: See TracBrowser for help on using the repository browser.