source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py @ 4258

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py@4258
Revision 4258, 13.1 KB checked in by cbyrom, 11 years ago (diff)

Extend idget function to cope with MDIP records and implement usage of this

  • to remove the need for the DIF and MDIP models - delete these from the codebase.
  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11import ndgUtils
12from ndgUtils.ndgXqueries import ndgXqueries
13from FileUtilities import FileUtilities
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17import db_funcs
18
19class oai_document_ingester(object):
20        '''
21        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
22        - including running the various transforms and parsings to get all doc types and spatiotemporal
23        data in the correct form in the DB
24        '''
25
26        def getID(self, filename):
27                '''
28                Gets the identifier out of an input metadata xml record.
29                @param filename - name of document file being processed
30                @return: ID - id to use to refer to the document
31                '''
32                logging.info("Retrieving identifier for metadata record " + filename)
33                xml=file(filename).read()
34                ID = idget(xml)
35                return ID
36       
37       
38        def addFileToPostgresDB(self, filename):
39                '''
40                Add a file to the postgres DB - extracting and storing all the required
41                data in the process
42                @param filename: full path of file to add to postgres DB
43                '''
44                if not os.path.isfile(filename):
45                        logging.info("Skipping, %s - not a valid file" %filename)
46                        return
47               
48                logging.info("Adding file, " + filename + ", to postgres DB")
49               
50                # first of all create a PostgresRecord - this object represents all the data required
51                # for a DB entry
52                dao = None
53                try:
54                        discoveryID = self.getID(filename)
55                        record = PostgresRecord(filename, self._NDG_dataProvider, \
56                                                            self._datacentre_groups, self._datacentre_namespace, \
57                                                            discoveryID, self._xq, self._datacentre_format)
58       
59                        # Now create the data access object to interface to the DB
60                        dao = PostgresDAO(record, self._dbConnection)
61               
62                        # Finally, write the new record
63                        if dao.createOrUpdateRecord():
64                                self._no_files_ingested += 1
65                except:
66                        logging.error("Exception thrown - detail: ")
67                        logging.error(sys.exc_info())
68                       
69                        if dao:
70                                logging.info("Removing record and its associated info from DB")
71                                logging.info("- to allow clean ingestion on rerun")
72                                try:
73                                        dao.deleteOriginalRecord()
74                                except:
75                                        logging.error("Problem encountered when removing record: ")
76                                        logging.error(sys.exc_info())
77                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
78
79                        self._no_problem_files += 1
80                        logging.info("Continue processing other files")
81       
82       
83        def getConfigDetails(self, datacentre):
84                '''
85                Get the harvested records directory and groups for this datacentre from the
86                datacentre specific config file.  The harvested records directory depends on the
87                datacentres OAI base url, the set and format. These have to be know up-front.
88                The groups denote which 'portal groups' they belong to - for limiting searches to
89                say NERC-only datacentres records.
90                Groups are added to the intermediate MOLES when it is created.
91                @param datacentre: datacentre to use when looking up config file
92                '''
93                self._datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties"
94                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
95               
96                # Check this file exists; if not, assume an invalid datacentre has been specified
97                if not os.path.isfile(self._datacentre_config_filename):
98                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
99                        "specified (%s) is invalid\n" %datacentre)
100                   
101                datacentre_config_file = open(self._datacentre_config_filename, "r")
102               
103                for line in datacentre_config_file.readlines():
104                    words  = string.split(line)
105                    if len(words) == 0:
106                        continue
107                    if words[0] == 'host_path':
108                        self._harvest_home = string.rstrip(words[1])
109                    if words[0] == 'groups':
110                        self._datacentre_groups = words[1:]
111                    if words[0] == 'format':
112                        self._datacentre_format = words[1]
113                    if words[0] == 'namespace':
114                        self._datacentre_namespace = words[1]
115                    if words[0] == 'NDG_dataProvider':
116                        self._NDG_dataProvider = True
117               
118                datacentre_config_file.close()
119               
120                if self._harvest_home == "":
121                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename)
122               
123                logging.info("harvested records are in " + self._harvest_home)
124               
125                if self._datacentre_groups == "":
126                    logging.info("No groups/keywords set for datacentre " + datacentre)
127                else:
128                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
129               
130                if self._datacentre_format == "":
131                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename)
132               
133                logging.info("format being harvested: " + self._datacentre_format)
134               
135                if self._datacentre_namespace == "":
136                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename)
137               
138                logging.info("datacentre namespace: " + self._datacentre_namespace)
139               
140                if self._NDG_dataProvider:
141                        logging.info("Datacentre classified as an NDG data provider")
142                else:
143                        logging.info("Datacentre is not classified as an NDG data provider")
144                logging.info(self.lineSeparator)
145
146               
147        def _getDBConnection(self):
148                '''
149                Get the default DB connection - by reading in data from the db config file
150                '''
151                logging.info("Setting up connection to postgres DB")
152                dbinfo_file=open('ingest.config', "r")
153                dbinfo = dbinfo_file.read().split()
154                if len(dbinfo) < 4:
155                        raise ValueError, 'Incorrect data in config file'
156               
157                # if port specified in file, use this, otherwise use default
158                if len(dbinfo) > 4:
159                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3], dbinfo[4])
160                else:
161                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3])
162                logging.info("Postgres DB connection now set up")
163
164       
165        def usage(self):
166                '''
167                Display input params for the script
168                '''
169                print "Usage: python oai_document_ingester.py [OPTION] <datacentre>"
170                print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:"
171                print " -v - verbose mode for output logging"
172                print " -d - debug mode for output logging"
173                sys.exit(2)
174
175               
176        def __init__(self, datacentre=None):
177                '''
178                Main entry point for script
179                '''
180                self.lineSeparator = "-----------------------------"
181                print self.lineSeparator
182                print "RUNNING: oai_document_ingester.py"
183               
184                # check for verbose option
185                try:
186                    opts, args = getopt.getopt(sys.argv[1:], "vd")
187                except getopt.GetoptError, err:
188                    # print help information and exit:
189                    print str(err) # will print something like "option -a not recognized"
190                   
191                loggingLevel = logging.WARNING
192                for o, a in opts:
193                    if o == "-v":
194                        print " - Verbose mode ON"
195                        loggingLevel = logging.INFO
196                    elif o == "-d":
197                        print " - Debug mode ON"
198                        loggingLevel = logging.DEBUG
199               
200                print self.lineSeparator
201                logging.basicConfig(level=loggingLevel,
202                                            format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
203               
204                if datacentre is None:
205                        self.usage()
206               
207                # create file utils object to do various file related stuff
208                fileUtils = FileUtilities()
209               
210                numfilesproc = 0
211                self._no_files_ingested = 0
212                self._no_problem_files = 0
213                self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from
214                       
215                data_dir = self._base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs
216               
217                #Change os directory to that with the harvested documents in it.
218                os.chdir(self._base_dir)
219               
220                # - to run on Windows under cygwin, use the following
221                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
222               
223                # set the global variables to retrieve from the config file
224                self._harvest_home = ""
225                self._datacentre_groups = ""
226                self._datacentre_format = ""
227                self._datacentre_namespace = ""
228                self._NDG_dataProvider = False
229                self.getConfigDetails(datacentre)
230               
231                # check harvest dir exists and that there are any records to harvest?
232                if not os.path.exists(self._harvest_home):
233                        logging.info("Harvest directory for datacentre %s (%s) could not be found - exiting" \
234                                                 %(datacentre, self._harvest_home))
235                        return
236                elif len(os.listdir(self._harvest_home)) == 0:
237                        logging.info("Nothing to harvest this time from %s" %datacentre)
238                        return
239               
240                # The directory to put things for a tape backup (should already exist)
241                backupdir = '/disks/glue1/oaiBackup/'
242               
243                # the following dirs define where the specific documents should go
244                originals_dir = data_dir + "/oai/originals/"
245                discovery_dir = data_dir + "/discovery/"
246               
247                # Create/clear the 'in' directory pristine copy of the discovery records
248                fileUtils.setUpDir(originals_dir)
249                commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir
250                logging.info("Executing : " + commandline)
251                status = os.system(commandline)
252
253                if status !=0:
254                    sys.exit("Failed at making pristine copy stage")
255               
256                # Create/clear the directory for the 'out' processed copy of the discovery records.
257                fileUtils.setUpDir(discovery_dir)
258                   
259                #Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
260                # - also replace any namespace declarations with a standard one which we know works in NDG
261                # NB, this copies files from the original dir to the discovery dir
262                logging.info(self.lineSeparator)
263                logging.info("Renaming files:")
264                for filename in os.listdir(originals_dir):
265                        if filename.endswith('.xml'):
266                                original_filename = originals_dir + filename
267                                try:
268                                        ident=self.getID(original_filename)
269                                except Exception, detail:
270                                        logging.error("Could not retrieve ID from file, %s" %filename)
271                                        logging.error("Detail: %s" %detail)
272                                        logging.info("Continue with next file")
273                                        continue
274                               
275                                if self._NDG_dataProvider:
276                                        new_filename = discovery_dir + ident.replace(":","__")+".xml"
277                                else:
278                                                ident = ident.replace(":","-")
279                                                ident = ident.replace("/","-")
280                                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
281                                                logging.info("original file = " + original_filename)
282                                                logging.info("newfile = " + new_filename)
283                               
284                                # now correct any namespace issues
285                                try:
286                                    SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
287                                except Exception, detail:
288                                        logging.error("SchemaNameSpace failed on file %s" %original_filename)
289                                        logging.error("Detail: %s" %detail)
290                                        logging.info("Continue with next file")
291                                        continue
292                                numfilesproc += 1
293                        else:
294                                logging.warning('File %s is not xml format. Not processed'  %(filename))
295               
296                logging.info(self.lineSeparator)
297               
298                # now set up the required XQueries
299                # - NB, extract the xquery libraries locally for easy reference
300                self._xq=ndgXqueries()
301                for libFile in self._xq.xqlib:
302                        fileUtils.createFile(libFile, self._xq.xqlib[libFile])
303               
304                # Process the resulting files and put the data into the postgres DB
305                # - firstly set up a db connection to use
306                self._dbConnection = None
307                self._getDBConnection()
308               
309                filenames = os.listdir(discovery_dir)
310                for filename in filenames:
311                        self.addFileToPostgresDB(discovery_dir + filename)
312               
313                #Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
314                backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M")
315               
316                this_backupdir = backupdir_base + "_originals/"
317                #fileUtils.makeBackUp(originals_dir, this_backupdir)
318               
319                #Clear out the original harvest records area and discovery dir
320                fileUtils.cleanDir(originals_dir)
321                #fileUtils.cleanDir(discovery_dir)
322               
323                logging.info("oai_document_ingest processing complete:")
324                if self._no_problem_files == 0:
325                        logging.info("All files successfully processed - cleaning harvest directory")
326                        #fileUtils.cleanDir(self._harvest_home)
327                else:
328                        logging.error("Problems experienced with %s files" %self._no_problem_files)
329                        logging.error("- harvest directory will not be cleared until these have been fixed and the script has been reran")
330               
331                logging.info(self.lineSeparator)
332                logging.info("INFO: Number of files processed = %s" %numfilesproc)
333                logging.info("INFO: Number of files ingested = %s" %self._no_files_ingested)
334                logging.info(self.lineSeparator)
335                print "Script finished running."
336               
337       
338if __name__=="__main__":
339        opts, args = getopt.getopt(sys.argv[1:], '-vd')
340        if len(args) < 1:
341                oai_document_ingester()
342       
343        oai_document_ingester(args[0])
Note: See TracBrowser for help on using the repository browser.