source: TI01-discovery/trunk/ingestAutomation/OAIBatch/oai_ingest.py @ 3785

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/trunk/ingestAutomation/OAIBatch/oai_ingest.py@3785
Revision 3785, 16.7 KB checked in by selatham, 11 years ago (diff)

Gather required utilities. Improve elementtree imports in keywordAdder. Put host etc. in arguments for oai_ingest.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2""" Script oai_ingest.py takes parameters <datacentre> <dbinfofile> <existhost> <backupdir>.
3The /usr/local/WSClients/OAIBatch directory contains:-
4 - this python script, plus some other modules eg ndgUtils for parts of the process.
5 - a DataProvider specific config file,
6 - the python module for extracting spatiotemporal information and adding to postgres db.
7Under this directory the following structure should be maintained:
8 ./data
9 - /DATACENTRE/
10                - discovery/:         Re-named documents.
11                - discovery_corrected/: Documents with schema namespaces corrected, ready to ingest in the discovery service.
12                - oai/difYYYYMMDD/    Documents as harvested from OAI
13 Where  /DATACENTRE  varies for the different data providers
14"""
15#History:
16# 12/05/06 SEL spelling correction
17# 30/05/06 SEL cope with many files for processing."Argument list too long" problem.
18# 31/05/06 SEL need to use passwords now. (replace xxxxxx in real version).
19# 16/10/06 SEL Changed to using python oaiClean.py module instead of java code.
20# 16/10/06 SEL exist db upgraded and deployed at different location, java upgrade.
21# 17/10/06 SEL cope with different discovery formats - not just DIF.
22# 23/10/06 SEL keywords not mandatory in config file.
23# 24/10/06 SEL fix bug where 'in' directory not being cleared initially. display more information when running.
24#  December 2007 SEL rewrite to use Bryans' python XQuery stuff to create mini-moles instead of java.
25#                    Also extracted hard coded pwds into a file.
26
27import os
28import sys
29import commands
30import string
31import SpaceTimeIngestFromMOLES
32import keywordAdder
33from SchemaNameSpace import SchemaNameSpace
34from DIF import DIF
35from MDIP import MDIP
36import ndgUtils
37from ndgUtils.ndgXqueries import ndgXqueries
38from ndgUtils.eXistInterface import ndg_eXist
39from ndgUtils.ndgObject import ndgObject
40import ConfigParser
41from xmlrpclib import Fault
42from ndgUtils.elementtree import ElementTree as ET
43from ndgUtils.ndgDirectory import ndgDirectory
44
def getID(filename):
    '''Return the identifier held inside a harvested metadata xml record.

    The record is parsed according to the module-level ``datacentre_format``
    setting: for "DIF" the ``entryID`` attribute of the DIF object is
    returned, for "MDIP" the ``id`` attribute of the MDIP object.  Any other
    format terminates the script through ``sys.exit``.

    filename -- path of the xml record to read.
    '''
    # open() replaces the Python-2-only file() builtin, and the handle is
    # closed explicitly instead of being left to the garbage collector.
    record = open(filename)
    try:
        xml = record.read()
    finally:
        record.close()

    if datacentre_format == "DIF":
        return DIF(xml).entryID
    if datacentre_format == "MDIP":
        return MDIP(xml).id
    sys.exit("Only handles DIF or MDIP here.")
57
# Defaults for the module-level state used by the rest of the script.
# The datacentre_* values and NDG_dataProvider are filled in from the
# datacentre specific config file further down.
status = 0
numfilesproc = 0
harvest_home = ""
datacentre_groups = ""
datacentre_format = ""
datacentre_namespace = ""
NDG_dataProvider = False

# All four positional parameters are mandatory.
if len(sys.argv) < 5:
    print("ERROR: <datacentre> or <db info file> or <existhost> or <backupdir> parameter not supplied.")
    # Exit with a non-zero status so that a calling job can detect the failure
    # (a bare sys.exit() would report success).
    sys.exit(1)

datacentre = sys.argv[1]
dbinfoname = sys.argv[2]
existhost = sys.argv[3]
# The directory to put things for a tape backup (should already exist),
# e.g. '/disks/glue1/oaiBackup/'
backupdir = sys.argv[4]
76
# Work from the directory that holds this code.
os.chdir('/usr/local/WSClients/OAIBatch')

# Timestamp (yymmdd_HHMM) obtained from the system date command.
date_string = commands.getoutput("date +'%y%m%d_%H%M'")

# Environment variables for the eXist client; JAVA_HOME and CLASSPATH are not set here.
for env_name, env_value in (
        ('EXIST_HOME', '/usr/local/exist-client'),
        ('PATH', ':/usr/java/jdk1.5.0_03/bin:/usr/java/jdk1.5.0_03:/usr/java/jdk1.5.0_03/lib/tools.jar:/usr/local/WSClients/OAIBatch:/usr/local/exist-client/bin:/bin:/usr/bin:.'),
):
    os.putenv(env_name, env_value)

# XQuery library and the connection to the eXist database on the requested host.
xq = ndgXqueries()
xmldb = ndg_eXist(db=existhost)
90
# Get the harvested records directory and groups for this datacentre from the datacentre specific config file.
# The harvested records directory depends on the datacentre's OAI base url, the set and format. These have to be known up-front.
# The groups denote which 'portal groups' they belong to - for limiting searches to say NERC-only datacentres records.
# Groups are added to the intermediate MOLES when it is created.
datacentre_config_filename = "/usr/local/WSClients/OAIBatch/" + datacentre + "_config.properties"
print("INFO: Datacentre config file = %s" % datacentre_config_filename)
datacentre_config_file = open(datacentre_config_filename, "r")

# Each line is "<keyword> <value> [<value> ...]" separated by whitespace.
# try/finally guarantees the file is closed even if a line cannot be handled.
try:
    for line in datacentre_config_file.readlines():
        words = line.split()
        if not words:
            continue
        key = words[0]
        if key == 'groups':
            datacentre_groups = words[1:]
        elif key == 'NDG_dataProvider':
            # Presence of the keyword alone switches the flag on.
            NDG_dataProvider = True
        elif len(words) < 2:
            # A keyword without a value: leave the setting empty so that the
            # checks below report it, instead of failing with an IndexError.
            continue
        elif key == 'host_path':
            harvest_home = words[1]
        elif key == 'format':
            datacentre_format = words[1]
        elif key == 'namespace':
            datacentre_namespace = words[1]
finally:
    datacentre_config_file.close()

# host_path, format and namespace are mandatory; groups are optional.
if harvest_home == "":
    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" % datacentre_config_filename)
print("INFO: harvested records are in %s" % harvest_home)

if datacentre_groups == "":
    print("INFO: No groups/keywords set for datacentre %s" % datacentre)
else:
    print("INFO: datacentre groups/keywords = %s" % datacentre_groups)

if datacentre_format == "":
    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" % datacentre_config_filename)
print("INFO: format being harvested = %s" % datacentre_format)

if datacentre_namespace == "":
    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" % datacentre_config_filename)
print("INFO: datacentre namespace = %s" % datacentre_namespace)

# Any records to harvest? An empty directory is not an error, so exit quietly.
if not os.listdir(harvest_home):
    print("INFO: Nothing to harvest this time from %s" % datacentre)
    sys.exit()
140
# Get the eXist db access info.
# Each usable line of the db info file holds three whitespace separated
# words; the first two form the lookup key and the third is the stored value.
dbaccess = {}
dbinfo_file = open(dbinfoname, "r")
try:
    for line in dbinfo_file.readlines():
        words = line.split()
        # Three words are needed: words[2] is read below, so a two-word line
        # must be skipped rather than raise an IndexError.
        if len(words) < 3:
            continue
        dbaccess[(words[0], words[1])] = [words[2]]
finally:
    dbinfo_file.close()

# The value stored for (existhost, 'admin') is passed to the eXist client later on.
try:
    db_admin = dbaccess[(existhost, 'admin')][0]
except KeyError:
    sys.exit("Failed at getting eXist admin access info for host %s from db info file %s" % (existhost, dbinfoname))
154
data_root = "/usr/local/WSClients/OAIBatch/data/" + datacentre

# Create/clear the 'in' directory holding the pristine copy of the discovery records.
# Files are removed one at a time through xargs to cope with there being lots of them.
originals_dir = data_root + "/oai/originals"
if os.path.isdir(originals_dir):
    commandline = "ls -1 " + originals_dir + "/ | xargs -i rm " + originals_dir + "/{\\}"
else:
    commandline = "mkdir " + originals_dir
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at creating copy dir stage")

# Make the 'in' pristine copy. Cope with there being lots of files in the directory.
commandline = "ls -1 " + harvest_home + "/ | xargs -i cp " + harvest_home + "/{\\} " + originals_dir
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at making pristine copy stage")

# Create/clear the directories for the 'out' processed copy of the discovery
# records and for the namespace corrected copy.
for work_dir in (data_root + "/discovery", data_root + "/discovery_corrected"):
    if os.path.isdir(work_dir):
        commandline = "ls -1 " + work_dir + "/ | xargs -i rm " + work_dir + "/{\\}"
    else:
        commandline = "mkdir " + work_dir
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    # Stop here on failure: without this check the status would simply be
    # overwritten by the next command and stale records could be ingested.
    if status != 0:
        sys.exit("Failed at creating/clearing directory %s" % work_dir)
# config.properties holds the location of the particular datacentre's harvested records:
# overwrite it with the datacentre specific version.
commandline = "cp /usr/local/WSClients/OAIBatch/" + datacentre + "_config.properties /usr/local/WSClients/OAIBatch/config.properties"
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at copying config file stage")

# Copy every harvested xml record into the discovery directory under a name
# built from the identifier found inside the record.
indir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/oai/originals"
outdir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/discovery"
filenames = os.listdir(indir)
for filename in filenames:
    if '.xml' not in filename:
        print('WARNING: File %s is not xml format. Not processed' % (filename))
        continue

    original_filename = indir + "/" + filename
    ident = getID(original_filename)
    print("INFO: ID extracted from the discovery record = %s" % ident)

    if NDG_dataProvider:
        # NDG providers: the identifier itself, with ':' replaced, is the file name.
        new_filename = outdir + "/" + ident.replace(":", "__") + ".xml"
    else:
        # Other providers: prefix with namespace and format, and replace ':' and '/'.
        ident = ident.replace(":", "-").replace("/", "-")
        new_filename = outdir + "/" + datacentre_namespace + "__" + datacentre_format + "__" + ident + ".xml"

    print("INFO: original file = %s, newfile = %s" % (original_filename, new_filename))
    commandline = "cp " + original_filename + " " + new_filename
    status = os.system(commandline)
    if status != 0:
        sys.exit("ERROR: Failed at re-naming file stage")
    numfilesproc += 1
229
# Replace any namespace declarations with a standard one which we know works in NDG.
indir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/discovery"
outdir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/discovery_corrected"
filenames = os.listdir(indir)
for filename in filenames:
    if '.xml' not in filename:
        continue
    in_filename = indir + "/" + filename
    corrected_filename = outdir + "/" + filename
    try:
        SchemaNameSpace(in_filename, corrected_filename, datacentre_format)
    except Exception:
        # Best effort: a record that cannot be corrected is reported, together
        # with the reason, and the remaining records are still processed.
        print("ERROR: SchemaNameSpace failed on file %s" % in_filename)
        print("ERROR: reason = %s" % (sys.exc_info()[1],))

# Ingest the datacentre's records into eXist db (backups of exist happen nightly).
# The command line contains the admin password; only a fixed message is printed,
# presumably to keep the password out of the output.
commandline = "$EXIST_HOME/bin/client.sh -c /db/discovery/original/" + datacentre_format + "/" + datacentre_namespace + " -u admin -P " + db_admin + " -p " + outdir
print("INFO: Executing : actual command to ingest into exist db")
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at ingesting into exist db. Datacentre =  %s. Status = %s" % (datacentre, status))
249
# Are there any old moles records hanging around? If so, remove them.
if os.path.exists("./DIF2MOLES"):
    commandline = "ls -1 ./DIF2MOLES | xargs -i rm ./DIF2MOLES/{\\}"
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    if status != 0:
        sys.exit("ERROR: Failed at clearing out DIF2MOLES area.")
else:
    print("INFO: No old moles records hanging around")

# Then run the minimum moles creator for each discovery record.
# Put records in ./DIF2MOLES with same filename.

# First get the list of discovery record ids from the db collection.
targetCollection = "/db/discovery/original/" + datacentre_format + "/" + datacentre_namespace
if datacentre_format != 'DIF':
    # Only DIF collections can be listed here; stop with a clear message
    # instead of failing later on an undefined ndgDir.
    sys.exit("ERROR: mini-moles creation does not handle %s yet!" % datacentre_format)
ndgDir = ndgDirectory(targetCollection, existhost, docType='DIF')

# Create the mini-moles for each Discovery record in the collection.
outdir = './DIF2MOLES'
for member in ndgDir.members:
    filename = member['fileName']
    disc_id = member['EntryID']
    print("INFO: internal id = %s" % disc_id)
    print("INFO: discovery filename = %s" % filename)

    # Build the xquery: first sort out the output ID stuff ...
    if NDG_dataProvider:
        discObj = ndgObject(disc_id)
        xquery = xq.actual('dif2moles', targetCollection, discObj.repository, discObj.localID)
    else:
        xquery = xq.actual('dif2moles', targetCollection, datacentre_namespace, disc_id)
    # ... and then the input ID stuff.
    xquery = xquery.replace('Input_Entry_ID', disc_id)
    xquery = xquery.replace('repository_localid', datacentre_namespace)

    molesid, s = xmldb.executeQuery(xquery)
    moles_from_dif = xmldb.retrieve(molesid, 0)

    # Now write out the xml to file, creating the output directory on first
    # use if it is not there yet.
    if not os.path.isdir(outdir):
        os.mkdir(outdir)
    f = open(outdir + "/" + filename, 'w')
    try:
        f.write(moles_from_dif)
    finally:
        f.close()

# There should be some records now.
if not os.path.exists("./DIF2MOLES"):
    print("ERROR: couldn't create any minimum moles records for %s" % datacentre)
    # Non-zero status so that the failure is visible to the caller.
    sys.exit(1)
307
# Populate ./FINALMOLES: with groups configured the keyword adder writes the
# records there, otherwise the mini-moles are simply moved across.
if datacentre_groups != "":
    keywordAdder.main('./DIF2MOLES', './FINALMOLES', datacentre_groups)
else:
    commandline = "ls -1 ./DIF2MOLES/ | xargs -i mv ./DIF2MOLES/{\\} ./FINALMOLES/"
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    if status != 0:
        sys.exit("Failed at moving MOLES to FINAL directory")

# Load the finished minimum moles records into the eXist db.
commandline = "$EXIST_HOME/bin/client.sh -c /db/discovery/moles -u admin -P " + db_admin + " -p ./FINALMOLES"
print("INFO: Executing : actual command to ingest into exist db.")
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at ingesting into exist db. Datacentre =  %s. Status = %s" % (datacentre, status))

# Pull the spatiotemporal info out of the created moles and store it in the Postgres db.
SpaceTimeIngestFromMOLES.main("./FINALMOLES")
327
# Make copies of the oai/originals, discovery and FINALMOLES areas to the backup area for tape backups.
# Each area gets its own directory named <datacentre>_<date_string>_<area> under backupdir.
data_root = "/usr/local/WSClients/OAIBatch/data/" + datacentre
for backup_suffix, listing_target, source_dir in (
        ("_originals", data_root + "/oai/originals/", data_root + "/oai/originals"),
        ("_discovery", data_root + "/discovery/", data_root + "/discovery"),
        ("_FINALMOLES", "./FINALMOLES", "./FINALMOLES"),
):
    # os.path.join gives the same path as plain concatenation when backupdir
    # ends with '/', and also a correct one when the trailing '/' is missing.
    this_backupdir = os.path.join(backupdir, datacentre + "_" + date_string + backup_suffix)

    commandline = "mkdir " + this_backupdir
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    if status != 0:
        sys.exit("Failed at creating backup directory %s" % this_backupdir)

    # Copy file by file through xargs to cope with large numbers of files.
    commandline = "ls -1 " + listing_target + " | xargs -i cp " + source_dir + "/{\\} " + this_backupdir
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    if status != 0:
        sys.exit("Failed at copying to backup directory %s" % this_backupdir)
367
# Clear out FINALMOLES and the original harvest records area.
commandline = "ls -1 ./FINALMOLES | xargs -i rm ./FINALMOLES/{\\}"
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    # Name the area that actually failed (not the harvest directory).
    sys.exit("Failed at clearing out FINALMOLES area ./FINALMOLES")

commandline = "ls -1 " + harvest_home + " | xargs -i rm " + harvest_home + "/{\\}"
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at clearing out original harvest records area %s" % harvest_home)

# Summary. The status was checked just above and a failure has already left
# the script, so reaching this point means the run completed.
print("======================================================")
print("No. of files pre-processed = %s" % numfilesproc)
print(" Procedure oai_ingest.py ran to end")
print("======================================================")
Note: See TracBrowser for help on using the repository browser.