source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new2.py @ 3821

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new2.py@3821
Revision 3821, 11.5 KB checked in by cbyrom, 13 years ago (diff)

Fix a few problems - including referencing the xquery libraries; these
have now been added to the ndgUtils egg and are extracted locally and
referenced directly. Also add functionality to deal with the moles -> other
transforms + add new utility methods and tidy up code and add more logging.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2""" Script oai_ingest.py takes parameters <datacentre> <dbinfofile>.
3The /usr/local/WSClients/OAIBatch directory contains:-
4 - this python script, plus some other modules eg ndgUtils for parts of the process.
5 - a DataProvider specific config file,
6 - the python module for extracting spatiotemporal information and adding to postgres db.
7Under this directory the following structure should be maintained:
8 ./data
9 - /DATACENTRE/
10                - discovery/:         Re-named documents.
11        - discovery_corrected Documents with schema namespaces corrected, ready to ingest in the discovery service.
12                - oai/difYYYYMMDD/    Documents as harvested from OAI
13 Where  /DATACENTRE  varies for the different data providers
14"""
15#History:
16# 12/05/06 SEL spelling correction
17# 30/05/06 SEL cope with many files for processing."Argument list too long" problem.
18# 31/05/06 SEL need to use passwords now. (replace xxxxxx in real version).
19# 16/10/06 SEL Changed to using python oaiClean.py module instead of java code.
20# 16/10/06 SEL exist db upgraded and deployed at different location, java upgrade.
21# 17/10/06 SEL cope with different discovery formats - not just DIF.
22# 23/10/06 SEL keywords not mandatory in config file.
23# 24/10/06 SEL fix bug where 'in' directory not being cleared initially. display more information when running.
24#  December 2007 SEL rewrite to use Bryan's python XQuery stuff to create mini-moles instead of java.
25#                    Also extracted hard coded pwds into a file.
26# 11/04/08 CByrom Tidy up script by organising code into reusable functions + variables
27# + remove dependency on eXist DB
28
29import os, sys, string, getopt, logging
30from time import strftime
31from SchemaNameSpace import SchemaNameSpace
32from DIF import DIF
33from MDIP import MDIP
34import ndgUtils
35from ndgUtils.ndgXqueries import ndgXqueries
36from FileUtilities import FileUtilities
37from PostgresRecord import PostgresRecord
38from PostgresDAO import PostgresDAO
39
def getID(filename):
        '''
        Gets the identifier out of an input metadata xml record.
        Copes with DIF and MDIP currently.
        @param filename: name of document file being processed
        @return: ID - id to use to refer to the document
        NB, exits the script (via sys.exit) if the global datacentre_format
        is neither DIF nor MDIP.
        '''
        logging.info("Retrieving identifier for metadata record " + filename)
        # NB, use open() rather than the deprecated file() builtin, and make
        # sure the handle is closed again - the original version leaked it
        inputFile = open(filename)
        try:
            xml = inputFile.read()
        finally:
            inputFile.close()

        if datacentre_format == "DIF":
            d=DIF(xml)
            ID=d.entryID
        elif datacentre_format == "MDIP":
            d=MDIP(xml)
            ID=d.id
        else:
            sys.exit("Only handles DIF or MDIP here.")

        logging.info("Found identifier: " + ID)
        return ID
60
61
def addFileToPostgresDB(filename):
        '''
        Extract all the required data from the specified metadata file and
        persist it in the postgres DB.
        @param filename: full path of file to add to postgres DB
        '''
        logging.info("Adding file, " + filename + ", to postgres DB")

        # look up the discovery ID for the document first
        discoveryID = getID(filename)

        # an ingest record captures everything the DB needs for this document
        ingestRecord = PostgresRecord(filename,
                                      NDG_dataProvider,
                                      datacentre_groups,
                                      datacentre_namespace,
                                      discoveryID,
                                      xq,
                                      datacentre_format)

        # hand the record straight over to the data access layer, which
        # either inserts it or updates any existing copy
        PostgresDAO(ingestRecord).createOrUpdateRecord()
80
81
def getConfigDetails(datacentre):
        '''
        Get the harvested records directory and groups for this datacentre from the
        datacentre specific config file.  The harvested records directory depends on the
        datacentres OAI base url, the set and format. These have to be known up-front.
        The groups denote which 'portal groups' they belong to - for limiting searches to
        say NERC-only datacentres records.
        Groups are added to the intermediate MOLES when it is created.
        @param datacentre: datacentre to use when looking up config file
        NB, results are stored in the module level globals, harvest_home,
        datacentre_groups, datacentre_format, datacentre_namespace,
        datacentre_config_filename and NDG_dataProvider; exits the script
        (via sys.exit) if any mandatory setting is missing.
        '''
        # set the variables to use the global copies, not the local ones
        global harvest_home, datacentre_groups, datacentre_format, datacentre_namespace
        global datacentre_config_filename, NDG_dataProvider
        datacentre_config_filename = base_dir + datacentre + "_config.properties"
        logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename)

        # Check this file exists; if not, assume an invalid datacentre has been specified
        if not os.path.isfile(datacentre_config_filename):
            sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
                "specified (%s) is invalid\n" %datacentre)

        datacentre_config_file = open(datacentre_config_filename, "r")
        try:
            for line in datacentre_config_file.readlines():
                # NB, use the str methods rather than the deprecated string
                # module functions (removed in python 3)
                words = line.split()
                if len(words) == 0:
                    continue
                elif words[0] == 'host_path':
                    harvest_home = words[1].rstrip()
                elif words[0] == 'groups':
                    # NB, this is a *list* of group keywords, not a string
                    datacentre_groups = words[1:]
                elif words[0] == 'format':
                    datacentre_format = words[1]
                elif words[0] == 'namespace':
                    datacentre_namespace = words[1]
                elif words[0] == 'NDG_dataProvider':
                    NDG_dataProvider = True
        finally:
            # ensure the config file handle is always released
            datacentre_config_file.close()

        if harvest_home == "":
            sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)

        logging.info("harvested records are in " + harvest_home)

        if datacentre_groups == "":
            logging.info("No groups/keywords set for datacentre " + datacentre)
        else:
            # NB, datacentre_groups is a list here, so convert it to a string
            # before concatenating - the original plain concatenation raised
            # a TypeError whenever any groups were configured
            logging.info("datacentre groups/keywords: " + str(datacentre_groups))

        if datacentre_format == "":
            sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)

        logging.info("format being harvested: " + datacentre_format)

        if datacentre_namespace == "":
            sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)

        logging.info("datacentre namespace: " + datacentre_namespace)

        if NDG_dataProvider:
                logging.info("Datacentre classified as an NDG data provider")
        else:
                # NB, fixed typo in the original log message ('classificied')
                logging.info("Datacentre is not classified as an NDG data provider")
        # NB, single-argument print() behaves identically under python 2
        print(lineSeparator)
147       
148
def usage():
        '''
        Display input params for the script, then exit.
        NB, exits with status 2 - the conventional code for a command
        line usage error.
        '''
        # NB, parenthesised single-argument print behaves identically under
        # python 2 and keeps the function parseable by python 3 as well
        print("Usage: python -v oai_ingest.py <datacentre>")
        print(" - where:\n   <datacentre> is the data centre to ingest data from; and")
        print(" -v - verbose mode for output logging")
        sys.exit(2)
157       
lineSeparator = "-----------------------------"
print lineSeparator
print "RUNNING: oai_ingest.py"         

# check for verbose option
try:
    opts, args = getopt.getopt(sys.argv[1:], "v")
except getopt.GetoptError, err:
    # print help information and exit:
    print str(err) # will print something like "option -a not recognized"
    usage()
   
# default to WARNING level logging; -v switches on full DEBUG output
loggingLevel = logging.WARNING
for o, a in opts:
    if o == "-v":
        print " - Verbose mode ON"
        loggingLevel = logging.DEBUG

logging.basicConfig(level=loggingLevel,
                                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')

print lineSeparator

# exactly one positional arg is expected - the datacentre to ingest from
if (len(args) != 1):
        usage()
else:
    datacentre = args[0]

# set up the file utils to use this logger
fileUtils = FileUtilities()

# status tracks the exit code of the shell copy command below; numfilesproc
# counts the xml files successfully pre-processed
status = 0
numfilesproc = 0
#base_dir = "/usr/local/WSClients/OAIBatch/" # this is the base dir that the script is ran from
base_dir = os.getcwd() + "/"# this is the base dir that the script is ran from
       
data_dir = base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs

#Change os directory to that with the harvested documents in it.
os.chdir(base_dir)

# - to run on Windows under cygwin, use the following
#os.putenv('PATH', 'C:\\opt\\cygwin\\bin')

# set the global variables to retrieve from the config file
# NB, these are (re)populated by getConfigDetails() via 'global' statements
harvest_home = ""
datacentre_groups = ""
datacentre_format = ""
datacentre_namespace = ""
NDG_dataProvider = False
getConfigDetails(datacentre)

#any records to harvest?
if len( os.listdir(harvest_home)) == 0:
    logging.info("Nothing to harvest this time from " + datacentre)
    sys.exit()

# The directory to put things for a tape backup (should already exist)
backupdir = '/disks/glue1/oaiBackup/'

# the following dirs define where the specific documents should go
originals_dir = data_dir +"/oai/originals/"
discovery_dir = data_dir +"/discovery/"

# Create/clear the 'in' directory pristine copy of the discovery records
# NOTE(review): harvest_home is interpolated unquoted into a shell command;
# a path containing spaces or shell metacharacters would break this - confirm
# harvest paths are always shell-safe
fileUtils.setUpDir(originals_dir)
commandline = "ls -1 " + harvest_home + "/ | xargs -i cp " + harvest_home + "/{\} " + originals_dir
#commandline = "find " + harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir
logging.info("Executing : " + commandline)
status = os.system(commandline)

# NB, the failure check is deliberately disabled - the copy is best-effort,
# though a non-zero status is still reported at the end of the run
#if status !=0:
#    sys.exit("Failed at making pristine copy stage")

# Create/clear the directory for the 'out' processed copy of the discovery records.
fileUtils.setUpDir(discovery_dir)
   
# The file config.properties contains the location of the particular datacentres harvested records.
# Copy the datacentre specific version of config to config.properties file.
#commandline = "cp " + datacentre_config_filename + " " + base_dir + "config.properties"
#logging.info("Executing : " + commandline)
#status = os.system(commandline)
#if status !=0:
#    sys.exit("Failed at copying config file stage")

#Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
# - also replace any namespace declarations with a standard one which we know works in NDG
# NB, this copies files from the original dir to the discovery dir
logging.info(lineSeparator)
logging.info("Renaming files:")
for filename in os.listdir(originals_dir):
        if filename.find('.xml') != -1:
                original_filename = originals_dir + filename
                ident=getID(original_filename)
               
                # NDG data providers keep their own ID (with ':' escaped);
                # other providers get a namespaced, format-qualified filename
                if NDG_dataProvider:
                        new_filename = discovery_dir + ident.replace(":","__")+".xml"
                else:
                                ident = ident.replace(":","-")
                                ident = ident.replace("/","-")
                                new_filename = discovery_dir + "/" +datacentre_namespace+ "__"+datacentre_format+ "__"+ ident +".xml"
                                logging.info("original file = " + original_filename)
                                logging.info("newfile = " + new_filename)
               
                # now correct any namespace issues
                try:
                    SchemaNameSpace(original_filename, new_filename, datacentre_format)
                except:
                        sys.exit("ERROR: SchemaNameSpace failed on file %s" %original_filename)
                numfilesproc += 1
        else:
                logging.warning('File %s is not xml format. Not processed'  %(filename))

logging.info(lineSeparator)

# now set up the required XQueries
# - NB, extract the xquery libraries locally for easy reference
xq=ndgXqueries()
for libFile in xq.xqlib:
        fileUtils.createFile(libFile, xq.xqlib[libFile])

# Process the resulting files and put the data into the postgres DB
filenames = os.listdir(discovery_dir)
for filename in filenames:
        addFileToPostgresDB(discovery_dir + filename)

#Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M")

this_backupdir = backupdir_base + "_originals"
fileUtils.makeBackUp(originals_dir, this_backupdir)

#Clear out the original harvest records area and FINALMOLES
fileUtils.cleanDir(originals_dir)
fileUtils.cleanDir(discovery_dir)
fileUtils.cleanDir(harvest_home)

# final summary - status is the exit code of the shell copy command above
print lineSeparator
print "INFO: No. of files pre-processed = %s" %numfilesproc
if status == 0:
    print "INFO: Procedure oai_ingest.py completed"
else:
    print "ERROR: Procedure oai_ingest.py FAILED with status %s" %status
print lineSeparator
Note: See TracBrowser for help on using the repository browser.