source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py @ 4714

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py@4714
Revision 4714, 13.0 KB checked in by sdonegan, 11 years ago (diff)

SJD cock up (4712)- put Calums latest (4664) back as head revision!

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndgUtils.ndgXqueries import ndgXqueries
12import ndgUtils.lib.fileutilities as FileUtilities
13from PostgresRecord import PostgresRecord
14from PostgresDAO import PostgresDAO
15from Utilities import idget
16import db_funcs
17
18class oai_document_ingester(object):
19        '''
20        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
21        - including running the various transforms and parsings to get all doc types and spatiotemporal
22        data in the correct form in the DB
23        '''
24
25        def getID(self, filename):
26                '''
27                Gets the identifier out of an input metadata xml record.
28                @param filename - name of document file being processed
29                @return: ID - id to use to refer to the document
30                '''
31                logging.info("Retrieving identifier for metadata record " + filename)
32                xml=file(filename).read()
33                ID = idget(xml)
34                return ID
35       
36       
37        def addFileToPostgresDB(self, filename):
38                '''
39                Add a file to the postgres DB - extracting and storing all the required
40                data in the process
41                @param filename: full path of file to add to postgres DB
42                '''
43                if not os.path.isfile(filename):
44                        logging.info("Skipping, %s - not a valid file" %filename)
45                        return
46               
47                logging.info("Adding file, " + filename + ", to postgres DB")
48               
49                # first of all create a PostgresRecord - this object represents all the data required
50                # for a DB entry
51                dao = None
52                try:
53                        discoveryID = self.getID(filename)
54                        record = PostgresRecord(filename, self._NDG_dataProvider, \
55                                                            self._datacentre_groups, self._datacentre_namespace, \
56                                                            discoveryID, self._xq, self._datacentre_format)
57       
58                        # Now create the data access object to interface to the DB
59                        dao = PostgresDAO(record, self._dbConnection)
60               
61                        # Finally, write the new record
62                        if dao.createOrUpdateRecord():
63                                self._no_files_ingested += 1
64                except:
65                        logging.error("Exception thrown - detail: ")
66                        logging.error(sys.exc_info())
67                       
68                        if dao:
69                                logging.info("Removing record and its associated info from DB")
70                                logging.info("- to allow clean ingestion on rerun")
71                                try:
72                                        dao.deleteOriginalRecord()
73                                except:
74                                        logging.error("Problem encountered when removing record: ")
75                                        logging.error(sys.exc_info())
76                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
77
78                        self._no_problem_files += 1
79                        logging.info("Continue processing other files")
80       
81       
82        def getConfigDetails(self, datacentre):
83                '''
84                Get the harvested records directory and groups for this datacentre from the
85                datacentre specific config file.  The harvested records directory depends on the
86                datacentres OAI base url, the set and format. These have to be know up-front.
87                The groups denote which 'portal groups' they belong to - for limiting searches to
88                say NERC-only datacentres records.
89                Groups are added to the intermediate MOLES when it is created.
90                @param datacentre: datacentre to use when looking up config file
91                '''
92                self._datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties"
93                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
94               
95                # Check this file exists; if not, assume an invalid datacentre has been specified
96                if not os.path.isfile(self._datacentre_config_filename):
97                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
98                        "specified (%s) is invalid\n" %datacentre)
99                   
100                datacentre_config_file = open(self._datacentre_config_filename, "r")
101               
102                for line in datacentre_config_file.readlines():
103                    words  = string.split(line)
104                    if len(words) == 0:
105                        continue
106                    if words[0] == 'host_path':
107                        self._harvest_home = string.rstrip(words[1])
108                    if words[0] == 'groups':
109                        self._datacentre_groups = words[1:]
110                    if words[0] == 'format':
111                        self._datacentre_format = words[1]
112                    if words[0] == 'namespace':
113                        self._datacentre_namespace = words[1]
114                    if words[0] == 'NDG_dataProvider':
115                        self._NDG_dataProvider = True
116               
117                datacentre_config_file.close()
118               
119                if self._harvest_home == "":
120                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename)
121               
122                logging.info("harvested records are in " + self._harvest_home)
123               
124                if self._datacentre_groups == "":
125                    logging.info("No groups/keywords set for datacentre " + datacentre)
126                else:
127                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
128               
129                if self._datacentre_format == "":
130                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename)
131               
132                logging.info("format being harvested: " + self._datacentre_format)
133               
134                if self._datacentre_namespace == "":
135                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename)
136               
137                logging.info("datacentre namespace: " + self._datacentre_namespace)
138               
139                if self._NDG_dataProvider:
140                        logging.info("Datacentre classified as an NDG data provider")
141                else:
142                        logging.info("Datacentre is not classified as an NDG data provider")
143                logging.info(self.lineSeparator)
144
145               
146        def _getDBConnection(self):
147                '''
148                Get the default DB connection - by reading in data from the db config file
149                '''
150                logging.info("Setting up connection to postgres DB")
151                dbinfo_file=open('ingest.config', "r")
152                dbinfo = dbinfo_file.read().split()
153                if len(dbinfo) < 4:
154                        raise ValueError, 'Incorrect data in config file'
155               
156                # if port specified in file, use this, otherwise use default
157                if len(dbinfo) > 4:
158                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3], dbinfo[4])
159                else:
160                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3])
161                logging.info("Postgres DB connection now set up")
162
163       
164        def usage(self):
165                '''
166                Display input params for the script
167                '''
168                print "Usage: python oai_document_ingester.py [OPTION] <datacentre>"
169                print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:"
170                print " -v - verbose mode for output logging"
171                print " -d - debug mode for output logging"
172                sys.exit(2)
173
174               
175        def __init__(self, datacentre=None):
176                '''
177                Main entry point for script
178                '''
179                self.lineSeparator = "-----------------------------"
180                print self.lineSeparator
181                print "RUNNING: oai_document_ingester.py"
182               
183                # check for verbose option
184                try:
185                    opts, args = getopt.getopt(sys.argv[1:], "vd")
186                except getopt.GetoptError, err:
187                    # print help information and exit:
188                    print str(err) # will print something like "option -a not recognized"
189                   
190                loggingLevel = logging.WARNING
191                for o, a in opts:
192                    if o == "-v":
193                        print " - Verbose mode ON"
194                        loggingLevel = logging.INFO
195                    elif o == "-d":
196                        print " - Debug mode ON"
197                        loggingLevel = logging.DEBUG
198               
199                print self.lineSeparator
200                logging.basicConfig(level=loggingLevel,
201                                            format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
202               
203                if datacentre is None:
204                        self.usage()
205               
206                numfilesproc = 0
207                self._no_files_ingested = 0
208                self._no_problem_files = 0
209                self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from
210                       
211                data_dir = self._base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs
212               
213                #Change os directory to that with the harvested documents in it.
214                os.chdir(self._base_dir)
215               
216                # - to run on Windows under cygwin, use the following
217                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
218               
219                # set the global variables to retrieve from the config file
220                self._harvest_home = ""
221                self._datacentre_groups = ""
222                self._datacentre_format = ""
223                self._datacentre_namespace = ""
224                self._NDG_dataProvider = False
225                self.getConfigDetails(datacentre)
226               
227                # check harvest dir exists and that there are any records to harvest?
228                if not os.path.exists(self._harvest_home):
229                        logging.info("Harvest directory for datacentre %s (%s) could not be found - exiting" \
230                                                 %(datacentre, self._harvest_home))
231                        return
232                elif len(os.listdir(self._harvest_home)) == 0:
233                        logging.info("Nothing to harvest this time from %s" %datacentre)
234                        return
235               
236                # The directory to put things for a tape backup (should already exist)
237                backupdir = '/disks/glue1/oaiBackup/'
238               
239                # the following dirs define where the specific documents should go
240                originals_dir = data_dir + "/oai/originals/"
241                discovery_dir = data_dir + "/discovery/"
242               
243                # Create/clear the 'in' directory pristine copy of the discovery records
244                FileUtilities.setUpDir(originals_dir)
245                commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir
246                logging.info("Executing : " + commandline)
247                status = os.system(commandline)
248
249                if status !=0:
250                    sys.exit("Failed at making pristine copy stage")
251               
252                # Create/clear the directory for the 'out' processed copy of the discovery records.
253                FileUtilities.setUpDir(discovery_dir)
254                   
255                #Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
256                # - also replace any namespace declarations with a standard one which we know works in NDG
257                # NB, this copies files from the original dir to the discovery dir
258                logging.info(self.lineSeparator)
259                logging.info("Renaming files:")
260                for filename in os.listdir(originals_dir):
261                        if filename.endswith('.xml'):
262                                original_filename = originals_dir + filename
263                                try:
264                                        ident=self.getID(original_filename)
265                                except Exception, detail:
266                                        logging.error("Could not retrieve ID from file, %s" %filename)
267                                        logging.error("Detail: %s" %detail)
268                                        logging.info("Continue with next file")
269                                        continue
270                               
271                                if self._NDG_dataProvider:
272                                        new_filename = discovery_dir + ident.replace(":","__")+".xml"
273                                else:
274                                                ident = ident.replace(":","-")
275                                                ident = ident.replace("/","-")
276                                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
277                                                logging.info("original file = " + original_filename)
278                                                logging.info("newfile = " + new_filename)
279                               
280                                # now correct any namespace issues
281                                try:
282                                    SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
283                                except Exception, detail:
284                                        logging.error("SchemaNameSpace failed on file %s" %original_filename)
285                                        logging.error("Detail: %s" %detail)
286                                        logging.info("Continue with next file")
287                                        continue
288                                numfilesproc += 1
289                        else:
290                                logging.warning('File %s is not xml format. Not processed'  %(filename))
291               
292                logging.info(self.lineSeparator)
293               
294                # now set up the required XQueries
295                # - NB, extract the xquery libraries locally for easy reference
296                self._xq=ndgXqueries()
297                for libFile in self._xq.xqlib:
298                        FileUtilities.createFile(libFile, self._xq.xqlib[libFile])
299               
300                # Process the resulting files and put the data into the postgres DB
301                # - firstly set up a db connection to use
302                self._dbConnection = None
303                self._getDBConnection()
304               
305                filenames = os.listdir(discovery_dir)
306                for filename in filenames:
307                        self.addFileToPostgresDB(discovery_dir + filename)
308               
309                #Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
310                backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M")
311               
312                this_backupdir = backupdir_base + "_originals/"
313                FileUtilities.makeBackUp(originals_dir, this_backupdir)
314               
315                #Clear out the original harvest records area and discovery dir
316                FileUtilities.cleanDir(originals_dir)
317                FileUtilities.cleanDir(discovery_dir)
318               
319                logging.info("oai_document_ingest processing complete:")
320                if self._no_problem_files == 0:
321                        logging.info("All files successfully processed - cleaning harvest directory")
322                        FileUtilities.cleanDir(self._harvest_home)
323                else:
324                        logging.error("Problems experienced with %s files" %self._no_problem_files)
325                        logging.error("- harvest directory will not be cleared until these have been fixed and the script has been reran")
326               
327                logging.info(self.lineSeparator)
328                logging.info("INFO: Number of files processed = %s" %numfilesproc)
329                logging.info("INFO: Number of files ingested = %s" %self._no_files_ingested)
330                logging.info(self.lineSeparator)
331                print "Script finished running."
332               
333       
334if __name__=="__main__":
335        opts, args = getopt.getopt(sys.argv[1:], '-vd')
336        if len(args) < 1:
337                oai_document_ingester()
338       
339        oai_document_ingester(args[0])
Note: See TracBrowser for help on using the repository browser.