source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new.py @ 3817

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new.py@3817
Revision 3817, 19.3 KB checked in by cbyrom, 11 years ago (diff)

Add default logging support + create new version of ingest script, removing
all traces of the eXist DB + improve documentation and output.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2""" Script oai_ingest.py takes parameters <datacentre> <dbinfofile>.
3The /usr/local/WSClients/OAIBatch directory contains:-
4 - this python script, plus some other modules eg ndgUtils for parts of the process.
5 - a DataProvider specific config file,
6 - the python module for extracting spatiotemporal information and adding to postgres db.
7Under this directory the following structure should be maintained:
8 ./data
9 - /DATACENTRE/
10                - discovery/:         Re-named documents.
 11        - discovery_corrected/: Documents with schema namespaces corrected, ready to ingest into the discovery service.
12                - oai/difYYYYMMDD/    Documents as harvested from OAI
13 Where  /DATACENTRE  varies for the different data providers
14"""
15#History:
16# 12/05/06 SEL spelling correction
17# 30/05/06 SEL cope with many files for processing."Argument list too long" problem.
18# 31/05/06 SEL need to use passwords now. (replace xxxxxx in real version).
19# 16/10/06 SEL Changed to using python oaiClean.py module instead of java code.
20# 16/10/06 SEL exist db upgraded and deployed at different location, java upgrade.
21# 17/10/06 SEL cope with different discovery formats - not just DIF.
22# 23/10/06 SEL keywords not mandatory in config file.
23# 24/10/06 SEL fix bug where 'in' directory not being cleared initially. display more information when running.
 24#  December 2007 SEL rewrite to use Bryan's python XQuery modules to create mini-moles instead of java.
25#                    Also extracted hard coded pwds into a file.
26# 11/04/08 CByrom Tidy up script by organising code into reusable functions + variables
27# + remove dependency on eXist DB
28
29import os, sys, string, getopt
30from time import strftime
31import keywordAdder
32from SchemaNameSpace import SchemaNameSpace
33from DIF import DIF
34from MDIP import MDIP
35import ndgUtils
36from ndgUtils.ndgXqueries import ndgXqueries
37from ndgUtils.ndgObject import ndgObject
38import ConfigParser
39from ndgUtils.ndgDirectory import ndgDirectory
40from ndgUtils.eXistInterface import ndg_eXist
41from FileUtilities import FileUtilities
42from PostgresRecord import PostgresRecord
43import PostgresDAO
44import renderEntity
45from Logger import Logger
46
47       
def getID(filename):
        '''
        Gets the identifier out of an input metadata xml record.
        Copes with DIF and MDIP currently.
        NB, uses the datacentre_format module global to decide how to parse the doc.
        @param filename: name of document file being processed
        @return: ID - id to use to refer to the document
        '''
        logger.printOutput("INFO: Retrieving identifier for metadata record " + filename)
        # read the doc contents - ensuring the file handle is always released
        # (previously the handle from file(filename) was never closed)
        inputFile = open(filename)
        try:
            xml = inputFile.read()
        finally:
            inputFile.close()

        if datacentre_format == "DIF":
            ID = DIF(xml).entryID
        elif datacentre_format == "MDIP":
            ID = MDIP(xml).id
        else:
            # unsupported format - cannot continue
            sys.exit("Only handles DIF or MDIP here.")

        logger.printOutput("Found identifier: " + ID)
        return ID
68
69
def addFileToPostgresDB(filename):
        '''
        Add a file to the postgres DB - extracting and storing all the required
        data in the process
        @param filename: full path of the metadata record to ingest
        '''
        logger.printOutput("Adding file, " + filename + ", to postgres DB")
        discoveryID = getID(filename)

        # default to the namespace from the datacentre config file
        namespace = datacentre_namespace

        # NB, if we're dealing with an NDG data provider, the details are slightly different
        # - the full ID embeds the repository and a local ID, so extract and use those.
        # (Previously the two assignments below sat outside the 'if', causing a
        # NameError on discObj for non-NDG data providers.)
        if NDG_dataProvider:
                discObj = ndgObject(discoveryID)
                discoveryID = discObj.localID
                namespace = discObj.repository

        # first of all create a PostgresRecord - this object represents all the data required
        # for a DB entry
        record = PostgresRecord(filename, targetCollection, namespace, discoveryID, xq, datacentre_format, logger)

        # Now create the data access object to interface to the DB
        # NB, PostgresDAO is imported as a module - instantiate the class it exports
        # (calling the module object directly would raise a TypeError)
        dao = PostgresDAO.PostgresDAO(record)

        # Finally, write the new record
        dao.createOrUpdateRecord()
93
94
def getConfigDetails(datacentre):
        '''
        Get the harvested records directory and groups for this datacentre from the
        datacentre specific config file.  The harvested records directory depends on the
        datacentres OAI base url, the set and format. These have to be known up-front.
        The groups denote which 'portal groups' they belong to - for limiting searches to
        say NERC-only datacentres records.
        Groups are added to the intermediate MOLES when it is created.
        NB, results are stored in the module globals declared below.
        @param datacentre: datacentre to use when looking up config file
        '''
        # set the variables to use the global copies, not the local ones
        global harvest_home, datacentre_groups, datacentre_format, datacentre_namespace
        global datacentre_config_filename, NDG_dataProvider
        datacentre_config_filename = base_dir + datacentre + "_config.properties"
        logger.printOutput("INFO: Retrieving data from datacentre config file, " + datacentre_config_filename)

        # Check this file exists; if not, assume an invalid datacentre has been specified
        if not os.path.isfile(datacentre_config_filename):
            sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
                "specified (%s) is invalid\n" %datacentre)

        datacentre_config_file = open(datacentre_config_filename, "r")
        try:
            # the config file is a series of '<key> <value(s)>' lines
            for line in datacentre_config_file.readlines():
                words = string.split(line)
                if len(words) == 0:
                    continue
                if words[0] == 'host_path':
                    harvest_home = string.rstrip(words[1])
                if words[0] == 'groups':
                    # NB, there may be several groups - so keep them as a list
                    datacentre_groups = words[1:]
                if words[0] == 'format':
                    datacentre_format = words[1]
                if words[0] == 'namespace':
                    datacentre_namespace = words[1]
                if words[0] == 'NDG_dataProvider':
                    NDG_dataProvider = True
        finally:
            # ensure the file handle is released even if parsing fails
            datacentre_config_file.close()

        if harvest_home == "":
            sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)

        logger.printOutput("INFO: harvested records are in " + harvest_home)

        if datacentre_groups == "":
            logger.printOutput("INFO: No groups/keywords set for datacentre " + datacentre)
        else:
            # NB, datacentre_groups is a list at this point - join it for display
            # (concatenating the list straight onto the string raised a TypeError)
            logger.printOutput("INFO: datacentre groups/keywords: " + string.join(datacentre_groups))

        if datacentre_format == "":
            sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)

        logger.printOutput("INFO: format being harvested: " + datacentre_format)

        if datacentre_namespace == "":
            sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)

        logger.printOutput("INFO: datacentre namespace: " + datacentre_namespace)
        sys.stdout.write(lineSeparator + "\n")
155
156
def ingestOriginalsToeXist():
        '''
        Put original docs into eXist DB - NB, this allows for easy transform via XQuery
        into the other required doc types
        '''
        logger.printOutput("INFO: Putting original docs in eXist...")
        # build the client command which creates the targetCollection in eXist
        # from the contents of the discovery_corrected_dir
        cmd = "$EXIST_HOME/bin/client.sh -c %s -u admin -P %s -p %s" \
            % (targetCollection, db_admin, discovery_corrected_dir)
        logger.printOutput(lineSeparator)
        logger.printOutput("INFO: Executing : actual command to ingest into exist db: " + cmd)
        exitCode = os.system(cmd)
        if exitCode != 0:
            sys.exit("Failed at ingesting into exist db. Datacentre =  %s. Status = %s" %(datacentre,exitCode))
        logger.printOutput("INFO: Original docs now in eXist")
        logger.printOutput(lineSeparator)
173
174
def createEXistMiniMoles():
        '''
        Transform the docs in eXist into the mini moles format - to allow use of the various
        existing xqueries to do the transforms from this format to others.
        NB, uses module globals for the eXist interface (xmldb), xqueries (xq) and the
        various collection/directory names.
        '''
        # First get the list of discovery record ids from the db collection
        logger.printOutput("INFO: Creating minimoles documents from original docs in eXist")
        ndgDir = ndgDirectory(targetCollection, host, datacentre_format.upper())

        # create the mini-moles for each Discovery record in the collection
        for member in ndgDir.members:
            filename = member['fileName']
            disc_id = member['EntryID']
            sys.stdout.write("INFO: internal id = %s\n" %disc_id)
            sys.stdout.write("INFO: discovery filename = %s\n" %filename)
            # now create the xquery
            # sort out the output ID stuff ...
            xQueryType = 'dif2moles'
            if NDG_dataProvider:
                discObj = ndgObject(disc_id)
                xquery = xq.actual(xQueryType, targetCollection, discObj.repository, discObj.localID)
                sys.stdout.write("REPOSITORY: %s, DISC_ID: %s\n" %(discObj.repository, discObj.localID))
            else:
                xquery = xq.actual(xQueryType, targetCollection, datacentre_namespace, disc_id)

            # and then sort out the input ID stuff
            xquery = xquery.replace('Input_Entry_ID', disc_id)
            xquery = xquery.replace('repository_localid', datacentre_namespace)

            # Now do the transform
            sys.stdout.write("INFO: Running XQuery transform to create minimoles document\n")
            molesid, s = xmldb.executeQuery(xquery)
            sys.stdout.write("molesid: %s, s: %s\n" %(molesid, s))
            moles_from_dif = xmldb.retrieve(molesid, 0)

            minimoles_file = minimoles_dir + "/" + filename
            sys.stdout.write("INFO: Creating new minimoles file:  %s\n" %minimoles_file)
            # ensure the file handle is released even if the write fails
            f = open(minimoles_file, 'w')
            try:
                f.write(moles_from_dif)
            finally:
                f.close()

        # NB, the checks + keyword adding below were previously (incorrectly) nested
        # inside the loop above - they should run exactly once, after ALL the
        # minimoles docs have been created
        filecount = 0
        for root, dirs, files in os.walk(minimoles_dir):
            filecount += len(files)

        if filecount == 0:
            sys.stdout.write("ERROR: couldn't create any minimum moles records for %s\n" %datacentre)
            sys.exit()

        # Add keywords if necessary
        addKeywords()

        # ingest the created discovery minimum moles records into eXist db.
        commandline = "$EXIST_HOME/bin/client.sh -c " + miniMolesCollection + " -u admin -P " + db_admin + \
        " -p " + finalmoles_dir
        sys.stdout.write("INFO: Executing : actual command to ingest into exist db.\n")
        status = os.system(commandline)
        if status != 0:
            sys.exit("Failed at ingesting into exist db. Datacentre =  %s. Status = %s" %(datacentre,status))
239
240
def createEXistCollections():
        '''
        Ensure all the required collections are set up in the eXist DB
        '''
        # base command for making ('-m') a collection via the eXist client
        baseCommand = "$EXIST_HOME/bin/client.sh -c /db  -u admin -P %s -m " % db_admin
        logger.printOutput(lineSeparator)
        for collectionPath in ('/db/xqueryLib', '/db/discovery/moles'):
                cmd = baseCommand + collectionPath + " -p"
                logger.printOutput("INFO: Executing : actual command to create DB collection: " +  cmd)
                exitStatus = os.system(cmd)
                sys.stdout.write("%s\n" % exitStatus)
                if exitStatus != 0:
                        sys.exit("Failed to create DB collection. Status = %s" %(exitStatus))
256       
257       
def addKeywords():
        '''
        Move the minimoles docs into the final moles directory - adding the
        configured group keywords in the process, if any are set
        '''
        if datacentre_groups != "":
            # delegate to the keyword adder - this copies the docs over whilst
            # adding the group keywords
            keywordAdder.main(minimoles_dir, finalmoles_dir, datacentre_groups)
            return

        # no keywords to add - just move the files across unchanged
        moveCommand = "ls -1 " + minimoles_dir + " | xargs -i mv " + minimoles_dir + \
            "{\} " + finalmoles_dir
        logger.printOutput("INFO: Executing : " + moveCommand)
        if os.system(moveCommand) != 0:
            sys.exit("Failed at moving MOLES to FINAL directory")
269
270                       
def addXQueriesToeXist():
        '''
        The current set of XQueries files refer to some xquery libraries - which are stored in eXist
        - add them to the DB for use elsewhere
        '''
        # upload everything under <base_dir>/xquery into the /db collection
        cmd = "$EXIST_HOME/bin/client.sh -c /db  -u admin -P %s -p %sxquery" % (db_admin, base_dir)
        logger.printOutput(lineSeparator)
        logger.printOutput("INFO: Executing : actual command to ingest into exist db: " + cmd)
        if os.system(cmd) != 0:
            sys.exit("Failed at ingesting into exist db. Datacentre =  %s. Status = %s" %(datacentre,os.system(cmd)))
283       
284
def usage():
        '''
        Display input params for the script, then exit with status 2
        '''
        for helpLine in ("Usage: python -v oai_ingest.py <datacentre> (<dbInfoFile>)",
                         " - where:\n   <datacentre> is the data centre to ingest data from; and",
                         "   <dbInfoFile> provides info on the eXist DB to use to do XQuery transforms",
                         " -v - verbose mode for output logging"):
            sys.stdout.write(helpLine + "\n")
        sys.exit(2)
294       
295
# separator used to delimit sections of the console output
lineSeparator = "-----------------------------"
print lineSeparator
print "RUNNING: oai_ingest.py"

verboseMode = False

# check for verbose option
try:
    opts, args = getopt.getopt(sys.argv[1:], "v")
except getopt.GetoptError, err:
    # print help information and exit:
    print str(err) # will print something like "option -a not recognized"
    usage()

# NB, '-v' (verbose) is currently the only supported option
for o, a in opts:
    if o == "-v":
        print " - Verbose mode ON"
        verboseMode = True
print lineSeparator
315
# one mandatory arg (datacentre) plus one optional arg (db info file) expected
if (len(args) < 1 or len(args) > 2):
        usage()
else:
    datacentre = args[0]

# set the default password file
dbinfoname = "passwords.txt"
if (len(args) == 2):
        dbinfoname = args[1]

# set up the logger to use
logger = Logger(verboseMode)

# set up the file utils to use this logger
fileUtils = FileUtilities(logger)
331
# overall exit status + count of docs processed, reported at the end of the run
status = 0
numfilesproc = 0
#base_dir = "/usr/local/WSClients/OAIBatch/" # this is the base dir that the script is ran from
base_dir = os.getcwd() + "/"# this is the base dir that the script is ran from

data_dir = base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs

#Change os directory to that with the harvested documents in it.
os.chdir(base_dir)

# Other settings and constants
# NOTE(review): EXIST_HOME and PATH below are hard-coded to a developer
# environment, and the PATH setting *replaces* (does not extend) the existing
# PATH - confirm/parameterise before deploying elsewhere
#os.putenv('EXIST_HOME', '/usr/local/exist-client')
os.putenv('EXIST_HOME', '/home/users/cbyrom/opt/eXist')
#os.putenv('PATH', ':/usr/java/jdk1.5.0_03/bin:/usr/java/jdk1.5.0_03:/usr/java/jdk1.5.0_03/lib/tools.jar:/usr/local/WSClients/OAIBatch:/usr/local/exist-client/bin:/bin:/usr/bin:.:\\opt\\cygwin\\bin')

# - to run on Windows under cygwin, use the following
os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
# set the global variables to retrieve from the config file
harvest_home = ""
datacentre_groups = ""
datacentre_format = ""
datacentre_namespace = ""
NDG_dataProvider = False
getConfigDetails(datacentre)
356
#any records to harvest?
if len(os.listdir(harvest_home)) == 0:
    logger.printOutput("INFO: Nothing to harvest this time from " + datacentre)
    sys.exit()

# get the db access info from the specified input file
# - each line is expected to have the format: <host> <user> <password>
dbaccess = {}
dbinfo_file = open(dbinfoname, "r")
try:
    for line in dbinfo_file.readlines():
        words = string.split(line)
        # skip blank or malformed lines - all three fields are needed
        # (previously lines with exactly two words raised an IndexError)
        if len(words) < 3:
            continue
        dbaccess[(words[0], words[1])] = [words[2]]
finally:
    # ensure the file handle is released even if parsing fails
    dbinfo_file.close()

#host ='glue.badc.rl.ac.uk'
host = 'localhost'
try:
        db_admin = dbaccess[(host, 'admin')][0]
except KeyError:
        # NB, only catch the missing-entry case - a bare except here would also
        # hide genuine programming errors
        sys.exit("ERROR: Could not find details for %s DB user 'admin' in DB config file\n" %host)
377
# The directory to put things for a tape backup (should already exist)
backupdir = '/disks/glue1/oaiBackup/'

# the following dirs define where the specific documents should go
originals_dir = data_dir +"/oai/originals/"
discovery_dir = data_dir +"/discovery/"
discovery_corrected_dir = data_dir +"/discovery_corrected/"

# Create/clear the 'in' directory pristine copy of the discovery records
# NB, copy via 'ls | xargs' rather than a single 'cp *' to avoid the
# "Argument list too long" problem with many harvested files
fileUtils.setUpDir(originals_dir)
commandline = "ls -1 " + harvest_home + "/ | xargs -i cp " + harvest_home + "/{\} " + originals_dir
#commandline = "find " + harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir
logger.printOutput("INFO: Executing : " + commandline)
status = os.system(commandline)

if status !=0:
    sys.exit("Failed at making pristine copy stage")

# Create/clear the directory for the 'out' processed copy of the discovery records.
fileUtils.setUpDir(discovery_dir)

# Create/clear the directory for the 'out' namespace corrected copy of the discovery records.
fileUtils.setUpDir(discovery_corrected_dir)

# The file config.properties contains the location of the particular datacentres harvested records.
# Copy the datacentre specific version of config to config.properties file.
commandline = "cp " + datacentre_config_filename + " " + base_dir + "config.properties"
logger.printOutput("INFO: Executing : " + commandline)
status = os.system(commandline)
if status !=0:
    sys.exit("Failed at copying config file stage")
409
#Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
# NB, this copies files from the original dir to the discovery dir
logger.printOutput(lineSeparator)
logger.printOutput("INFO: Renaming files:")
for filename in os.listdir(originals_dir):
        if filename.find('.xml') != -1:
                original_filename = originals_dir + filename
                ident = getID(original_filename)
                if NDG_dataProvider:
                    new_filename = discovery_dir + ident.replace(":","__")+".xml"
                else:
                    # replace chars which may cause problems in filenames
                    ident = ident.replace(":","-")
                    ident = ident.replace("/","-")
                    new_filename = discovery_dir + "/" +datacentre_namespace+ "__"+datacentre_format+ "__"+ ident +".xml"

                # NB, previously these two log lines were (incorrectly) nested in the
                # else branch above, so NDG data provider renames were never logged
                logger.printOutput("original file = " + original_filename)
                logger.printOutput("newfile = " + new_filename)

                try:
                        os.rename(original_filename, new_filename)
                except OSError:
                        # narrow catch - a bare except here would also swallow
                        # SystemExit/KeyboardInterrupt
                        sys.exit("ERROR: Failed to rename file %s to %s" %(original_filename, new_filename))
                numfilesproc += 1
        else:
                sys.stdout.write('WARNING: File %s is not xml format. Not processed\n'  %(filename))
435
#replace any namespace declarations with a standard one which we know works in NDG
# NB, this copies files from the discovery dir to the discovery corrected dir
logger.printOutput(lineSeparator)
logger.printOutput("INFO: Correcting namespaces of files (corrected files in "  + \
                    discovery_corrected_dir + ")")
for filename in os.listdir(discovery_dir):
        if filename.find('.xml') != -1:
                in_filename = discovery_dir + filename
                corrected_filename = discovery_corrected_dir + filename
                # deliberately best-effort: report failures and carry on with the
                # remaining docs - but catch Exception rather than using a bare
                # except, which would also swallow SystemExit/KeyboardInterrupt
                try:
                    SchemaNameSpace(in_filename, corrected_filename, datacentre_format, logger)
                except Exception:
                    sys.stdout.write("ERROR: SchemaNameSpace failed on file %s\n"%in_filename)
449
# put the original docs in eXist - as the targetCollection
# NB, firstly ensure the appropriate collections are set up
createEXistCollections()

# now set up the required XQueries and check xqueries libs are available from eXist
# NB, targetCollection/miniMolesCollection/xq/xmldb are used as module globals
# by the ingest/transform functions defined above
targetCollection = "/db/discovery/original/" +datacentre_format+ "/" +datacentre_namespace
miniMolesCollection = "/db/discovery/moles"
xq=ndgXqueries()
xmldb=ndg_eXist(db='' + host + '')

addXQueriesToeXist()

ingestOriginalsToeXist()

# now transform these to minimoles format
# - MINIMOLES holds the raw transformed docs, FINALMOLES the keyword-enriched ones
minimoles_dir = base_dir + 'MINIMOLES/'
finalmoles_dir = base_dir + 'FINALMOLES/'
fileUtils.setUpDir(minimoles_dir)
fileUtils.setUpDir(finalmoles_dir)

createEXistMiniMoles()
471
472
# Process the resulting files and put the data into the postgres DB
filenames = os.listdir(discovery_corrected_dir)
for filename in filenames:
        addFileToPostgresDB(discovery_corrected_dir + filename)

#Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
# NB, backup dir names are timestamped to the minute - assumes at most one run
# per datacentre per minute
backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M")

this_backupdir = backupdir_base + "_originals"
fileUtils.makeBackUp(originals_dir, this_backupdir)

this_backupdir = backupdir_base + "_discovery"
fileUtils.makeBackUp(discovery_dir, this_backupdir)

this_backupdir = backupdir_base + "_FINALMOLES"
fileUtils.makeBackUp("./FINALMOLES", this_backupdir)

#Clear out the original harvest records area and FINALMOLES
fileUtils.cleanDir("./FINALMOLES")
fileUtils.cleanDir(harvest_home)

# final summary - status is the exit code of the last shell command executed
print "======================================================"
print "No. of files pre-processed = %s" %numfilesproc
if status == 0:
    print " Procedure oai_ingest.py ran to end"
else:
    print "ERROR: Procedure oai_ingest.py FAILED with status %s" %status

print "======================================================"
Note: See TracBrowser for help on using the repository browser.