source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new2.py @ 3817

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new2.py@3817
Revision 3817, 11.4 KB checked in by cbyrom, 11 years ago (diff)

Add default logging support + create new version of ingest script, removing
all traces of the eXist DB + improve documentation and output.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
""" Script oai_ingest.py takes one parameter, <datacentre>, optionally preceded by -v (verbose logging).
The /usr/local/WSClients/OAIBatch directory contains:-
 - this python script, plus some other modules eg ndgUtils for parts of the process.
 - a DataProvider specific config file,
 - the python module for extracting spatiotemporal information and adding to postgres db.
Under this directory the following structure should be maintained:
 ./data
 - /DATACENTRE/
                - discovery/:           Re-named documents.
                - discovery_corrected/: Documents with schema namespaces corrected, ready to ingest in the discovery service.
                - oai/difYYYYMMDD/:     Documents as harvested from OAI
 Where  /DATACENTRE  varies for the different data providers
"""
15#History:
16# 12/05/06 SEL spelling correction
17# 30/05/06 SEL cope with many files for processing."Argument list too long" problem.
18# 31/05/06 SEL need to use passwords now. (replace xxxxxx in real version).
19# 16/10/06 SEL Changed to using python oaiClean.py module instead of java code.
20# 16/10/06 SEL exist db upgraded and deployed at different location, java upgrade.
21# 17/10/06 SEL cope with different discovery formats - not just DIF.
22# 23/10/06 SEL keywords not mandatory in config file.
23# 24/10/06 SEL fix bug where 'in' directory not being cleared initially. display more information when running.
24#  December 2007 SEL rewrite to use Bryans' python XQuery stuff to create mini-moles instead of java.
25#                    Also extracted hard coded pwds into a file.
26# 11/04/08 CByrom Tidy up script by organising code into reusable functions + variables
27# + remove dependency on eXist DB
28
29import os, sys, string, getopt, logging
30from time import strftime
31from SchemaNameSpace import SchemaNameSpace
32from DIF import DIF
33from MDIP import MDIP
34import ndgUtils
35from ndgUtils.ndgXqueries import ndgXqueries
36from ndgUtils.ndgObject import ndgObject
37from FileUtilities import FileUtilities
38from PostgresRecord import PostgresRecord
39from PostgresDAO import PostgresDAO
40
def getID(filename):
    '''
    Gets the identifier out of an input metadata xml record.
    Copes with DIF and MDIP currently; which of the two parsers is used is
    decided by the module-level datacentre_format setting.

    @param filename - name of document file being processed
    @return: ID - id to use to refer to the document
    @raise SystemExit: if datacentre_format is neither "DIF" nor "MDIP"
    '''
    logging.info("Retrieving identifier for metadata record " + filename)

    # Use open() with an explicit close rather than the Python 2 only file()
    # builtin, so the handle is released immediately instead of whenever the
    # anonymous file object happens to be garbage collected.
    xml_file = open(filename)
    try:
        xml = xml_file.read()
    finally:
        xml_file.close()

    if datacentre_format == "DIF":
        ID = DIF(xml).entryID
    elif datacentre_format == "MDIP":
        ID = MDIP(xml).id
    else:
        sys.exit("Only handles DIF or MDIP here.")

    logging.info("Found identifier: " + ID)
    return ID
61
62
def addFileToPostgresDB(filename):
    '''
    Add a file to the postgres DB - extracting and storing all the required
    data in the process.

    Uses the module-level settings NDG_dataProvider, datacentre_groups,
    datacentre_namespace, datacentre_format and xq.

    @param filename: path of the discovery document to store
    '''
    logging.info("Adding file, " + filename + ", to postgres DB")
    discoveryID = getID(filename)

    # Work on a local copy of the namespace.  Assigning to datacentre_namespace
    # inside this function would turn that name into a local variable and hide
    # the configured module-level value for every non-NDG data provider.
    namespace = datacentre_namespace

    # NB, if we're dealing with an NDG data provider, the details are slightly different:
    # the local ID and the repository are taken from the NDG identifier.  These
    # overrides must stay inside the 'if' - discObj only exists for NDG providers.
    if NDG_dataProvider:
        discObj = ndgObject(discoveryID)
        discoveryID = discObj.localID
        namespace = discObj.repository

    # first of all create a PostgresRecord - this object represents all the data required
    # for a DB entry
    record = PostgresRecord(filename, datacentre_groups, namespace, discoveryID, xq, datacentre_format)

    # Now create the data access object to interface to the DB
    dao = PostgresDAO(record)

    # Finally, write the new record
    dao.createOrUpdateRecord()
86
87
def getConfigDetails(datacentre):
    '''
    Get the harvested records directory and groups for this datacentre from the
    datacentre specific config file.  The harvested records directory depends on the
    datacentres OAI base url, the set and format. These have to be known up-front.
    The groups denote which 'portal groups' they belong to - for limiting searches to
    say NERC-only datacentres records.
    Groups are added to the intermediate MOLES when it is created.

    The config file is read from base_dir + <datacentre> + "_config.properties".
    Each line is a whitespace separated keyword followed by its value(s); the
    keywords recognised are host_path, groups, format, namespace and
    NDG_dataProvider.  Results are stored in the module-level variables
    harvest_home, datacentre_groups (a list), datacentre_format,
    datacentre_namespace, NDG_dataProvider and datacentre_config_filename.

    @param datacentre: datacentre to use when looking up config file
    @raise SystemExit: if the config file is missing, or if host_path, format or
        namespace has not been set once the file has been read
    '''
    # set the variables to use the global copies, not the local ones
    global harvest_home, datacentre_groups, datacentre_format, datacentre_namespace
    global datacentre_config_filename, NDG_dataProvider
    datacentre_config_filename = base_dir + datacentre + "_config.properties"
    logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename)

    # Check this file exists; if not, assume an invalid datacentre has been specified
    if not os.path.isfile(datacentre_config_filename):
        sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
            "specified (%s) is invalid\n" %datacentre)

    datacentre_config_file = open(datacentre_config_filename, "r")
    try:
        for line in datacentre_config_file:
            words = line.split()
            if not words:
                continue
            keyword = words[0]
            values = words[1:]
            if keyword == 'NDG_dataProvider':
                # the presence of the keyword is enough; any value is ignored
                NDG_dataProvider = True
            elif keyword == 'groups':
                datacentre_groups = values
            elif not values:
                # keyword without a value: leave the current setting untouched so the
                # mandatory-entry checks below report it, instead of an IndexError here
                continue
            elif keyword == 'host_path':
                harvest_home = values[0]
            elif keyword == 'format':
                datacentre_format = values[0]
            elif keyword == 'namespace':
                datacentre_namespace = values[0]
    finally:
        # close the file even if reading it fails
        datacentre_config_file.close()

    if harvest_home == "":
        sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)

    logging.info("harvested records are in " + harvest_home)

    # groups are optional; they are "" when never set and a list once read from the file
    if not datacentre_groups:
        logging.info("No groups/keywords set for datacentre " + datacentre)
    else:
        if isinstance(datacentre_groups, (list, tuple)):
            groups_text = " ".join(datacentre_groups)
        else:
            groups_text = str(datacentre_groups)
        logging.info("datacentre groups/keywords: " + groups_text)

    if datacentre_format == "":
        sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)

    logging.info("format being harvested: " + datacentre_format)

    if datacentre_namespace == "":
        sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)

    logging.info("datacentre namespace: " + datacentre_namespace)
    print(lineSeparator)
148       
149
def usage():
    '''
    Display input params for the script, then exit with status 2.

    @raise SystemExit: always
    '''
    # -v is an option of this script (it is read from sys.argv after the script
    # name), not of the python interpreter, so it has to follow the script name.
    print("Usage: python oai_ingest.py [-v] <datacentre>")
    print(" - where:\n   <datacentre> is the data centre to ingest data from; and")
    print(" -v - verbose mode for output logging")
    sys.exit(2)
158       
# Separator line used to frame the console output of the script
lineSeparator = "-----------------------------"
print(lineSeparator)
print("RUNNING: oai_ingest.py")

verboseMode = False

# Parse the command line; the only option supported is -v (verbose)
try:
    opts, args = getopt.getopt(sys.argv[1:], "v")
except getopt.GetoptError:
    # show what was wrong (something like "option -a not recognized"),
    # then print the help information and exit
    print(str(sys.exc_info()[1]))
    usage()

# Warnings only by default; -v switches on full debug logging
loggingLevel = logging.WARNING
for option, _option_value in opts:
    if option == "-v":
        print(" - Verbose mode ON")
        loggingLevel = logging.DEBUG

logging.basicConfig(format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s',
                    level=loggingLevel)

print(lineSeparator)

# exactly one positional argument - the datacentre - is required; usage() exits
if len(args) != 1:
    usage()
datacentre = args[0]

# set up the file utils to use this logger
fileUtils = FileUtilities()

# status of the copy of the harvested records, and count of files pre-processed
status = 0
numfilesproc = 0

# The base dir is the directory that the script is ran from
# (historically this was "/usr/local/WSClients/OAIBatch/")
base_dir = os.getcwd() + "/"

# dir relating to the specified datacentre docs
data_dir = "%sdata/%s" % (base_dir, datacentre)

#Change os directory to that with the harvested documents in it.
os.chdir(base_dir)

# - to run on Windows under cygwin, use the following
#os.putenv('PATH', 'C:\\opt\\cygwin\\bin')

# Defaults for the global variables which are retrieved from the config file
harvest_home = datacentre_groups = datacentre_format = datacentre_namespace = ""
NDG_dataProvider = False
getConfigDetails(datacentre)
213
#any records to harvest?
if not os.listdir(harvest_home):
    logging.info("Nothing to harvest this time from " + datacentre)
    sys.exit()

# The directory to put things for a tape backup (should already exist)
backupdir = '/disks/glue1/oaiBackup/'

# the following dirs define where the specific documents should go
originals_dir = data_dir +"/oai/originals/"
discovery_dir = data_dir +"/discovery/"

# Create/clear the 'in' directory pristine copy of the discovery records
fileUtils.setUpDir(originals_dir)

# Copy the harvested records one file at a time with shutil instead of building an
# "ls | xargs cp" shell pipeline out of unquoted paths: this copes with any number
# of files and with names containing spaces or shell metacharacters.
# A failed copy is not fatal; it is recorded in 'status' (0 = all copies succeeded),
# which is reported when the script finishes.
import shutil
logging.info("Copying harvested records from " + harvest_home + " to " + originals_dir)
status = 0
for harvested_name in sorted(os.listdir(harvest_home)):
    if harvested_name.startswith("."):
        # hidden entries were never picked up by the plain 'ls' listing used previously
        continue
    harvested_path = os.path.join(harvest_home, harvested_name)
    if not os.path.isfile(harvested_path):
        logging.warning("Skipping %s - not a regular file" % harvested_path)
        continue
    try:
        shutil.copy(harvested_path, originals_dir)
    except EnvironmentError:
        logging.exception("Failed to copy " + harvested_path + " to " + originals_dir)
        status = 1

# Create/clear the directory for the 'out' processed copy of the discovery records.
fileUtils.setUpDir(discovery_dir)
238   
239# The file config.properties contains the location of the particular datacentres harvested records.
240# Copy the datacentre specific version of config to config.properties file.
241#commandline = "cp " + datacentre_config_filename + " " + base_dir + "config.properties"
242#logging.info("Executing : " + commandline)
243#status = os.system(commandline)
244#if status !=0:
245#    sys.exit("Failed at copying config file stage")
246
#Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
# - also replace any namespace declarations with a standard one which we know works in NDG
# NB, this copies files from the original dir to the discovery dir
logging.info(lineSeparator)
logging.info("Renaming files:")
for filename in os.listdir(originals_dir):
    if filename.find('.xml') == -1:
        logging.warning('File %s is not xml format. Not processed'  %(filename))
        continue

    original_filename = originals_dir + filename
    ident = getID(original_filename)

    # discovery_dir already ends with "/", so no extra separator is needed
    if NDG_dataProvider:
        new_filename = discovery_dir + ident.replace(":","__")+".xml"
    else:
        ident = ident.replace(":","-").replace("/","-")
        new_filename = discovery_dir + datacentre_namespace + "__" + datacentre_format + "__" + ident + ".xml"

    # log the mapping for every provider type, not only the non-NDG ones
    logging.info("original file = " + original_filename)
    logging.info("newfile = " + new_filename)

    # now correct any namespace issues
    try:
        SchemaNameSpace(original_filename, new_filename, datacentre_format)
    except Exception:
        # Catch Exception rather than everything, so Ctrl-C and sys.exit() are not
        # swallowed, and record the real cause before giving up.
        logging.exception("SchemaNameSpace failed on file %s" % original_filename)
        sys.exit("ERROR: SchemaNameSpace failed on file %s" %original_filename)
    numfilesproc += 1

logging.info(lineSeparator)
276
# The XQueries object is read by addFileToPostgresDB through the module-level name xq
xq = ndgXqueries()

# Store every document found in the discovery dir in the postgres DB
for discovery_name in os.listdir(discovery_dir):
    addFileToPostgresDB(discovery_dir + discovery_name)

# Copy the oai/originals area to the backup area for tape backups; the backup
# directory name carries the datacentre and a timestamp
backup_stamp = strftime("%y%m%d_%H%M")
originals_backupdir = "%s%s_%s_originals" % (backupdir, datacentre, backup_stamp)
fileUtils.makeBackUp(originals_dir, originals_backupdir)

# Clear out the working areas and the original harvest records area
for finished_dir in (originals_dir, discovery_dir, harvest_home):
    fileUtils.cleanDir(finished_dir)

# Final report: number of files pre-processed and the overall outcome
if status == 0:
    outcome = "INFO: Procedure oai_ingest.py completed"
else:
    outcome = "ERROR: Procedure oai_ingest.py FAILED with status %s" % status

print(lineSeparator)
print("INFO: No. of files pre-processed = %s" % numfilesproc)
print(outcome)
print(lineSeparator)
Note: See TracBrowser for help on using the repository browser.