source: TI01-discovery/trunk/ingestAutomation/OAIBatch/oai_ingest.py @ 3168

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/trunk/ingestAutomation/OAIBatch/oai_ingest.py@3168
Revision 3168, 18.2 KB checked in by selatham, 12 years ago (diff)

Re-write to use the pythonic dif2moles xquery rather than Java. extract pwds too.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2""" Script oai_ingest.py takes parameter <datacentre>.
3The /usr/local/WSClients/OAIBatch directory contains:-
4 - this python script, plus some other modules for parts of the process.
5 - a DataProvider specific config file,
6 - the d2b.jar moles creator class which creates moles discovery records,
7 - the python module for extracting spatiotemporal information and adding to postgres db.
8Under this directory the following structure should be maintained:
9 ./data
10 - /DATACENTRE/
11                - discovery/:         Re-named documents.
12        - discovery_corrected Documents with schema namespaces corrected, ready to ingest in the discovery service.
13                - oai/difYYYYMMDD/    Documents as harvested from OAI
14 Where  /DATACENTRE  varies for the different data providers
15"""
16#History:
17# 12/05/06 SEL spelling correction
18# 30/05/06 SEL cope with many files for processing."Argument list too long" problem.
19# 31/05/06 SEL need to use passwords now. (replace xxxxxx in real version).
20# 16/10/06 SEL Changed to using python oaiClean.py module instead of java code.
21# 16/10/06 SEL exist db upgraded and deployed at different location, java upgrade.
22# 17/10/06 SEL cope with different discovery formats - not just DIF.
23# 23/10/06 SEL keywords not mandatory in config file.
24# 24/10/06 SEL fix bug where 'in' directory not being cleared initially. display more information when running.
25#  December 2007 SEL rewrite to use Bryans' python XQuery stuff to create mini-moles instead of java.
26#                    Also extracted hard coded pwds into a file.
27
28import os
29import sys
30import commands
31import string
32import SpaceTimeIngestFromMOLES
33import keywordAdder
34from SchemaNameSpace import SchemaNameSpace
35from DIF import DIF
36from MDIP import MDIP
37import ndgUtils
38from ndgUtils.ndgXqueries import ndgXqueries
39from ndgUtils.eXistInterface import ndg_eXist
40from ndgUtils.ndgObject import ndgObject
41import ConfigParser
42from xmlrpclib import Fault
43from ndgUtils.elementtree import ElementTree as ET
44from ndgUtils.ndgDirectory import ndgDirectory
45
def getID(filename):
    '''Return the identifier held inside a discovery metadata xml record.

    The record is parsed according to the module-level datacentre_format
    setting: a "DIF" record yields the entryID attribute of the DIF object,
    an "MDIP" record yields the id attribute of the MDIP object.

    filename -- path of the xml record to read.

    Exits the script (via sys.exit) for any other datacentre_format.
    '''
    # Close the handle explicitly: this is called once per harvested record,
    # so relying on garbage collection could leave many files open.
    handle = open(filename)
    try:
        xml = handle.read()
    finally:
        handle.close()

    if datacentre_format == "DIF":
        return DIF(xml).entryID
    if datacentre_format == "MDIP":
        return MDIP(xml).id
    sys.exit("Only handles DIF or MDIP here.")
58
# Run-wide state. The datacentre_* values, harvest_home and NDG_dataProvider
# are defaults that the datacentre config file is expected to override.
status = 0                  # exit status of the most recent shell command
numfilesproc = 0            # count of discovery records renamed so far
harvest_home = ""
datacentre_groups = ""
datacentre_format = ""
datacentre_namespace = ""
NDG_dataProvider = False

# Usage: oai_ingest.py <datacentre> <db info file>
if len(sys.argv) < 3:
    print("ERROR: <datacentre> or <db info file> parameter not supplied.")
    # Exit non-zero so that a calling shell/cron wrapper sees the failure.
    sys.exit(1)

datacentre = sys.argv[1]
dbinfoname = sys.argv[2]
73
# Other settings and constants

# Timestamp (yymmdd_HHMM) obtained from the system `date` command.
date_string = commands.getoutput("date +'%y%m%d_%H%M'")

# Environment handed to the shell commands started later with os.system():
# the eXist client location and a search path covering the JDK, this
# script's directory and the eXist client binaries. The leading empty entry
# reproduces the ':' that starts the PATH value.
_child_path_entries = [
    '',
    '/usr/java/jdk1.5.0_03/bin',
    '/usr/java/jdk1.5.0_03',
    '/usr/java/jdk1.5.0_03/lib/tools.jar',
    '/usr/local/WSClients/OAIBatch',
    '/usr/local/exist-client/bin',
    '/bin',
    '/usr/bin',
    '.',
]
os.putenv('EXIST_HOME', '/usr/local/exist-client')
os.putenv('PATH', ':'.join(_child_path_entries))

# Xquery settings: the XQuery library object and the eXist database handle.
xq = ndgXqueries()
xmldb = ndg_eXist(db='glue.badc.rl.ac.uk')
87
# Get the harvested records directory and groups for this datacentre from the
# datacentre specific config file.
# The harvested records directory depends on the datacentre's OAI base url,
# the set and the format. These have to be known up-front.
# The groups denote which 'portal groups' they belong to - for limiting
# searches to say NERC-only datacentres records.
# Groups are added to the intermediate MOLES when it is created.
datacentre_config_filename = "/usr/local/WSClients/OAIBatch/" + datacentre + "_config.properties"
print("INFO: Datacentre config file = %s" % datacentre_config_filename)
datacentre_config_file = open(datacentre_config_filename, "r")

# Each line is "<key> <value> [<value> ...]", split on whitespace.
try:
    for line in datacentre_config_file:
        words = line.split()
        if not words:
            continue
        key = words[0]
        values = words[1:]
        if key == 'NDG_dataProvider':
            # A flag: its presence alone switches it on.
            NDG_dataProvider = True
        elif key == 'groups':
            datacentre_groups = values
        elif not values:
            # A key with no value: keep the default so that the checks below
            # report the missing setting instead of an IndexError here.
            continue
        elif key == 'host_path':
            harvest_home = values[0]
        elif key == 'format':
            datacentre_format = values[0]
        elif key == 'namespace':
            datacentre_namespace = values[0]
finally:
    datacentre_config_file.close()

# host_path, format and namespace are mandatory; groups are optional.
if harvest_home == "":
    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" % datacentre_config_filename)
print("INFO: harvested records are in %s" % harvest_home)

if datacentre_groups == "":
    print("INFO: No groups/keywords set for datacentre %s" % datacentre)
else:
    print("INFO: datacentre groups/keywords = %s" % datacentre_groups)

if datacentre_format == "":
    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" % datacentre_config_filename)
print("INFO: format being harvested = %s" % datacentre_format)

if datacentre_namespace == "":
    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" % datacentre_config_filename)
print("INFO: datacentre namespace = %s" % datacentre_namespace)

# Any records to harvest? An empty harvest directory is a normal, clean exit.
if not os.listdir(harvest_home):
    print("INFO: Nothing to harvest this time from %s" % datacentre)
    sys.exit()
137
# Get the db access info.
# Each usable line of the db info file is "<host> <user> <password>"; the
# entries are stored as dbaccess[(host, user)] = [password].
host = 'glue.badc.rl.ac.uk'
dbaccess = {}
dbinfo_file = open(dbinfoname, "r")
try:
    for line in dbinfo_file:
        words = line.split()
        # Three fields are needed; shorter lines (blank, partial) are skipped.
        if len(words) < 3:
            continue
        dbaccess[(words[0], words[1])] = [words[2]]
finally:
    # Close the db info file itself (not the datacentre config file).
    dbinfo_file.close()

# The eXist admin password is required for the ingest steps below.
if (host, 'admin') not in dbaccess:
    sys.exit("Failed at getting db access info: no 'admin' entry for host %s in %s" % (host, dbinfoname))
db_admin = dbaccess[(host, 'admin')][0]
151
# The directory to put things for a tape backup (should already exist)
backupdir = '/disks/glue1/oaiBackup/'


def _create_or_clear(directory):
    '''Empty directory if it already exists, otherwise create it.

    The files are removed through `ls | xargs` rather than `rm dir/*` to cope
    with very many files ("Argument list too long"). The shell strips the
    backslash in {\\}, so xargs receives its {} placeholder.

    Returns the exit status reported by os.system for the command run.
    '''
    if os.path.isdir(directory):
        command = "ls -1 " + directory + "/ | xargs -i rm " + directory + "/{\\}"
    else:
        command = "mkdir " + directory
    print("INFO: Executing : " + command)
    return os.system(command)


_data_root = "/usr/local/WSClients/OAIBatch/data/" + datacentre

# Create/clear the 'in' directory pristine copy of the discovery records
status = _create_or_clear(_data_root + "/oai/originals")
if status != 0:
    sys.exit("Failed at creating copy dir stage")

# Make the 'in' pristine copy. Cope with there being lots of files in the
# directory.
commandline = "ls -1 " + harvest_home + "/ | xargs -i cp " + harvest_home + "/{\\} " + _data_root + "/oai/originals"
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at making pristine copy stage")

# Create/clear the directory for the 'out' processed copy of the discovery
# records. A failure here is now fatal rather than silently ignored.
status = _create_or_clear(_data_root + "/discovery")
if status != 0:
    sys.exit("Failed at creating discovery dir stage")

# Create/clear the directory for the 'out' namespace corrected copy of the
# discovery records.
status = _create_or_clear(_data_root + "/discovery_corrected")
if status != 0:
    sys.exit("Failed at creating discovery_corrected dir stage")

# The file config.properties contains the location of the particular
# datacentre's harvested records.
# Copy the datacentre specific version of config to config.properties file.
commandline = "cp /usr/local/WSClients/OAIBatch/" + datacentre + "_config.properties /usr/local/WSClients/OAIBatch/config.properties"
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at copying config file stage")
203
# Change os directory to that with the code in it.
os.chdir('/usr/local/WSClients/OAIBatch')

# Copy each harvested record into the discovery directory under a name built
# from the identifier found inside the file (changed 08/01/07 to get id from
# inside file).
indir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/oai/originals"
outdir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/discovery"
filenames = os.listdir(indir)
for filename in filenames:
    # Note: this accepts any name containing '.xml', not only names ending
    # with it.
    if filename.find('.xml') == -1:
        # Report the skipped file by the name we actually have in hand.
        print('WARNING: File %s is not xml format. Not processed' % filename)
        continue

    original_filename = indir + "/" + filename
    ident = getID(original_filename)
    print("INFO: ID extracted from the discovery record = %s" % ident)
    if NDG_dataProvider:
        # The identifier alone names the file, with ':' turned into '__'.
        new_filename = outdir + "/" + ident.replace(":", "__") + ".xml"
    else:
        # Otherwise prefix namespace and format, and make the identifier
        # safe for use as a file name.
        ident = ident.replace(":", "-").replace("/", "-")
        new_filename = outdir + "/" + datacentre_namespace + "__" + datacentre_format + "__" + ident + ".xml"
    print("INFO: original file = %s, newfile = %s" % (original_filename, new_filename))

    commandline = "cp " + original_filename + " " + new_filename
    status = os.system(commandline)
    if status != 0:
        sys.exit("ERROR: Failed at re-naming file stage")
    numfilesproc += 1
233
# Replace any namespace declarations with a standard one which we know works
# in NDG. Reads from the discovery directory and writes the result, under the
# same file name, to discovery_corrected.
indir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/discovery"
outdir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/discovery_corrected"
filenames = os.listdir(indir)
for filename in filenames:
    if filename.find('.xml') == -1:
        continue
    in_filename = indir + "/" + filename
    corrected_filename = outdir + "/" + filename
    try:
        SchemaNameSpace(in_filename, corrected_filename, datacentre_format)
    except (KeyboardInterrupt, SystemExit):
        # Never swallow an operator interrupt or a deliberate exit.
        raise
    except:
        # Best effort: a record that cannot be corrected is reported and the
        # remaining records are still processed.
        print("ERROR: SchemaNameSpace failed on file %s" % in_filename)
        print("ERROR: reason = %s" % (sys.exc_info()[1],))
246
# Ingest the datacentre's records into the eXist db (backups of exist happen
# nightly). The records are loaded from outdir into the collection named
# after the format and namespace.
commandline = " ".join([
    "$EXIST_HOME/bin/client.sh",
    "-c", "/db/discovery/original/%s/%s" % (datacentre_format, datacentre_namespace),
    "-u", "admin",
    "-P", db_admin,
    "-p", outdir,
])
# The command itself is not echoed: it embeds the admin password.
print("INFO: Executing : actual command to ingest into exist db")
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at ingesting into exist db. Datacentre =  %s. Status = %s" % (datacentre, status))
253
# Make sure ./DIF2MOLES exists and is empty before the minimum moles records
# are written into it: remove any old moles records hanging around, or create
# the directory if it is not there yet.
# (The former per-record "java -jar D2B/d2boneoff.jar" step that used to sit
# here has been replaced by the XQuery based creation; see the file history.)
if os.path.isdir("./DIF2MOLES"):
    commandline = "ls -1 ./DIF2MOLES | xargs -i rm ./DIF2MOLES/{\\}"
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    if status != 0:
        sys.exit("ERROR: Failed at clearing out DIF2MOLES area.")
else:
    print("INFO: No old moles records hanging around")
    commandline = "mkdir ./DIF2MOLES"
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    if status != 0:
        sys.exit("ERROR: Failed at creating DIF2MOLES area.")
296
# Then run the minimum moles creator for each discovery record.
# Put records in ./DIF2MOLES with same filename.

# First get the list of discovery record ids from the db collection.
targetCollection = "/db/discovery/original/" + datacentre_format + "/" + datacentre_namespace
if datacentre_format != 'DIF':
    # Only the DIF listing is implemented; stop with a clear message instead
    # of failing later on an undefined directory listing.
    sys.exit('ERROR: mini-moles creation does not handle %s yet (only DIF).' % datacentre_format)
ndgDir = ndgDirectory(targetCollection, host, docType='DIF')

# Create the mini-moles for each Discovery record in the collection.
outdir = './DIF2MOLES'
for member in ndgDir.members:
    filename = member['fileName']
    disc_id = member['EntryID']
    print("INFO: internal id = %s" % disc_id)
    print("INFO: discovery filename = %s" % filename)

    # Build the xquery. The output ID comes from the parsed NDG identifier
    # for NDG data providers, otherwise from the namespace and the raw id.
    if NDG_dataProvider:
        discObj = ndgObject(disc_id)
        xquery = xq.actual('dif2moles', targetCollection, discObj.repository, discObj.localID)
    else:
        xquery = xq.actual('dif2moles', targetCollection, datacentre_namespace, disc_id)
    # Then fill in the input ID placeholders of the query text.
    xquery = xquery.replace('Input_Entry_ID', disc_id)
    xquery = xquery.replace('repository_localid', datacentre_namespace)

    # Run the query and fetch the first item of its result.
    molesid, s = xmldb.executeQuery(xquery)
    moles_from_dif = xmldb.retrieve(molesid, 0)

    # Write the xml out under the discovery record's file name.
    f = open(outdir + "/" + filename, 'w')
    try:
        f.write(moles_from_dif)
    finally:
        f.close()
335
# Add keywords if necessary. Either way the records move on from ./DIF2MOLES
# to ./FINALMOLES.
if datacentre_groups != "":
    # keywordAdder is given the source directory, the destination directory
    # and the configured groups.
    keywordAdder.main('./DIF2MOLES', './FINALMOLES', datacentre_groups)
else:
    # No groups configured: the records are moved across unchanged.
    commandline = "ls -1 ./DIF2MOLES/ | xargs -i mv ./DIF2MOLES/{\\} ./FINALMOLES/"
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    if status != 0:
        sys.exit("Failed at moving MOLES to FINAL directory")

# Ingest the created discovery minimum moles records into eXist db.
# The command itself is not echoed: it embeds the admin password.
commandline = "$EXIST_HOME/bin/client.sh -c /db/discovery/moles -u admin -P %s -p ./FINALMOLES" % db_admin
print("INFO: Executing : actual command to ingest into exist db.")
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at ingesting into exist db. Datacentre =  %s. Status = %s" % (datacentre, status))

# Extract the spatiotemporal info from created moles and put in Postgres db
SpaceTimeIngestFromMOLES.main("./FINALMOLES")
355
# Make copies of the oai/originals, discovery and FINALMOLES areas in the
# backup area for tape backups. Each area gets its own directory named
# <backupdir><datacentre>_<date_string><suffix>.
_data_dir = "/usr/local/WSClients/OAIBatch/data/" + datacentre

# (directory suffix, path given to ls, directory the files are copied from)
_backup_plan = (
    ("_originals", _data_dir + "/oai/originals/", _data_dir + "/oai/originals"),
    ("_discovery", _data_dir + "/discovery/", _data_dir + "/discovery"),
    ("_FINALMOLES", "./FINALMOLES", "./FINALMOLES"),
)

for _suffix, _listing, _source in _backup_plan:
    this_backupdir = backupdir + datacentre + "_" + date_string + _suffix

    commandline = "mkdir " + this_backupdir
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    if status != 0:
        sys.exit("Failed at creating backup directory %s" % this_backupdir)

    # ls | xargs copes with very many files in the source directory.
    commandline = "ls -1 " + _listing + " | xargs -i cp " + _source + "/{\\} " + this_backupdir
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    if status != 0:
        sys.exit("Failed at copying to backup directory %s" % this_backupdir)
395
# Clear out the original harvest records area and FINALMOLES
commandline = "ls -1 ./FINALMOLES | xargs -i rm ./FINALMOLES/{\\}"
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    # Name the area that actually failed (not the harvest directory).
    sys.exit("Failed at clearing out FINALMOLES area ./FINALMOLES")

commandline = "ls -1 " + harvest_home + " | xargs -i rm " + harvest_home + "/{\\}"
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at clearing out original harvest records area %s" % harvest_home)

# Summary. Every failing step above exits the script, so reaching this point
# means the whole procedure ran through.
print("======================================================")
print("No. of files pre-processed = %s" % numfilesproc)
print(" Procedure oai_ingest.py ran to end")
print("======================================================")
Note: See TracBrowser for help on using the repository browser.