source: TI01-discovery/trunk/ingestAutomation/OAIBatch/oai_ingest.py~ @ 3785

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/trunk/ingestAutomation/OAIBatch/oai_ingest.py~@3785
Revision 3785, 16.8 KB checked in by selatham, 12 years ago (diff)

Gather required utilities. Improve elementtree imports in keywordAdder. Put host etc. in arguments for oai_ingest.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2""" Script oai_ingest.py takes parameters <datacentre> <dbinfofile> <existhost> <backupdir>.
3The /usr/local/WSClients/OAIBatch directory contains:-
4 - this python script, plus some other modules eg ndgUtils for parts of the process.
5 - a DataProvider specific config file,
6 - the python module for extracting spatiotemporal information and adding to postgres db.
7Under this directory the following structure should be maintained:
8 ./data
9 - /DATACENTRE/
10                - discovery/:         Re-named documents.
11        - discovery_corrected Documents with schema namespaces corrected, ready to ingest in the discovery service.
12                - oai/difYYYYMMDD/    Documents as harvested from OAI
13 Where  /DATACENTRE  varies for the different data providers
14"""
15#History:
16# 12/05/06 SEL spelling correction
17# 30/05/06 SEL cope with many files for processing."Argument list too long" problem.
18# 31/05/06 SEL need to use passwords now. (replace xxxxxx in real version).
19# 16/10/06 SEL Changed to using python oaiClean.py module instead of java code.
20# 16/10/06 SEL exist db upgraded and deployed at different location, java upgrade.
21# 17/10/06 SEL cope with different discovery formats - not just DIF.
22# 23/10/06 SEL keywords not mandatory in config file.
23# 24/10/06 SEL fix bug where 'in' directory not being cleared initially. display more information when running.
24#  December 2007 SEL rewrite to use Bryans' python XQuery stuff to create mini-moles instead of java.
25#                    Also extracted hard coded pwds into a file.
26
27import os
28import sys
29import commands
30import string
31import SpaceTimeIngestFromMOLES
32import SpaceTimeIngestPostgisFromMOLES
33import keywordAdder
34from SchemaNameSpace import SchemaNameSpace
35from DIF import DIF
36from MDIP import MDIP
37import ndgUtils
38from ndgUtils.ndgXqueries import ndgXqueries
39from ndgUtils.eXistInterface import ndg_eXist
40from ndgUtils.ndgObject import ndgObject
41import ConfigParser
42from xmlrpclib import Fault
43from ndgUtils.elementtree import ElementTree as ET
44from ndgUtils.ndgDirectory import ndgDirectory
45
def getID(filename):
    """Return the identifier held in an input metadata xml record.

    The file is read in full and parsed according to the module-level
    ``datacentre_format`` setting: "DIF" records give their ``entryID``,
    "MDIP" records give their ``id``.

    filename -- path of the metadata xml file to read.

    Exits the script (sys.exit) when ``datacentre_format`` is neither
    "DIF" nor "MDIP".
    """
    # Close the handle explicitly rather than leaving it to the garbage
    # collector; this function is called once for every harvested record.
    record = open(filename)
    try:
        xml = record.read()
    finally:
        record.close()

    if datacentre_format == "DIF":
        return DIF(xml).entryID
    if datacentre_format == "MDIP":
        return MDIP(xml).id
    sys.exit("Only handles DIF or MDIP here.")
58
# Module-level state used throughout the script.
status = 0          # exit status of the most recently executed shell command
numfilesproc = 0    # number of harvested records renamed ready for ingest

# Settings filled in from the datacentre specific config file further down.
harvest_home = ""
datacentre_groups = ""
datacentre_format = ""
datacentre_namespace = ""
NDG_dataProvider = False

# All four command line arguments are mandatory.
if len(sys.argv) < 5:
    print("ERROR: <datacentre> or <db info file> or <existhost> or <backup> parameter not supplied.")
    # Exit with a non-zero status so that a calling shell or cron job can
    # tell that nothing was ingested.
    sys.exit(1)

datacentre = sys.argv[1]
dbinfoname = sys.argv[2]
existhost = sys.argv[3]
# The directory to put things for a tape backup (should already exist)
backupdir = sys.argv[4]
#backupdir = '/disks/glue1/oaiBackup/'

#Change os directory to that with the code in it.
os.chdir('/usr/local/WSClients/OAIBatch')

# Other settings and constants
# Timestamp (yymmdd_HHMM) used to name this run's backup directories.
date_string = commands.getoutput("date +'%y%m%d_%H%M'")
# putenv changes the environment seen by child processes, i.e. by the shell
# commands this script launches through os.system.
os.putenv('EXIST_HOME', '/usr/local/exist-client')
#os.putenv ('JAVA_HOME', '/usr/java/jdk1.5.0_03')
os.putenv('PATH', ':/usr/java/jdk1.5.0_03/bin:/usr/java/jdk1.5.0_03:/usr/java/jdk1.5.0_03/lib/tools.jar:/usr/local/WSClients/OAIBatch:/usr/local/exist-client/bin:/bin:/usr/bin:.')
#os.putenv ('CLASSPATH','.:/usr/java/jdk1.5.0_03/lib/tools.jar')

#Xquery settings
xq = ndgXqueries()
xmldb = ndg_eXist(db=existhost)
91
# Get the harvested records directory and groups for this datacentre from the datacentre specific config file
# The harvested records directory depends on the datacentre's OAI base url, the set and format. These have to be known up-front.
# The groups denote which 'portal groups' they belong to - for limiting searches to say NERC-only datacentres records.
# Groups are added to the intermediate MOLES when it is created.
datacentre_config_filename = "/usr/local/WSClients/OAIBatch/" + datacentre + "_config.properties"
print("INFO: Datacentre config file = %s" % datacentre_config_filename)
datacentre_config_file = open(datacentre_config_filename, "r")
try:
    # Every non-blank line is split on whitespace: the first word is the
    # setting name and the remaining words are its value(s).
    for line in datacentre_config_file:
        words = line.split()
        if not words:
            continue
        key = words[0]
        if key == 'NDG_dataProvider':
            # Presence of the key alone switches the flag on.
            NDG_dataProvider = True
        elif key == 'groups':
            datacentre_groups = words[1:]
        elif len(words) < 2:
            # A setting name without a value is left unset so that the
            # checks below report it, instead of failing with an IndexError.
            continue
        elif key == 'host_path':
            harvest_home = words[1].rstrip()
        elif key == 'format':
            datacentre_format = words[1]
        elif key == 'namespace':
            datacentre_namespace = words[1]
finally:
    datacentre_config_file.close()

# host_path, format and namespace are mandatory; groups are optional.
if harvest_home == "":
    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" % datacentre_config_filename)
print("INFO: harvested records are in %s" % harvest_home)

if datacentre_groups == "":
    print("INFO: No groups/keywords set for datacentre %s" % datacentre)
else:
    print("INFO: datacentre groups/keywords = %s" % datacentre_groups)

if datacentre_format == "":
    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" % datacentre_config_filename)
print("INFO: format being harvested = %s" % datacentre_format)

if datacentre_namespace == "":
    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" % datacentre_config_filename)
print("INFO: datacentre namespace = %s" % datacentre_namespace)

#any records to harvest?
if not os.listdir(harvest_home):
    print("INFO: Nothing to harvest this time from %s" % datacentre)
    sys.exit()
141
# get the exist db access info
# Each usable line of the db info file holds three whitespace separated words.
# The first two form the lookup key (the admin entry is looked up below as
# (existhost, 'admin')) and the third is the value stored for that key.
dbaccess = {}
dbinfo_file = open(dbinfoname, "r")
try:
    for line in dbinfo_file:
        words = line.split()
        # All three words are needed; shorter lines (blank, comments,
        # incomplete entries) are skipped rather than raising IndexError.
        if len(words) < 3:
            continue
        dbaccess[(words[0], words[1])] = [words[2]]
finally:
    dbinfo_file.close()

try:
    db_admin = dbaccess[(existhost, 'admin')][0]
except KeyError:
    sys.exit("Failed at getting exist db admin access info for host %s from file %s" % (existhost, dbinfoname))
155
def _clear_or_create_dir(directory):
    """Empty `directory` if it already exists, otherwise create it.

    Returns the status reported by os.system for the shell command that was
    run (0 on success).
    """
    if os.path.isdir(directory):
        # The listing is piped through xargs so that directories holding many
        # files do not hit the "Argument list too long" problem.
        commandline = "ls -1 " + directory + "/ | xargs -i rm " + directory + "/{\\}"
    else:
        commandline = "mkdir " + directory
    print("INFO: Executing : " + commandline)
    return os.system(commandline)


data_dir = "/usr/local/WSClients/OAIBatch/data/" + datacentre

# Create/clear the 'in' directory pristine copy of the discovery records
status = _clear_or_create_dir(data_dir + "/oai/originals")
if status != 0:
    sys.exit("Failed at creating copy dir stage")

# make the 'in' pristine copy. Cope with there being lots of files in the directory.
commandline = "ls -1 " + harvest_home + "/ | xargs -i cp " + harvest_home + "/{\\} " + data_dir + "/oai/originals"
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at making pristine copy stage")

# Create/clear the directory for the 'out' processed copy of the discovery records.
# The status is checked here as well: a stale or missing output directory
# would otherwise only show up later as confusing ingest results.
status = _clear_or_create_dir(data_dir + "/discovery")
if status != 0:
    sys.exit("Failed at creating discovery dir stage")

# Create/clear the directory for the 'out' namespace corrected copy of the discovery records.
status = _clear_or_create_dir(data_dir + "/discovery_corrected")
if status != 0:
    sys.exit("Failed at creating discovery_corrected dir stage")

# The file config.properties contains the location of the particular datacentres harvested records.
# Copy the datacentre specific version of config to config.properties file.
commandline = "cp /usr/local/WSClients/OAIBatch/" + datacentre + "_config.properties /usr/local/WSClients/OAIBatch/config.properties"
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at copying config file stage")
204
# Imported here because only this step needs it. The copy is done in Python
# instead of through a shell "cp" because the target name is built from an
# identifier read out of a harvested record, which could contain spaces or
# shell metacharacters.
import shutil

#Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
indir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/oai/originals"
outdir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/discovery"
filenames = os.listdir(indir)
for filename in filenames:
    if filename.find('.xml') == -1:
        print('WARNING: File %s is not xml format. Not processed' % filename)
        continue

    original_filename = indir + "/" + filename
    ident = getID(original_filename)
    print("INFO: ID extracted from the discovery record = %s" % ident)
    if NDG_dataProvider:
        new_filename = outdir + "/" + ident.replace(":", "__") + ".xml"
    else:
        # Make the identifier safe to use as part of a file name.
        ident = ident.replace(":", "-")
        ident = ident.replace("/", "-")
        new_filename = outdir + "/" + datacentre_namespace + "__" + datacentre_format + "__" + ident + ".xml"
    print("INFO: original file = %s, newfile = %s" % (original_filename, new_filename))
    try:
        shutil.copy(original_filename, new_filename)
    except EnvironmentError:
        sys.exit("ERROR: Failed at re-naming file stage")
    numfilesproc += 1

#replace any namespace declarations with a standard one which we know works in NDG
# NOTE: outdir is left pointing at discovery_corrected; the eXist ingest
# command that follows reads it.
indir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/discovery"
outdir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/discovery_corrected"
filenames = os.listdir(indir)
for filename in filenames:
    if filename.find('.xml') != -1:
        in_filename = indir + "/" + filename
        corrected_filename = outdir + "/" + filename
        try:
            SchemaNameSpace(in_filename, corrected_filename, datacentre_format)
        except Exception:
            # Best effort: report the record that failed and carry on with
            # the rest, but say why so the failure can be investigated.
            print("ERROR: SchemaNameSpace failed on file %s" % in_filename)
            print("ERROR: reason = %s" % sys.exc_info()[1])
243
# Load the namespace-corrected discovery records into the eXist db
# (backups of exist happen nightly). outdir is whatever directory the
# preceding namespace-correction step left it pointing at.
exist_collection = "/db/discovery/original/" + datacentre_format + "/" + datacentre_namespace
commandline = "".join([
    "$EXIST_HOME/bin/client.sh -c ", exist_collection,
    " -u admin -P ", db_admin,
    " -p ", outdir,
])
# Only a fixed message is logged; the real command line carries the admin
# password (presumably why it is not echoed).
print("INFO: Executing : actual command to ingest into exist db")
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at ingesting into exist db. Datacentre =  %s. Status = %s" % (datacentre, status))
250
#are there any old moles records hanging around.If so, remove.
if os.path.exists("./DIF2MOLES"):
    commandline = "ls -1 ./DIF2MOLES | xargs -i rm ./DIF2MOLES/{\\}"
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    if status != 0:
        sys.exit("ERROR: Failed at clearing out DIF2MOLES area.")
else:
    print("INFO: No old moles records hanging around")

# Then run the minimum moles creator for each discovery record
# Put records in ./DIF2MOLES with same filename

# First get the list of discovery record ids from the db collection
targetCollection = "/db/discovery/original/" + datacentre_format + "/" + datacentre_namespace
if datacentre_format != 'DIF':
    # Without a directory listing there is nothing to loop over below, so
    # stop here with a clear message instead of failing on an undefined name.
    sys.exit('ERROR: mini-moles creation does not handle %s yet, only DIF.' % datacentre_format)

# The eXist host is the one supplied on the command line (existhost); there
# is no separate "host" setting in this script.
ndgDir = ndgDirectory(targetCollection, existhost, docType='DIF')

#create the mini-moles for each Discovery record in the collection
outdir = './DIF2MOLES'
for member in ndgDir.members:
    filename = member['fileName']
    disc_id = member['EntryID']
    print("INFO: internal id = %s" % disc_id)
    print("INFO: discovery filename = %s" % filename)
    # now create the xquery
    # sort out the output ID stuff ...
    if NDG_dataProvider:
        discObj = ndgObject(disc_id)
        xquery = xq.actual('dif2moles', targetCollection, discObj.repository, discObj.localID)
    else:
        xquery = xq.actual('dif2moles', targetCollection, datacentre_namespace, disc_id)
    # and then sort out the input ID stuff
    xquery = xquery.replace('Input_Entry_ID', disc_id)
    xquery = xquery.replace('repository_localid', datacentre_namespace)
    molesid, s = xmldb.executeQuery(xquery)
    moles_from_dif = xmldb.retrieve(molesid, 0)
    # now write out xml to file
    # The output directory is created on first use so that the check after
    # the loop still detects the "no records produced" case.
    if not os.path.isdir(outdir):
        os.mkdir(outdir)
    f = open(outdir + "/" + filename, 'w')
    try:
        f.write(moles_from_dif)
    finally:
        f.close()

#There should be some records now
if not os.path.exists("./DIF2MOLES"):
    print("ERROR: couldn't create any minimum moles records for %s" % datacentre)
    # Non-zero status: this is a failure, not a normal completion.
    sys.exit(1)
308
# Get the mini-moles from ./DIF2MOLES into ./FINALMOLES. When groups/keywords
# are configured keywordAdder is given both directories and the groups
# (presumably it writes the keyword-enriched records into ./FINALMOLES);
# otherwise the records are simply moved across.
if datacentre_groups != "":
    keywordAdder.main('./DIF2MOLES', './FINALMOLES', datacentre_groups)
else:
    commandline = "ls -1 ./DIF2MOLES/ | xargs -i mv ./DIF2MOLES/{\} ./FINALMOLES/"
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    if status != 0:
        sys.exit("Failed at moving MOLES to FINAL directory")

# Load the finished minimum moles records into the eXist db.
moles_ingest_command = "$EXIST_HOME/bin/client.sh -c /db/discovery/moles -u admin -P %s -p ./FINALMOLES" % db_admin
print("INFO: Executing : actual command to ingest into exist db.")
status = os.system(moles_ingest_command)
if status != 0:
    sys.exit("Failed at ingesting into exist db. Datacentre =  %s. Status = %s" % (datacentre, status))

# Pull the spatiotemporal info out of the created moles and store it in the Postgres db
SpaceTimeIngestFromMOLES.main("./FINALMOLES")

# Ingest into the NEW postgis tables is currently switched off:
#SpaceTimeIngestPostgisFromMOLES.main("./FINALMOLES")
331
#Make copies of the oai/originals, discovery and FINALMOLES areas to the backup area for tape backups
# Each entry is (directory to back up, suffix of the backup directory name).
backup_sources = (
    ("/usr/local/WSClients/OAIBatch/data/" + datacentre + "/oai/originals", "originals"),
    ("/usr/local/WSClients/OAIBatch/data/" + datacentre + "/discovery", "discovery"),
    ("./FINALMOLES", "FINALMOLES"),
)
for source_dir, suffix in backup_sources:
    # os.path.join copes with backupdir being given with or without a
    # trailing slash.
    this_backupdir = os.path.join(backupdir, datacentre + "_" + date_string + "_" + suffix)
    commandline = "mkdir " + this_backupdir
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    if status != 0:
        sys.exit("Failed at creating backup directory %s" % this_backupdir)

    # Copy file by file via xargs so that large directories do not hit the
    # "Argument list too long" problem.
    commandline = "ls -1 " + source_dir + "/ | xargs -i cp " + source_dir + "/{\\} " + this_backupdir
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    if status != 0:
        sys.exit("Failed at copying to backup directory %s" % this_backupdir)

#Clear out the original harvest records area and FINALMOLES
commandline = "ls -1 ./FINALMOLES | xargs -i rm ./FINALMOLES/{\\}"
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at clearing out FINALMOLES area ./FINALMOLES")

commandline = "ls -1 " + harvest_home + " | xargs -i rm " + harvest_home + "/{\\}"
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at clearing out original harvest records area %s" % harvest_home)
384
# Closing summary: how many records were pre-processed and whether the last
# shell command run by the script reported success.
separator = "======================================================"
print(separator)
print("No. of files pre-processed = %s" % numfilesproc)
if status != 0:
    print("ERROR: Procedure oai_ingest.py FAILED with status %s" % status)
else:
    print(" Procedure oai_ingest.py ran to end")

print(separator)
Note: See TracBrowser for help on using the repository browser.