source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new.py @ 3810

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new.py@3810
Revision 3810, 18.7 KB checked in by cbyrom, 12 years ago

Create Logger class to standardise output - and allow this to be
switched on/off with a verbose flag.
Implement the logger in several other classes + extend available logging in these.
Also, add a 'usage' method to oai_ingest_new to show proper usage of script.
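
The Logger module itself is not shown on this page; based on how oai_ingest_new.py uses it
(constructed with a verbose flag and called via printOutput), a minimal sketch of the interface
might look like the following - illustrative only, not the checked-in code:

class Logger:
    '''Standardise script output; only emit messages when verbose mode is on.'''
    def __init__(self, verbose=False):
        self.verbose = verbose

    def printOutput(self, message):
        # print the message only if the verbose flag was set on the command line
        if self.verbose:
            print message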

  • Property svn:executable set to *
#!/usr/bin/env python
"""Script oai_ingest.py takes parameters <datacentre> <dbinfofile>.
The /usr/local/WSClients/OAIBatch directory contains:
 - this python script, plus some other modules, e.g. ndgUtils, for parts of the process.
 - a DataProvider-specific config file.
 - the python module for extracting spatiotemporal information and adding it to the postgres DB.
Under this directory the following structure should be maintained:
 ./data
  - /DATACENTRE/
     - discovery/:           Re-named documents.
     - discovery_corrected/: Documents with schema namespaces corrected, ready to ingest into the discovery service.
     - oai/difYYYYMMDD/:     Documents as harvested from OAI.
 Where /DATACENTRE varies for the different data providers.
"""
# History:
# 12/05/06 SEL spelling correction
# 30/05/06 SEL cope with many files for processing. "Argument list too long" problem.
# 31/05/06 SEL need to use passwords now. (replace xxxxxx in real version).
# 16/10/06 SEL Changed to using python oaiClean.py module instead of java code.
# 16/10/06 SEL exist db upgraded and deployed at different location, java upgrade.
# 17/10/06 SEL cope with different discovery formats - not just DIF.
# 23/10/06 SEL keywords not mandatory in config file.
# 24/10/06 SEL fix bug where 'in' directory not being cleared initially. Display more information when running.
# December 2007 SEL rewrite to use Bryan's python XQuery stuff to create mini-moles instead of java.
#               Also extracted hard-coded pwds into a file.
# 11/04/08 CByrom Tidy up script by organising code into reusable functions + variables
#                 + remove dependency on eXist DB

import os, sys, commands, string, getopt
import keywordAdder
from SchemaNameSpace import SchemaNameSpace
from DIF import DIF
from MDIP import MDIP
import ndgUtils
from ndgUtils.ndgXqueries import ndgXqueries
from ndgUtils.ndgObject import ndgObject
import ConfigParser
from ndgUtils.ndgDirectory import ndgDirectory
from ndgUtils.eXistInterface import ndg_eXist
from FileUtilities import FileUtilities
from PostgresRecord import PostgresRecord
# NB, import the DAO class directly so the PostgresDAO(record) call below works
# (assumes the PostgresDAO module defines a class of the same name)
from PostgresDAO import PostgresDAO
import renderEntity
from Logger import Logger


def getID(filename):
        '''
        Gets the identifier out of an input metadata xml record.
        Copes with DIF and MDIP currently.
        @param filename: name of document file being processed
        @return: ID - id to use to refer to the document
        '''
        logger.printOutput("INFO: Retrieving identifier for metadata record " + filename)
        xml=file(filename).read()
        if datacentre_format == "DIF":
            d=DIF(xml)
            ID=d.entryID
        elif datacentre_format == "MDIP":
            d=MDIP(xml)
            ID=d.id
        else:
            sys.exit("Only handles DIF or MDIP here.")

        logger.printOutput("Found identifier: " + ID)
        return ID


def addFileToPostgresDB(filename):
        '''
        Add a file to the postgres DB - extracting and storing all the required
        data in the process
        '''
        logger.printOutput("Adding file, " + filename + ", to postgres DB")

        # first of all create a PostgresRecord - this object represents all the data required
        # for a DB entry
        record = PostgresRecord(filename, NDG_dataProvider, targetCollection, datacentre_namespace, 'discovery_idTEST', xq, datacentre_format)
        dao = PostgresDAO(record)
        dao.createOrUpdateRecord()


def getConfigDetails(datacentre):
        '''
        Get the harvested records directory and groups for this datacentre from the
        datacentre-specific config file.  The harvested records directory depends on the
        datacentre's OAI base URL, the set and the format; these have to be known up-front.
        The groups denote which 'portal groups' the records belong to - for limiting searches to,
        say, NERC-only datacentre records.
        Groups are added to the intermediate MOLES when it is created.
        @param datacentre: datacentre to use when looking up config file
        '''
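        # For illustration only - the properties file is not part of this listing, but the
        # parsing below expects whitespace-separated key/value lines of roughly this form
        # (the names and values here are made up):
        #   host_path /usr/local/WSClients/OAIBatch/data/SOMEDATACENTRE/oai/dif20080101
        #   groups    NERC DDC
        #   format    DIF
        #   namespace somedatacentre.ac.uk
        #   NDG_dataProvider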
        # set the variables to use the global copies, not the local ones
        global harvest_home, datacentre_groups, datacentre_format, datacentre_namespace
        global datacentre_config_filename, NDG_dataProvider
        datacentre_config_filename = base_dir + datacentre + "_config.properties"
        logger.printOutput("INFO: Retrieving data from datacentre config file, " + datacentre_config_filename)

        # Check this file exists; if not, assume an invalid datacentre has been specified
        if not os.path.isfile(datacentre_config_filename):
            sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
                "specified (%s) is invalid\n" %datacentre)

        datacentre_config_file = open(datacentre_config_filename, "r")

        for line in datacentre_config_file.readlines():
            words = string.split(line)
            if len(words) == 0:
                continue
            if words[0] == 'host_path':
                harvest_home = string.rstrip(words[1])
            if words[0] == 'groups':
                datacentre_groups = words[1:]
            if words[0] == 'format':
                datacentre_format = words[1]
            if words[0] == 'namespace':
                datacentre_namespace = words[1]
            if words[0] == 'NDG_dataProvider':
                NDG_dataProvider = True

        datacentre_config_file.close()

        if harvest_home == "":
            sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)

        logger.printOutput("INFO: harvested records are in " + harvest_home)

        if datacentre_groups == "":
            logger.printOutput("INFO: No groups/keywords set for datacentre " + datacentre)
        else:
            # NB, datacentre_groups is a list at this point, so stringify it before logging
            logger.printOutput("INFO: datacentre groups/keywords: " + str(datacentre_groups))

        if datacentre_format == "":
            sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)

        logger.printOutput("INFO: format being harvested: " + datacentre_format)

        if datacentre_namespace == "":
            sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)

        logger.printOutput("INFO: datacentre namespace: " + datacentre_namespace)
        print lineSeparator


def ingestOriginalsToeXist():
        '''
        Put original docs into the eXist DB - NB, this allows for easy transform via XQuery
        into the other required doc types
        '''
        logger.printOutput("INFO: Putting original docs in eXist...")
        # this command creates the targetCollection in eXist from the contents of the discovery_corrected_dir
        commandline = "$EXIST_HOME/bin/client.sh -c " + targetCollection + " -u admin -P " + \
                db_admin + " -p " + discovery_corrected_dir
        logger.printOutput(lineSeparator)
        logger.printOutput("INFO: Executing : actual command to ingest into exist db: " + commandline)
        status = os.system(commandline)
        if status != 0:
            sys.exit("Failed at ingesting into exist db. Datacentre = %s. Status = %s" %(datacentre,status))
        logger.printOutput("INFO: Original docs now in eXist")
        logger.printOutput(lineSeparator)


def createEXistMiniMoles():
        '''
        Transform the docs in eXist into the mini-moles format - to allow use of the various
        existing xqueries to do the transforms from this format to others
        '''
        # First get the list of discovery record ids from the db collection
        logger.printOutput("INFO: Creating minimoles documents from original docs in eXist")
        #    if datacentre_format == 'DIF':
        ndgDir = ndgDirectory(targetCollection, host, datacentre_format.upper())
        #    else:
        #        print 'ERROR: mini-moles creation does not handle MDIP yet! So this WILL FAIL (probably)'

        # create the mini-moles for each Discovery record in the collection
        for member in ndgDir.members:
            filename = member['fileName']
            disc_id = member['EntryID']
            print "INFO: internal id = %s" %disc_id
            print "INFO: discovery filename = %s" %filename
            # now create the xquery
            # sort out the output ID stuff ...
            xQueryType = 'dif2moles'
            if NDG_dataProvider:
                discObj=ndgObject(disc_id)
                xquery=xq.actual(xQueryType, targetCollection, discObj.repository, discObj.localID)
                print "REPOSITORY: %s, DISC_ID: %s" %(discObj.repository, discObj.localID)
            else:
                xquery=xq.actual(xQueryType, targetCollection, datacentre_namespace, disc_id)

            # and then sort out the input ID stuff
            xquery=xquery.replace('Input_Entry_ID', disc_id)
            xquery=xquery.replace('repository_localid', datacentre_namespace)

            # Now do the transform
            print "INFO: Running XQuery transform to create minimoles document"
            molesid,s=xmldb.executeQuery(xquery)
            moles_from_dif=xmldb.retrieve(molesid,0)

            minimoles_file = minimoles_dir + "/" + filename
            print "INFO: Creating new minimoles file: ", minimoles_file
            f=open(minimoles_file,'w')
            f.write(moles_from_dif)
            f.close()

        # NB, these checks belong after the member loop - i.e. once all minimoles docs have
        # been written - not inside it
        filecount = 0
        for root, dirs, files in os.walk(minimoles_dir):
            filecount += len(files)

        if filecount == 0:
            print "ERROR: couldn't create any minimum moles records for %s" %datacentre
            sys.exit()

        # Add keywords if necessary
        addKeywords()

        # ingest the created discovery minimum moles records into eXist db.
        commandline = "$EXIST_HOME/bin/client.sh -c " + miniMolesCollection + " -u admin -P " + db_admin + \
        " -p " + finalmoles_dir
        print "INFO: Executing : actual command to ingest into exist db."
        status = os.system(commandline)
        if status != 0:
            sys.exit("Failed at ingesting into exist db. Datacentre = %s. Status = %s" %(datacentre,status))


def createEXistCollections():
        '''
        Ensure all the required collections are set up in the eXist DB
        '''
        commandline = "$EXIST_HOME/bin/client.sh -c /db " + \
                " -u admin -P " + db_admin + " -m "
        collections = ['/db/xqueryLib', '/db/discovery/moles']
        logger.printOutput(lineSeparator)
        for collection in collections:
                cmd = commandline + collection + " -p"
                logger.printOutput("INFO: Executing : actual command to create DB collection: " + cmd)
                status = os.system(cmd)
                print status
                if status != 0:
                        sys.exit("Failed to create DB collection. Status = %s" %(status))


def addKeywords():
        '''
        Add portal group keywords to the mini-moles docs, if any are configured; otherwise
        just move the mini-moles docs straight to the final MOLES directory
        '''
        if datacentre_groups == "":
            commandline = "ls -1 " + minimoles_dir + " | xargs -i mv " + minimoles_dir + \
                "{\} " + finalmoles_dir
            logger.printOutput("INFO: Executing : " + commandline)
            status = os.system(commandline)
            if status != 0:
                sys.exit("Failed at moving MOLES to FINAL directory")
        else:
            keywordAdder.main(minimoles_dir, finalmoles_dir, datacentre_groups)


def addXQueriesToeXist():
        '''
        The current set of XQuery files refer to some xquery libraries - which are stored in eXist
        - so add them to the DB for use elsewhere
        '''
        commandline = "$EXIST_HOME/bin/client.sh -c /db " + \
                " -u admin -P " + db_admin + " -p " + base_dir + 'xquery'
        logger.printOutput(lineSeparator)
        logger.printOutput("INFO: Executing : actual command to ingest into exist db: " + commandline)
        status = os.system(commandline)
        if status != 0:
            sys.exit("Failed at ingesting into exist db. Datacentre = %s. Status = %s" %(datacentre,status))


def usage():
        '''
        Display input params for the script
        '''
        print "Usage: python oai_ingest.py [-v] <datacentre> (<dbInfoFile>)"
        print " - where:\n   <datacentre> is the data centre to ingest data from; and"
        print "   <dbInfoFile> provides info on the eXist DB to use to do XQuery transforms"
        print " -v - verbose mode for output logging"
        sys.exit(2)


lineSeparator = "-----------------------------"
print lineSeparator
print "RUNNING: oai_ingest.py"
print lineSeparator

verboseMode = False

# check for verbose option
try:
    opts, args = getopt.getopt(sys.argv[1:], "v")
except getopt.GetoptError, err:
    # print help information and exit:
    print str(err) # will print something like "option -a not recognized"
    usage()

for o, a in opts:
    if o == "-v":
        verboseMode = True

if (len(args) < 1 or len(args) > 2):
        usage()
else:
    datacentre = args[0]

# set the default password file
dbinfoname = "ingest.txt"
if (len(args) == 2):
        dbinfoname = args[1]

# set up the logger to use
logger = Logger(verboseMode)

# set up the file utils to use this logger
fileUtils = FileUtilities(logger)

status = 0
numfilesproc = 0
#base_dir = "/usr/local/WSClients/OAIBatch/" # this is the base dir that the script is ran from
base_dir = os.getcwd() + "/"  # this is the base dir that the script is run from

data_dir = base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs

# Change os directory to that with the harvested documents in it.
os.chdir(base_dir)

# Other settings and constants
date_string = commands.getoutput("date +'%y%m%d_%H%M'")

#os.putenv('EXIST_HOME', '/usr/local/exist-client')
os.putenv('EXIST_HOME', '/home/users/cbyrom/opt/eXist')
os.putenv('PATH', ':/usr/java/jdk1.5.0_03/bin:/usr/java/jdk1.5.0_03:/usr/java/jdk1.5.0_03/lib/tools.jar:/usr/local/WSClients/OAIBatch:/usr/local/exist-client/bin:/bin:/usr/bin:.')

# set the global variables to retrieve from the config file
harvest_home = ""
datacentre_groups = ""
datacentre_format = ""
datacentre_namespace = ""
NDG_dataProvider = False
getConfigDetails(datacentre)

# any records to harvest?
if len(os.listdir(harvest_home)) == 0:
    logger.printOutput("INFO: Nothing to harvest this time from " + datacentre)
    sys.exit()

# get the db access info from the specified input file
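# NB, the file itself is not shown in this listing; the parsing below assumes each line is
# whitespace-separated in the form:
#   <host> <username> <password>
# so the admin password is looked up under the key (host, 'admin')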
dbaccess = {}
dbinfo_file = open(dbinfoname, "r")
for line in dbinfo_file.readlines():
    words = string.split(line)
    if len(words) < 3:
        # need host, user and password on each line
        continue
    dbaccess[(words[0],words[1])] = [words[2]]
dbinfo_file.close()
#host ='glue.badc.rl.ac.uk'
host = 'localhost'
try:
        db_admin = dbaccess[(host,'admin')][0]
except:
        sys.exit("ERROR: Could not find details for %s DB user 'admin' in DB config file\n" %host)

# The directory to put things for a tape backup (should already exist)
backupdir = '/disks/glue1/oaiBackup/'

# the following dirs define where the specific documents should go
originals_dir = data_dir + "/oai/originals/"
discovery_dir = data_dir + "/discovery/"
discovery_corrected_dir = data_dir + "/discovery_corrected/"

# Create/clear the 'in' directory - the pristine copy of the discovery records
fileUtils.setUpDir(originals_dir)
commandline = "ls -1 " + harvest_home + "/ | xargs -i cp " + harvest_home + "/{\} " + originals_dir
logger.printOutput("INFO: Executing : " + commandline)
status = os.system(commandline)

if status != 0:
    sys.exit("Failed at making pristine copy stage")

# Create/clear the directory for the 'out' processed copy of the discovery records.
fileUtils.setUpDir(discovery_dir)

# Create/clear the directory for the 'out' namespace-corrected copy of the discovery records.
fileUtils.setUpDir(discovery_corrected_dir)

# The file config.properties contains the location of the particular datacentre's harvested records.
# Copy the datacentre-specific version of config to the config.properties file.
commandline = "cp " + datacentre_config_filename + " " + base_dir + "config.properties"
logger.printOutput("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at copying config file stage")

# Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
# NB, this copies files from the original dir to the discovery dir
logger.printOutput(lineSeparator)
logger.printOutput("INFO: Renaming files:")
for filename in os.listdir(originals_dir):
        if filename.find('.xml') != -1:
                original_filename = originals_dir + filename
                ident = getID(original_filename)
                if NDG_dataProvider:
                    new_filename = discovery_dir + ident.replace(":","__") + ".xml"
                else:
                    ident = ident.replace(":","-")
                    ident = ident.replace("/","-")
                    new_filename = discovery_dir + "/" + datacentre_namespace + "__" + datacentre_format + "__" + ident + ".xml"

                # NB, log the rename for both NDG and non-NDG data providers
                logger.printOutput("original file = " + original_filename)
                logger.printOutput("newfile = " + new_filename)

                try:
                        os.rename(original_filename, new_filename)
                except:
                        sys.exit("ERROR: Failed to rename file %s to %s" %(original_filename, new_filename))
                numfilesproc += 1
        else:
                print 'WARNING: File %s is not xml format. Not processed' %(filename)

# replace any namespace declarations with a standard one which we know works in NDG
# NB, this copies files from the discovery dir to the discovery corrected dir
logger.printOutput(lineSeparator)
logger.printOutput("INFO: Correcting namespaces of files (corrected files in " + \
                    discovery_corrected_dir + ")")
for filename in os.listdir(discovery_dir):
        if filename.find('.xml') != -1:
                    in_filename = discovery_dir + filename
                    corrected_filename = discovery_corrected_dir + filename
                    try:
                       SchemaNameSpace(in_filename, corrected_filename, datacentre_format, logger)
                    except:
                       print "ERROR: SchemaNameSpace failed on file %s" %in_filename

# put the original docs in eXist - as the targetCollection
# NB, firstly ensure the appropriate collections are set up
createEXistCollections()

# now set up the required XQueries and check the xquery libs are available from eXist
targetCollection = "/db/discovery/original/" + datacentre_format + "/" + datacentre_namespace
miniMolesCollection = "/db/discovery/moles"
xq = ndgXqueries()
xmldb = ndg_eXist(db=host)

addXQueriesToeXist()

ingestOriginalsToeXist()

# now transform these to minimoles format
minimoles_dir = base_dir + 'MINIMOLES/'
finalmoles_dir = base_dir + 'FINALMOLES/'
fileUtils.setUpDir(minimoles_dir)
fileUtils.setUpDir(finalmoles_dir)
createEXistMiniMoles()

# TODO: need to skip the eXist steps below and just populate the postGres DB
# 2. use the correct xquery to transform the original doc directly and store t


# Process the resulting files and put the data into the postgres DB
filenames = os.listdir(discovery_corrected_dir)
for filename in filenames:
        addFileToPostgresDB(discovery_corrected_dir + filename)

# Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
backupdir_base = backupdir + datacentre + "_" + date_string
this_backupdir = backupdir_base + "_originals"
fileUtils.makeBackUp(originals_dir, this_backupdir)

this_backupdir = backupdir_base + "_discovery"
fileUtils.makeBackUp(discovery_dir, this_backupdir)

this_backupdir = backupdir_base + "_FINALMOLES"
fileUtils.makeBackUp("./FINALMOLES", this_backupdir)

# Clear out the original harvest records area and FINALMOLES
fileUtils.cleanDir("./FINALMOLES")
fileUtils.cleanDir(harvest_home)

print "======================================================"
print "No. of files pre-processed = %s" %numfilesproc
if status == 0:
    print "Procedure oai_ingest.py ran to end"
else:
    print "ERROR: Procedure oai_ingest.py FAILED with status %s" %status

print "======================================================"