source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest.py @ 3797

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest.py@3797
Revision 3797, 17.0 KB checked in by cbyrom, 11 years ago (diff)

Upgraded version of the ingest code branch, including a major refactoring of the ingest
scripts to make them more object-oriented, allowing re-use and simplification of code, and removing the reliance
on the eXist DB to store data; this will now all be stored in and looked up from the Postgres DB.

  • Property svn:executable set to *
Line 
#!/usr/bin/env python
""" Script oai_ingest.py takes parameters <datacentre> <dbinfofile>.
The /usr/local/WSClients/OAIBatch directory contains:
 - this python script, plus some other modules (e.g. ndgUtils) used for parts of the process;
 - a DataProvider-specific config file;
 - the python module for extracting spatiotemporal information and adding it to the Postgres DB.
Under this directory the following structure should be maintained:
 ./data
   /DATACENTRE/
       discovery/            Re-named documents.
       discovery_corrected/  Documents with schema namespaces corrected, ready to ingest into the discovery service.
       oai/originals/        Pristine copy of the harvested documents for the current run.
       oai/difYYYYMMDD/      Documents as harvested from OAI.
 where /DATACENTRE varies for the different data providers.
"""
# History:
# 12/05/06 SEL spelling correction.
# 30/05/06 SEL cope with many files for processing ("Argument list too long" problem).
# 31/05/06 SEL need to use passwords now (replace xxxxxx in real version).
# 16/10/06 SEL changed to using python oaiClean.py module instead of java code.
# 16/10/06 SEL exist db upgraded and deployed at different location, java upgrade.
# 17/10/06 SEL cope with different discovery formats - not just DIF.
# 23/10/06 SEL keywords not mandatory in config file.
# 24/10/06 SEL fix bug where 'in' directory was not being cleared initially; display more information when running.
# December 2007 SEL rewrite to use Bryan's python XQuery code to create mini-moles instead of java.
#                   Also extracted hard-coded passwords into a file.
26
import commands
import ConfigParser
import os
import shutil
import string
import sys
import time
from xmlrpclib import Fault

import SpaceTimeIngestFromMOLES
import SpaceTimeIngestPostgisFromMOLES
import keywordAdder
from SchemaNameSpace import SchemaNameSpace
from DIF import DIF
from MDIP import MDIP
import ndgUtils
from ndgUtils.ndgXqueries import ndgXqueries
from ndgUtils.eXistInterface import ndg_eXist
from ndgUtils.ndgObject import ndgObject
from ndgUtils.elementtree import ElementTree as ET
from ndgUtils.ndgDirectory import ndgDirectory
45
def getID(filename, doc_format=None):
    '''Return the identifier held inside a harvested discovery metadata record.

    filename   -- path of the XML record to read.
    doc_format -- "DIF" or "MDIP". When omitted, the module-level
                  datacentre_format (read from the datacentre config file)
                  is used, exactly as before.

    DIF records yield their entryID, MDIP records their id. Any other format
    stops the script via sys.exit().
    '''
    if doc_format is None:
        doc_format = datacentre_format
    # Close the handle explicitly; the old file(...).read() left it open until
    # garbage collection.
    record_file = open(filename)
    try:
        xml = record_file.read()
    finally:
        record_file.close()
    if doc_format == "DIF":
        return DIF(xml).entryID
    elif doc_format == "MDIP":
        return MDIP(xml).id
    sys.exit("Only handles DIF or MDIP here.")
58
# Module-level state shared by the rest of the script; the datacentre_*
# settings and NDG_dataProvider are overwritten from the datacentre config file.
status = 0            # exit status of the most recent shell command
numfilesproc = 0      # number of harvested records prepared for ingest
harvest_home = ""
datacentre_groups = ""
datacentre_format = ""
datacentre_namespace = ""
NDG_dataProvider = False

# Usage: oai_ingest.py <datacentre> <dbinfofile>
if len(sys.argv) < 3:
    print("ERROR: <datacentre> or <db info file> parameter not supplied.")
    # Exit non-zero so a calling cron job/wrapper can see the failure; the
    # previous bare sys.exit() reported success.
    sys.exit(1)
datacentre = sys.argv[1]
dbinfoname = sys.argv[2]
73
# Run from the directory holding this script and its config files; the
# ./DIF2MOLES and ./FINALMOLES working areas used later are relative to it.
os.chdir('/usr/local/WSClients/OAIBatch')

# Timestamp used to name this run's backup directories (yymmdd_HHMM).
# time.strftime produces the same string as `date +'%y%m%d_%H%M'` without
# spawning a shell.
date_string = time.strftime('%y%m%d_%H%M')

# Environment for the eXist client run through os.system() below. Assigning
# to os.environ also calls putenv(), so child processes see these values and
# os.environ stays consistent with the real environment.
# NOTE(review): the leading ':' and trailing '.' put the current directory on
# PATH - confirm this is still required.
os.environ['EXIST_HOME'] = '/usr/local/exist-client'
os.environ['PATH'] = ':/usr/java/jdk1.5.0_03/bin:/usr/java/jdk1.5.0_03:/usr/java/jdk1.5.0_03/lib/tools.jar:/usr/local/WSClients/OAIBatch:/usr/local/exist-client/bin:/bin:/usr/bin:.'

# XQuery helpers and the eXist connection used to build the mini-MOLES.
xq = ndgXqueries()
xmldb = ndg_eXist(db='glue.badc.rl.ac.uk')
87
# Read the datacentre-specific config file. Each non-blank line is
# "<keyword> <value...>"; the keywords used here are:
#   host_path         directory of harvested records. It depends on the
#                     datacentre's OAI base url, set and format, so these have
#                     to be known up-front.
#   groups            'portal groups' the records belong to (e.g. for limiting
#                     searches to NERC-only datacentres); added to the
#                     intermediate MOLES when it is created.
#   format            discovery format being harvested (e.g. DIF, MDIP)
#   namespace         datacentre namespace used in record names and the eXist
#                     collection path
#   NDG_dataProvider  flag taking no value; its presence marks an NDG provider
datacentre_config_filename = "/usr/local/WSClients/OAIBatch/" + datacentre + "_config.properties"
print("INFO: Datacentre config file = %s" % datacentre_config_filename)
datacentre_config_file = open(datacentre_config_filename, "r")
try:
    for line in datacentre_config_file:
        words = line.split()
        if not words:
            continue
        keyword = words[0]
        values = words[1:]
        if keyword == 'NDG_dataProvider':
            NDG_dataProvider = True
        elif not values:
            # A keyword without a value used to crash with IndexError (or,
            # for groups, yield an empty list). Leave the setting unset so
            # the checks that follow report it clearly.
            continue
        elif keyword == 'host_path':
            harvest_home = values[0]
        elif keyword == 'groups':
            datacentre_groups = values
        elif keyword == 'format':
            datacentre_format = values[0]
        elif keyword == 'namespace':
            datacentre_namespace = values[0]
finally:
    datacentre_config_file.close()
112
# Report each setting taken from the datacentre config file. host_path,
# format and namespace are mandatory (the run stops when one is missing);
# groups/keywords are optional.
if harvest_home == "":
    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" % datacentre_config_filename)
print("INFO: harvested records are in %s" % harvest_home)

if datacentre_groups != "":
    print("INFO: datacentre groups/keywords = %s" % datacentre_groups)
else:
    print("INFO: No groups/keywords set for datacentre %s" % datacentre)

if datacentre_format == "":
    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" % datacentre_config_filename)
print("INFO: format being harvested = %s" % datacentre_format)

if datacentre_namespace == "":
    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" % datacentre_config_filename)
print("INFO: datacentre namespace = %s" % datacentre_namespace)

# Nothing harvested since the last run: stop quietly.
if not os.listdir(harvest_home):
    print("INFO: Nothing to harvest this time from %s" % datacentre)
    sys.exit()
137
# Read the database access details. Each usable line of the db info file is
# "<host> <user> <password> ..."; entries are stored keyed on (host, user).
host = 'glue.badc.rl.ac.uk'
dbaccess = {}
dbinfo_file = open(dbinfoname, "r")
try:
    for line in dbinfo_file:
        words = line.split()
        # Host, user and password are all needed. The old "< 2" test let
        # two-word lines through to words[2], which raised IndexError.
        if len(words) < 3:
            continue
        dbaccess[(words[0], words[1])] = [words[2]]
finally:
    dbinfo_file.close()

# Password for the eXist 'admin' user (passed to client.sh with -P below).
try:
    db_admin = dbaccess[(host, 'admin')][0]
except KeyError:
    sys.exit("ERROR: no admin entry for host %s in db info file %s" % (host, dbinfoname))
151
# Tape-backup area for this run's working copies (must already exist).
backupdir = '/disks/glue1/oaiBackup/'

# Empty the 'originals' directory, or create it on the first run. It will
# hold a pristine copy of the harvested records.
originals_dir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/oai/originals"
if not os.path.isdir(originals_dir):
    commandline = "mkdir " + originals_dir
else:
    commandline = "ls -1 " + originals_dir + "/ | xargs -i rm " + originals_dir + "/{\\}"
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at creating copy dir stage")

# Copy every harvested record into 'originals'. Feeding ls into xargs runs
# one cp per file, which copes with more files than one argument list allows.
commandline = "ls -1 " + harvest_home + "/ | xargs -i cp " + harvest_home + "/{\\} " + originals_dir
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at making pristine copy stage")
175
# Empty (or create) the output areas for the renamed discovery records and
# for their namespace-corrected copies. The status of each step is now
# checked; previously a failure here was silently ignored.
for work_dir in ["/usr/local/WSClients/OAIBatch/data/" + datacentre + "/discovery",
                 "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/discovery_corrected"]:
    if os.path.isdir(work_dir):
        commandline = "ls -1 " + work_dir + "/ | xargs -i rm " + work_dir + "/{\\}"
    else:
        commandline = "mkdir " + work_dir
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    if status != 0:
        sys.exit("Failed at clearing/creating directory %s" % work_dir)

# The file config.properties contains the location of the particular
# datacentre's harvested records: copy the datacentre-specific version of
# the config over it.
commandline = "cp /usr/local/WSClients/OAIBatch/" + datacentre + "_config.properties /usr/local/WSClients/OAIBatch/config.properties"
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at copying config file stage")
203
# Copy each harvested record from 'originals' into 'discovery', naming it
# after the identifier inside the record (via getID):
#   NDG data providers:  <id with ':' -> '__'>.xml
#   other providers:     <namespace>__<format>__<id with ':' and '/' -> '-'>.xml
# Two fixes here. Non-NDG records used to be renamed inside 'originals', so
# they never reached 'discovery', which every following stage reads. And
# os.rename emptied the supposedly pristine 'originals' area that is backed
# up at the end. Copying into 'discovery' fixes both.
indir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/oai/originals"
outdir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/discovery"
for filename in os.listdir(indir):
    if not filename.endswith('.xml'):
        print('WARNING: File %s is not xml format. Not processed' % (filename))
        continue
    original_filename = indir + "/" + filename
    ident = getID(original_filename)
    print("INFO: ID extracted from the discovery record = %s" % ident)
    if NDG_dataProvider:
        new_filename = outdir + "/" + ident.replace(":", "__") + ".xml"
    else:
        ident = ident.replace(":", "-").replace("/", "-")
        new_filename = outdir + "/" + datacentre_namespace + "__" + datacentre_format + "__" + ident + ".xml"
    print("INFO: original file = %s, newfile = %s" % (original_filename, new_filename))
    try:
        shutil.copy(original_filename, new_filename)
    except (IOError, OSError):
        sys.exit("ERROR: Failed to copy file %s to %s" % (original_filename, new_filename))
    numfilesproc += 1
236
# Replace any namespace declarations in the renamed records with a standard
# set known to work in NDG. Results go to 'discovery_corrected', which is
# left in outdir for the eXist ingest that follows.
indir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/discovery"
outdir = "/usr/local/WSClients/OAIBatch/data/" + datacentre + "/discovery_corrected"
for filename in os.listdir(indir):
    if not filename.endswith('.xml'):
        continue
    in_filename = indir + "/" + filename
    corrected_filename = outdir + "/" + filename
    try:
        SchemaNameSpace(in_filename, corrected_filename, datacentre_format)
    except Exception:
        # Best effort: report the failure (with its cause) and carry on with
        # the remaining records. Unlike the old bare except, Ctrl-C and
        # sys.exit() are no longer swallowed.
        print("ERROR: SchemaNameSpace failed on file %s (%s)" % (in_filename, sys.exc_info()[1]))
249
# Load the corrected discovery records (outdir) into the eXist collection
# /db/discovery/original/<format>/<namespace>; eXist itself is backed up
# nightly. Only a placeholder is logged, presumably to keep the admin
# password (-P) out of the log.
commandline = "$EXIST_HOME/bin/client.sh -c /db/discovery/original/%s/%s -u admin -P %s -p %s" % (
    datacentre_format, datacentre_namespace, db_admin, outdir)
print("INFO: Executing : actual command to ingest into exist db")
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at ingesting into exist db. Datacentre =  %s. Status = %s" % (datacentre, status))
256
# Start the mini-MOLES stage with an empty ./DIF2MOLES area. Clear out records
# left by a previous run, or create the directory if it does not exist yet;
# previously it was never created, so the record writes below failed.
if os.path.isdir("./DIF2MOLES"):
    commandline = "ls -1 ./DIF2MOLES | xargs -i rm ./DIF2MOLES/{\\}"
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    if status != 0:
        sys.exit("ERROR: Failed at clearing out DIF2MOLES area.")
else:
    print("INFO: No old moles records hanging around")
    os.mkdir("./DIF2MOLES")
268
# Run the minimum-MOLES creator for each discovery record just ingested,
# writing each result to ./DIF2MOLES under the record's own filename.
targetCollection = "/db/discovery/original/" + datacentre_format + "/" + datacentre_namespace
if datacentre_format != 'DIF':
    # Only the 'dif2moles' XQuery is used here. Previously this printed a
    # warning and then crashed with a NameError on the undefined ndgDir.
    sys.exit("ERROR: mini-moles creation only handles DIF records, not %s" % datacentre_format)

# List the discovery records (fileName, EntryID) held in the collection.
ndgDir = ndgDirectory(targetCollection, host, docType='DIF')

moles_dir = './DIF2MOLES'
for member in ndgDir.members:
    filename = member['fileName']
    disc_id = member['EntryID']
    print("INFO: internal id = %s" % disc_id)
    print("INFO: discovery filename = %s" % filename)
    # Output IDs: for NDG providers, ndgObject supplies the repository and
    # local id from the EntryID; otherwise the datacentre namespace is used.
    if NDG_dataProvider:
        discObj = ndgObject(disc_id)
        xquery = xq.actual('dif2moles', targetCollection, discObj.repository, discObj.localID)
    else:
        xquery = xq.actual('dif2moles', targetCollection, datacentre_namespace, disc_id)
    # Input IDs: substitute the placeholders left in the query text.
    xquery = xquery.replace('Input_Entry_ID', disc_id)
    xquery = xquery.replace('repository_localid', datacentre_namespace)
    molesid, _ = xmldb.executeQuery(xquery)
    moles_from_dif = xmldb.retrieve(molesid, 0)
    moles_file = open(moles_dir + "/" + filename, 'w')
    try:
        moles_file.write(moles_from_dif)
    finally:
        moles_file.close()
307
# Stop if no mini-MOLES records were produced. The old check only stat'ed the
# directory, which could not fail at this point, and its bare sys.exit()
# reported success.
if not os.path.isdir("./DIF2MOLES") or not os.listdir("./DIF2MOLES"):
    print("ERROR: couldn't create any minimum moles records for %s" % datacentre)
    sys.exit(1)
314
# Move the mini-MOLES on to ./FINALMOLES. When groups/keywords are configured,
# keywordAdder.main handles the transfer (presumably writing keyword-tagged
# records into ./FINALMOLES); otherwise the files are simply moved.
if datacentre_groups != "":
    keywordAdder.main('./DIF2MOLES', './FINALMOLES', datacentre_groups)
else:
    commandline = "ls -1 ./DIF2MOLES/ | xargs -i mv ./DIF2MOLES/{\\} ./FINALMOLES/"
    print("INFO: Executing : " + commandline)
    status = os.system(commandline)
    if status != 0:
        sys.exit("Failed at moving MOLES to FINAL directory")
324
# Load the finished mini-MOLES into the eXist collection /db/discovery/moles.
# Only a placeholder is logged, presumably so the admin password (-P) stays
# out of the log.
commandline = "$EXIST_HOME/bin/client.sh -c /db/discovery/moles -u admin -P %s -p ./FINALMOLES" % db_admin
print("INFO: Executing : actual command to ingest into exist db.")
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at ingesting into exist db. Datacentre =  %s. Status = %s" % (datacentre, status))

# Extract the spatiotemporal information from the mini-MOLES and put it into
# the Postgres DB.
SpaceTimeIngestFromMOLES.main("./FINALMOLES")

# Not yet enabled - load the same information into the new PostGIS tables:
# SpaceTimeIngestPostgisFromMOLES.main("./FINALMOLES")
337
# Copy this run's working areas into fresh timestamped directories under
# backupdir for tape backup: the pristine harvested records (oai/originals),
# the renamed discovery records (discovery) and the final mini-MOLES
# (./FINALMOLES).
def _backup_copy(src_dir, suffix):
    '''Copy every file in src_dir into <backupdir><datacentre>_<date_string>_<suffix>.

    The backup directory is created with a plain mkdir, so it must not
    already exist. Exits the script on any failure; otherwise returns the
    status of the copy command (0).
    '''
    dest_dir = backupdir + datacentre + "_" + date_string + "_" + suffix
    commandline = "mkdir " + dest_dir
    print("INFO: Executing : " + commandline)
    if os.system(commandline) != 0:
        sys.exit("Failed at creating backup directory %s" % dest_dir)
    # One cp per file via xargs, so very large directories are handled.
    commandline = "ls -1 " + src_dir + "/ | xargs -i cp " + src_dir + "/{\\} " + dest_dir
    print("INFO: Executing : " + commandline)
    copy_status = os.system(commandline)
    if copy_status != 0:
        sys.exit("Failed at copying to backup directory %s" % dest_dir)
    return copy_status


status = _backup_copy("/usr/local/WSClients/OAIBatch/data/" + datacentre + "/oai/originals", "originals")
status = _backup_copy("/usr/local/WSClients/OAIBatch/data/" + datacentre + "/discovery", "discovery")
status = _backup_copy("./FINALMOLES", "FINALMOLES")
377
# Clear out ./FINALMOLES and the original harvest records area now that the
# records have been ingested and backed up.
commandline = "ls -1 ./FINALMOLES | xargs -i rm ./FINALMOLES/{\\}"
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    # Name the directory actually being cleared (this message used to print
    # harvest_home instead).
    sys.exit("Failed at clearing out FINALMOLES area ./FINALMOLES")

commandline = "ls -1 " + harvest_home + " | xargs -i rm " + harvest_home + "/{\\}"
print("INFO: Executing : " + commandline)
status = os.system(commandline)
if status != 0:
    sys.exit("Failed at clearing out original harvest records area %s" % harvest_home)
390
# Final run summary. A non-zero status from the last cleanup step exits
# before this point, so the success message is the one normally printed.
banner = "======================================================"
print(banner)
print("No. of files pre-processed = %s" % numfilesproc)
if status != 0:
    print("ERROR: Procedure oai_ingest.py FAILED with status %s" % status)
else:
    print(" Procedure oai_ingest.py ran to end")

print(banner)
Note: See TracBrowser for help on using the repository browser.