source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py @ 4257

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py@4257
Revision 4257, 13.5 KB checked in by cbyrom, 11 years ago (diff)

Fix handling of ndg hosted data - properly reading config settings from file
in ingest + improve use of default value checking.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from DIF import DIF
12from MDIP import MDIP
13import ndgUtils
14from ndgUtils.ndgXqueries import ndgXqueries
15from FileUtilities import FileUtilities
16from PostgresRecord import PostgresRecord
17from PostgresDAO import PostgresDAO
18import db_funcs
19
20class oai_document_ingester(object):
21        '''
22        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
23        - including running the various transforms and parsings to get all doc types and spatiotemporal
24        data in the correct form in the DB
25        '''
26
27        def getID(self, filename):
28                '''
29                Gets the identifier out of an input metadata xml record.
30                Copes with DIF and MDIP currently.
31                @param filename - name of document file being processed
32                @return: ID - id to use to refer to the document
33                '''
34                logging.info("Retrieving identifier for metadata record " + filename)
35                xml=file(filename).read()
36                if self._datacentre_format == "DIF":
37                    d=DIF(xml)
38                    ID=d.entryID
39                elif self._datacentre_format == "MDIP":
40                    d=MDIP(xml)
41                    ID=d.id
42                else:
43                    raise TypeError, "Only handles DIF or MDIP here."
44       
45                logging.info("Found identifier: " + ID)
46                return ID
47       
48       
49        def addFileToPostgresDB(self, filename):
50                '''
51                Add a file to the postgres DB - extracting and storing all the required
52                data in the process
53                @param filename: full path of file to add to postgres DB
54                '''
55                if not os.path.isfile(filename):
56                        logging.info("Skipping, %s - not a valid file" %filename)
57                        return
58               
59                logging.info("Adding file, " + filename + ", to postgres DB")
60               
61                # first of all create a PostgresRecord - this object represents all the data required
62                # for a DB entry
63                dao = None
64                try:
65                        discoveryID = self.getID(filename)
66                        record = PostgresRecord(filename, self._NDG_dataProvider, \
67                                                            self._datacentre_groups, self._datacentre_namespace, \
68                                                            discoveryID, self._xq, self._datacentre_format)
69       
70                        # Now create the data access object to interface to the DB
71                        dao = PostgresDAO(record, self._dbConnection)
72               
73                        # Finally, write the new record
74                        if dao.createOrUpdateRecord():
75                                self._no_files_ingested += 1
76                except:
77                        logging.error("Exception thrown - detail: ")
78                        logging.error(sys.exc_info())
79                       
80                        if dao:
81                                logging.info("Removing record and its associated info from DB")
82                                logging.info("- to allow clean ingestion on rerun")
83                                try:
84                                        dao.deleteOriginalRecord()
85                                except:
86                                        logging.error("Problem encountered when removing record: ")
87                                        logging.error(sys.exc_info())
88                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
89
90                        self._no_problem_files += 1
91                        logging.info("Continue processing other files")
92       
93       
94        def getConfigDetails(self, datacentre):
95                '''
96                Get the harvested records directory and groups for this datacentre from the
97                datacentre specific config file.  The harvested records directory depends on the
98                datacentres OAI base url, the set and format. These have to be know up-front.
99                The groups denote which 'portal groups' they belong to - for limiting searches to
100                say NERC-only datacentres records.
101                Groups are added to the intermediate MOLES when it is created.
102                @param datacentre: datacentre to use when looking up config file
103                '''
104                self._datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties"
105                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
106               
107                # Check this file exists; if not, assume an invalid datacentre has been specified
108                if not os.path.isfile(self._datacentre_config_filename):
109                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
110                        "specified (%s) is invalid\n" %datacentre)
111                   
112                datacentre_config_file = open(self._datacentre_config_filename, "r")
113               
114                for line in datacentre_config_file.readlines():
115                    words  = string.split(line)
116                    if len(words) == 0:
117                        continue
118                    if words[0] == 'host_path':
119                        self._harvest_home = string.rstrip(words[1])
120                    if words[0] == 'groups':
121                        self._datacentre_groups = words[1:]
122                    if words[0] == 'format':
123                        self._datacentre_format = words[1]
124                    if words[0] == 'namespace':
125                        self._datacentre_namespace = words[1]
126                    if words[0] == 'NDG_dataProvider':
127                        self._NDG_dataProvider = True
128               
129                datacentre_config_file.close()
130               
131                if self._harvest_home == "":
132                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename)
133               
134                logging.info("harvested records are in " + self._harvest_home)
135               
136                if self._datacentre_groups == "":
137                    logging.info("No groups/keywords set for datacentre " + datacentre)
138                else:
139                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
140               
141                if self._datacentre_format == "":
142                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename)
143               
144                logging.info("format being harvested: " + self._datacentre_format)
145               
146                if self._datacentre_namespace == "":
147                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename)
148               
149                logging.info("datacentre namespace: " + self._datacentre_namespace)
150               
151                if self._NDG_dataProvider:
152                        logging.info("Datacentre classified as an NDG data provider")
153                else:
154                        logging.info("Datacentre is not classified as an NDG data provider")
155                logging.info(self.lineSeparator)
156
157               
158        def _getDBConnection(self):
159                '''
160                Get the default DB connection - by reading in data from the db config file
161                '''
162                logging.info("Setting up connection to postgres DB")
163                dbinfo_file=open('ingest.config', "r")
164                dbinfo = dbinfo_file.read().split()
165                if len(dbinfo) < 4:
166                        raise ValueError, 'Incorrect data in config file'
167               
168                # if port specified in file, use this, otherwise use default
169                if len(dbinfo) > 4:
170                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3], dbinfo[4])
171                else:
172                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3])
173                logging.info("Postgres DB connection now set up")
174
175       
176        def usage(self):
177                '''
178                Display input params for the script
179                '''
180                print "Usage: python oai_document_ingester.py [OPTION] <datacentre>"
181                print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:"
182                print " -v - verbose mode for output logging"
183                print " -d - debug mode for output logging"
184                sys.exit(2)
185
186               
187        def __init__(self, datacentre=None):
188                '''
189                Main entry point for script
190                '''
191                self.lineSeparator = "-----------------------------"
192                print self.lineSeparator
193                print "RUNNING: oai_document_ingester.py"
194               
195                # check for verbose option
196                try:
197                    opts, args = getopt.getopt(sys.argv[1:], "vd")
198                except getopt.GetoptError, err:
199                    # print help information and exit:
200                    print str(err) # will print something like "option -a not recognized"
201                   
202                loggingLevel = logging.WARNING
203                for o, a in opts:
204                    if o == "-v":
205                        print " - Verbose mode ON"
206                        loggingLevel = logging.INFO
207                    elif o == "-d":
208                        print " - Debug mode ON"
209                        loggingLevel = logging.DEBUG
210               
211                print self.lineSeparator
212                logging.basicConfig(level=loggingLevel,
213                                            format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
214               
215                if datacentre is None:
216                        self.usage()
217               
218                # create file utils object to do various file related stuff
219                fileUtils = FileUtilities()
220               
221                numfilesproc = 0
222                self._no_files_ingested = 0
223                self._no_problem_files = 0
224                self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from
225                       
226                data_dir = self._base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs
227               
228                #Change os directory to that with the harvested documents in it.
229                os.chdir(self._base_dir)
230               
231                # - to run on Windows under cygwin, use the following
232                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
233               
234                # set the global variables to retrieve from the config file
235                self._harvest_home = ""
236                self._datacentre_groups = ""
237                self._datacentre_format = ""
238                self._datacentre_namespace = ""
239                self._NDG_dataProvider = False
240                self.getConfigDetails(datacentre)
241               
242                # check harvest dir exists and that there are any records to harvest?
243                if not os.path.exists(self._harvest_home):
244                        logging.info("Harvest directory for datacentre %s (%s) could not be found - exiting" \
245                                                 %(datacentre, self._harvest_home))
246                        return
247                elif len(os.listdir(self._harvest_home)) == 0:
248                        logging.info("Nothing to harvest this time from %s" %datacentre)
249                        return
250               
251                # The directory to put things for a tape backup (should already exist)
252                backupdir = '/disks/glue1/oaiBackup/'
253               
254                # the following dirs define where the specific documents should go
255                originals_dir = data_dir + "/oai/originals/"
256                discovery_dir = data_dir + "/discovery/"
257               
258                # Create/clear the 'in' directory pristine copy of the discovery records
259                fileUtils.setUpDir(originals_dir)
260                commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir
261                logging.info("Executing : " + commandline)
262                status = os.system(commandline)
263
264                if status !=0:
265                    sys.exit("Failed at making pristine copy stage")
266               
267                # Create/clear the directory for the 'out' processed copy of the discovery records.
268                fileUtils.setUpDir(discovery_dir)
269                   
270                #Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
271                # - also replace any namespace declarations with a standard one which we know works in NDG
272                # NB, this copies files from the original dir to the discovery dir
273                logging.info(self.lineSeparator)
274                logging.info("Renaming files:")
275                for filename in os.listdir(originals_dir):
276                        if filename.endswith('.xml'):
277                                original_filename = originals_dir + filename
278                                try:
279                                        ident=self.getID(original_filename)
280                                except Exception, detail:
281                                        logging.error("Could not retrieve ID from file, %s" %filename)
282                                        logging.error("Detail: %s" %detail)
283                                        logging.info("Continue with next file")
284                                        continue
285                               
286                                if self._NDG_dataProvider:
287                                        new_filename = discovery_dir + ident.replace(":","__")+".xml"
288                                else:
289                                                ident = ident.replace(":","-")
290                                                ident = ident.replace("/","-")
291                                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
292                                                logging.info("original file = " + original_filename)
293                                                logging.info("newfile = " + new_filename)
294                               
295                                # now correct any namespace issues
296                                try:
297                                    SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
298                                except Exception, detail:
299                                        logging.error("SchemaNameSpace failed on file %s" %original_filename)
300                                        logging.error("Detail: %s" %detail)
301                                        logging.info("Continue with next file")
302                                        continue
303                                numfilesproc += 1
304                        else:
305                                logging.warning('File %s is not xml format. Not processed'  %(filename))
306               
307                logging.info(self.lineSeparator)
308               
309                # now set up the required XQueries
310                # - NB, extract the xquery libraries locally for easy reference
311                self._xq=ndgXqueries()
312                for libFile in self._xq.xqlib:
313                        fileUtils.createFile(libFile, self._xq.xqlib[libFile])
314               
315                # Process the resulting files and put the data into the postgres DB
316                # - firstly set up a db connection to use
317                self._dbConnection = None
318                self._getDBConnection()
319               
320                filenames = os.listdir(discovery_dir)
321                for filename in filenames:
322                        self.addFileToPostgresDB(discovery_dir + filename)
323               
324                #Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
325                backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M")
326               
327                this_backupdir = backupdir_base + "_originals/"
328                #fileUtils.makeBackUp(originals_dir, this_backupdir)
329               
330                #Clear out the original harvest records area and discovery dir
331                fileUtils.cleanDir(originals_dir)
332                #fileUtils.cleanDir(discovery_dir)
333               
334                logging.info("oai_document_ingest processing complete:")
335                if self._no_problem_files == 0:
336                        logging.info("All files successfully processed - cleaning harvest directory")
337                        #fileUtils.cleanDir(self._harvest_home)
338                else:
339                        logging.error("Problems experienced with %s files" %self._no_problem_files)
340                        logging.error("- harvest directory will not be cleared until these have been fixed and the script has been reran")
341               
342                logging.info(self.lineSeparator)
343                logging.info("INFO: Number of files processed = %s" %numfilesproc)
344                logging.info("INFO: Number of files ingested = %s" %self._no_files_ingested)
345                logging.info(self.lineSeparator)
346                print "Script finished running."
347               
348       
349if __name__=="__main__":
350        opts, args = getopt.getopt(sys.argv[1:], '-vd')
351        if len(args) < 1:
352                oai_document_ingester()
353       
354        oai_document_ingester(args[0])
Note: See TracBrowser for help on using the repository browser.