source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py @ 3972

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py@3972
Revision 3972, 13.5 KB checked in by cbyrom, 12 years ago (diff)

Use the short filename in the postgres DB for storing the original
document filename.
Add fix to allow proper handling of scope fields as a ts_vector.
Add TODO comments to highlight areas of concern + update docs.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from DIF import DIF
12from MDIP import MDIP
13import ndgUtils
14from ndgUtils.ndgXqueries import ndgXqueries
15from FileUtilities import FileUtilities
16from PostgresRecord import PostgresRecord
17from PostgresDAO import PostgresDAO
18import db_funcs
19
20class oai_document_ingester:
21        '''
22        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
23        - including running the various transforms and parsings to get all doc types and spatiotemporal
24        data in the correct form in the DB
25        '''
26
27        def getID(self, filename):
28                '''
29                Gets the identifier out of an input metadata xml record.
30                Copes with DIF and MDIP currently.
31                @param filename - name of document file being processed
32                @return: ID - id to use to refer to the document
33                '''
34                logging.info("Retrieving identifier for metadata record " + filename)
35                xml=file(filename).read()
36                if self._datacentre_format == "DIF":
37                    d=DIF(xml)
38                    ID=d.entryID
39                elif self._datacentre_format == "MDIP":
40                    d=MDIP(xml)
41                    ID=d.id
42                else:
43                    raise TypeError, "Only handles DIF or MDIP here."
44       
45                logging.info("Found identifier: " + ID)
46                return ID
47       
48       
49        def addFileToPostgresDB(self, filename):
50                '''
51                Add a file to the postgres DB - extracting and storing all the required
52                data in the process
53                @param filename: full path of file to add to postgres DB
54                '''
55                if not os.path.isfile(filename):
56                        logging.info("Skipping, %s - not a valid file" %filename)
57                        return
58               
59                logging.info("Adding file, " + filename + ", to postgres DB")
60               
61                # first of all create a PostgresRecord - this object represents all the data required
62                # for a DB entry
63                dao = None
64                try:
65                        discoveryID = self.getID(filename)
66                       
67                        record = PostgresRecord(filename, self._NDG_dataProvider, \
68                                                            self._datacentre_groups, self._datacentre_namespace, \
69                                                            discoveryID, self._xq, self._datacentre_format)
70       
71                        # Now create the data access object to interface to the DB
72                        dao = PostgresDAO(record, self._dbConnection)
73               
74                        # Finally, write the new record
75                        if dao.createOrUpdateRecord():
76                                self._no_files_ingested += 1
77                except:
78                        logging.error("Exception thrown - detail: ")
79                        logging.error(sys.exc_info())
80                       
81                        if dao:
82                                logging.info("Removing record and its associated info from DB")
83                                logging.info("- to allow clean ingestion on rerun")
84                                try:
85                                        dao.deleteOriginalRecord()
86                                except:
87                                        logging.error("Problem encountered when removing record: ")
88                                        logging.error(sys.exc_info())
89                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
90
91                        self._no_problem_files += 1
92                        logging.info("Continue processing other files")
93       
94       
95        def getConfigDetails(self, datacentre):
96                '''
97                Get the harvested records directory and groups for this datacentre from the
98                datacentre specific config file.  The harvested records directory depends on the
99                datacentres OAI base url, the set and format. These have to be know up-front.
100                The groups denote which 'portal groups' they belong to - for limiting searches to
101                say NERC-only datacentres records.
102                Groups are added to the intermediate MOLES when it is created.
103                @param datacentre: datacentre to use when looking up config file
104                '''
105                self._datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties"
106                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
107               
108                # Check this file exists; if not, assume an invalid datacentre has been specified
109                if not os.path.isfile(self._datacentre_config_filename):
110                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
111                        "specified (%s) is invalid\n" %datacentre)
112                   
113                datacentre_config_file = open(self._datacentre_config_filename, "r")
114               
115                for line in datacentre_config_file.readlines():
116                    words  = string.split(line)
117                    if len(words) == 0:
118                        continue
119                    if words[0] == 'host_path':
120                        self._harvest_home = string.rstrip(words[1])
121                    if words[0] == 'groups':
122                        self._datacentre_groups = words[1:]
123                    if words[0] == 'format':
124                        self._datacentre_format = words[1]
125                    if words[0] == 'namespace':
126                        self._datacentre_namespace = words[1]
127                    if words[0] == 'self._NDG_dataProvider':
128                        self._NDG_dataProvider = True
129               
130                datacentre_config_file.close()
131               
132                if self._harvest_home == "":
133                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename)
134               
135                logging.info("harvested records are in " + self._harvest_home)
136               
137                if self._datacentre_groups == "":
138                    logging.info("No groups/keywords set for datacentre " + datacentre)
139                else:
140                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
141               
142                if self._datacentre_format == "":
143                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename)
144               
145                logging.info("format being harvested: " + self._datacentre_format)
146               
147                if self._datacentre_namespace == "":
148                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename)
149               
150                logging.info("datacentre namespace: " + self._datacentre_namespace)
151               
152                if self._NDG_dataProvider:
153                        logging.info("Datacentre classified as an NDG data provider")
154                else:
155                        logging.info("Datacentre is not classified as an NDG data provider")
156                logging.info(self.lineSeparator)
157
158               
159        def _getDBConnection(self):
160                '''
161                Get the default DB connection - by reading in data from the db config file
162                '''
163                logging.info("Setting up connection to postgres DB")
164                dbinfo_file=open('ingest.config', "r")
165                dbinfo = dbinfo_file.read().split()
166                if len(dbinfo) < 4:
167                        raise ValueError, 'Incorrect data in config file'
168               
169                # if port specified in file, use this, otherwise use default
170                if len(dbinfo) > 4:
171                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3], dbinfo[4])
172                else:
173                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3])
174                logging.info("Postgres DB connection now set up")
175
176       
177        def usage(self):
178                '''
179                Display input params for the script
180                '''
181                print "Usage: python oai_document_ingester.py [OPTION] <datacentre>"
182                print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:"
183                print " -v - verbose mode for output logging"
184                print " -d - debug mode for output logging"
185                sys.exit(2)
186
187               
188        def __init__(self, datacentre=None):
189                '''
190                Main entry point for script
191                '''
192                self.lineSeparator = "-----------------------------"
193                print self.lineSeparator
194                print "RUNNING: oai_document_ingester.py"
195               
196                # check for verbose option
197                try:
198                    opts, args = getopt.getopt(sys.argv[1:], "vd")
199                except getopt.GetoptError, err:
200                    # print help information and exit:
201                    print str(err) # will print something like "option -a not recognized"
202                   
203                loggingLevel = logging.WARNING
204                for o, a in opts:
205                    if o == "-v":
206                        print " - Verbose mode ON"
207                        loggingLevel = logging.INFO
208                    elif o == "-d":
209                        print " - Debug mode ON"
210                        loggingLevel = logging.DEBUG
211               
212                print self.lineSeparator
213                logging.basicConfig(level=loggingLevel,
214                                            format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
215               
216                if datacentre is None:
217                        self.usage()
218               
219                # create file utils object to do various file related stuff
220                fileUtils = FileUtilities()
221               
222                numfilesproc = 0
223                self._no_files_ingested = 0
224                self._no_problem_files = 0
225                self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from
226                       
227                data_dir = self._base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs
228               
229                #Change os directory to that with the harvested documents in it.
230                os.chdir(self._base_dir)
231               
232                # - to run on Windows under cygwin, use the following
233                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
234               
235                # set the global variables to retrieve from the config file
236                self._harvest_home = ""
237                self._datacentre_groups = ""
238                self._datacentre_format = ""
239                self._datacentre_namespace = ""
240                self._NDG_dataProvider = False
241                self.getConfigDetails(datacentre)
242               
243                # check harvest dir exists and that there are any records to harvest?
244                if not os.path.exists(self._harvest_home):
245                        logging.info("Harvest directory for datacentre %s (%s) could not be found - exiting" \
246                                                 %(datacentre, self._harvest_home))
247                        return
248                elif len(os.listdir(self._harvest_home)) == 0:
249                        logging.info("Nothing to harvest this time from %s" %datacentre)
250                        return
251               
252                # The directory to put things for a tape backup (should already exist)
253                backupdir = '/disks/glue1/oaiBackup/'
254               
255                # the following dirs define where the specific documents should go
256                originals_dir = data_dir + "/oai/originals/"
257                discovery_dir = data_dir + "/discovery/"
258               
259                # Create/clear the 'in' directory pristine copy of the discovery records
260                fileUtils.setUpDir(originals_dir)
261                commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir
262                logging.info("Executing : " + commandline)
263                status = os.system(commandline)
264
265                if status !=0:
266                    sys.exit("Failed at making pristine copy stage")
267               
268                # Create/clear the directory for the 'out' processed copy of the discovery records.
269                fileUtils.setUpDir(discovery_dir)
270                   
271                #Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
272                # - also replace any namespace declarations with a standard one which we know works in NDG
273                # NB, this copies files from the original dir to the discovery dir
274                logging.info(self.lineSeparator)
275                logging.info("Renaming files:")
276                for filename in os.listdir(originals_dir):
277                        if filename.endswith('.xml'):
278                                original_filename = originals_dir + filename
279                                try:
280                                        ident=self.getID(original_filename)
281                                except Exception, detail:
282                                        logging.error("Could not retrieve ID from file, %s" %filename)
283                                        logging.error("Detail: %s" %detail)
284                                        logging.info("Continue with next file")
285                                        continue
286                               
287                                if self._NDG_dataProvider:
288                                        new_filename = discovery_dir + ident.replace(":","__")+".xml"
289                                else:
290                                                ident = ident.replace(":","-")
291                                                ident = ident.replace("/","-")
292                                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
293                                                logging.info("original file = " + original_filename)
294                                                logging.info("newfile = " + new_filename)
295                               
296                                # now correct any namespace issues
297                                try:
298                                    SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
299                                except Exception, detail:
300                                        logging.error("SchemaNameSpace failed on file %s" %original_filename)
301                                        logging.error("Detail: %s" %detail)
302                                        logging.info("Continue with next file")
303                                        continue
304                                numfilesproc += 1
305                        else:
306                                logging.warning('File %s is not xml format. Not processed'  %(filename))
307               
308                logging.info(self.lineSeparator)
309               
310                # now set up the required XQueries
311                # - NB, extract the xquery libraries locally for easy reference
312                self._xq=ndgXqueries()
313                for libFile in self._xq.xqlib:
314                        fileUtils.createFile(libFile, self._xq.xqlib[libFile])
315               
316                # Process the resulting files and put the data into the postgres DB
317                # - firstly set up a db connection to use
318                self._dbConnection = None
319                self._getDBConnection()
320               
321                filenames = os.listdir(discovery_dir)
322                for filename in filenames:
323                        self.addFileToPostgresDB(discovery_dir + filename)
324               
325                #Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
326                backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M")
327               
328                this_backupdir = backupdir_base + "_originals/"
329                #fileUtils.makeBackUp(originals_dir, this_backupdir)
330               
331                #Clear out the original harvest records area and discovery dir
332                fileUtils.cleanDir(originals_dir)
333                #fileUtils.cleanDir(discovery_dir)
334               
335                logging.info("oai_document_ingest processing complete:")
336                if self._no_problem_files == 0:
337                        logging.info("All files successfully processed - cleaning harvest directory")
338                        #fileUtils.cleanDir(self._harvest_home)
339                else:
340                        logging.error("Problems experienced with %s files" %self._no_problem_files)
341                        logging.error("- harvest directory will not be cleared until these have been fixed and the script has been reran")
342               
343                logging.info(self.lineSeparator)
344                logging.info("INFO: Number of files processed = %s" %numfilesproc)
345                logging.info("INFO: Number of files ingested = %s" %self._no_files_ingested)
346                logging.info(self.lineSeparator)
347                print "Script finished running."
348               
349       
350if __name__=="__main__":
351        opts, args = getopt.getopt(sys.argv[1:], '-vd')
352        if len(args) < 1:
353                oai_document_ingester()
354       
355        oai_document_ingester(args[0])
Note: See TracBrowser for help on using the repository browser.