source: TI01-discovery/trunk/ingestAutomation/OAIBatch/oai_ingest.py @ 3777

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/trunk/ingestAutomation/OAIBatch/oai_ingest.py@3777
Revision 3777, 16.7 KB checked in by selatham, 11 years ago (diff)

removed development SpaceTimeIngestPostgisFromMOLES

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2""" Script oai_ingest.py takes parameters <datacentre> <dbinfofile>.
3The /usr/local/WSClients/OAIBatch directory contains:-
4 - this python script, plus some other modules eg ndgUtils for parts of the process.
5 - a DataProvider specific config file,
6 - the python module for extracting spatiotemporal information and adding to postgres db.
7Under this directory the following structure should be maintained:
8 ./data
9 - /DATACENTRE/
10                - discovery/:         Re-named documents.
11        - discovery_corrected Documents with schema namespaces corrected, ready to ingest in the discovery service.
12                - oai/difYYYYMMDD/    Documents as harvested from OAI
13 Where  /DATACENTRE  varies for the different data providers
14"""
15#History:
16# 12/05/06 SEL spelling correction
17# 30/05/06 SEL cope with many files for processing."Argument list too long" problem.
18# 31/05/06 SEL need to use passwords now. (replace xxxxxx in real version).
19# 16/10/06 SEL Changed to using python oaiClean.py module instead of java code.
20# 16/10/06 SEL exist db upgraded and deployed at different location, java upgrade.
21# 17/10/06 SEL cope with different discovery formats - not just DIF.
22# 23/10/06 SEL keywords not mandatory in config file.
23# 24/10/06 SEL fix bug where 'in' directory not being cleared initially. display more information when running.
24#  December 2007 SEL rewrite to use Bryans' python XQuery stuff to create mini-moles instead of java.
25#                    Also extracted hard coded pwds into a file.
26
27import os
28import sys
29import commands
30import string
31import SpaceTimeIngestFromMOLES
32import SpaceTimeIngestPostgisFromMOLES
33import keywordAdder
34from SchemaNameSpace import SchemaNameSpace
35from DIF import DIF
36from MDIP import MDIP
37import ndgUtils
38from ndgUtils.ndgXqueries import ndgXqueries
39from ndgUtils.eXistInterface import ndg_eXist
40from ndgUtils.ndgObject import ndgObject
41import ConfigParser
42from xmlrpclib import Fault
43from ndgUtils.elementtree import ElementTree as ET
44from ndgUtils.ndgDirectory import ndgDirectory
45
46def getID(filename):
47        ''' Gets the identifier out of an input metadata xml record. Copes with DIF and MDIP currently.'''
48        xml=file(filename).read()
49        if datacentre_format == "DIF":
50            d=DIF(xml)
51            ID=d.entryID
52        elif datacentre_format == "MDIP":
53            d=MDIP(xml)
54            ID=d.id
55        else:
56            sys.exit("Only handles DIF or MDIP here.")
57        return ID
58
59status = 0
60numfilesproc = 0
61harvest_home = ""
62datacentre_groups = ""
63datacentre_format = ""
64datacentre_namespace = ""
65NDG_dataProvider = False
66
67if (len(sys.argv) < 3):
68    print "ERROR: <datacentre> or <db info file> parameter not supplied."
69    sys.exit()
70else:
71    datacentre = sys.argv[1]
72    dbinfoname = sys.argv[2]
73
74#Change os directory to that with the code in it.
75os.chdir('/usr/local/WSClients/OAIBatch')
76
77# Other settings and constants
78date_string = commands.getoutput ("date +'%y%m%d_%H%M'")
79os.putenv ('EXIST_HOME', '/usr/local/exist-client')
80#os.putenv ('JAVA_HOME', '/usr/java/jdk1.5.0_03')
81os.putenv ('PATH', ':/usr/java/jdk1.5.0_03/bin:/usr/java/jdk1.5.0_03:/usr/java/jdk1.5.0_03/lib/tools.jar:/usr/local/WSClients/OAIBatch:/usr/local/exist-client/bin:/bin:/usr/bin:.')
82#os.putenv ('CLASSPATH','.:/usr/java/jdk1.5.0_03/lib/tools.jar')
83
84#Xquery settings
85xq=ndgXqueries()
86xmldb=ndg_eXist(db='glue.badc.rl.ac.uk')
87
88# Get the harvested records directory and groups for this datacentre from the datacentre specific config file
89# The harvested records directory depends on the datacentres OAI base url, the set and  format. These have to be know up-front.
90# The groups denote which 'portal groups' they belong to - for limiting searches to say NERC-only datacentres records.
91# Groups are added to the intermediate MOLES when it is created.
92datacentre_config_filename = "/usr/local/WSClients/OAIBatch/" + datacentre + "_config.properties"
93print "INFO: Datacentre config file = %s" %datacentre_config_filename
94datacentre_config_file = open(datacentre_config_filename, "r")
95
96for line in datacentre_config_file.readlines():
97    words  = string.split(line)
98    if len(words) == 0:
99        continue
100    if words[0] == 'host_path':
101        harvest_home = string.rstrip(words[1])
102    if words[0] == 'groups':
103        datacentre_groups = words[1:]
104    if words[0] == 'format':
105        datacentre_format = words[1]
106    if words[0] == 'namespace':
107        datacentre_namespace = words[1]
108    if words[0] == 'NDG_dataProvider':
109        NDG_dataProvider = True
110
111datacentre_config_file.close()
112
113if harvest_home == "":
114    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
115else:
116    print "INFO: harvested records are in %s" %harvest_home
117
118if datacentre_groups == "":
119    print "INFO: No groups/keywords set for datacentre %s" %datacentre
120else:
121    print "INFO: datacentre groups/keywords = %s" %datacentre_groups
122
123if datacentre_format == "":
124    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
125else:
126    print "INFO: format being harvested = %s" %datacentre_format
127
128if datacentre_namespace == "":
129    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
130else:
131    print "INFO: datacentre namespace = %s" %datacentre_namespace
132
133#any records to harvest?
134if len( os.listdir(harvest_home)) == 0:
135    print "INFO: Nothing to harvest this time from %s" %datacentre
136    sys.exit()
137
138# get the db access info
139host ='glue.badc.rl.ac.uk'
140dbaccess={}
141dbinfo_file=open(dbinfoname,"r")
142for line in dbinfo_file.readlines():
143    words  = string.split(line)
144    if len(words) < 2:
145        continue
146    dbaccess[(words[0],words[1])] = [words[2]]
147dbinfo_file.close()
148#print dbaccess
149db_admin = dbaccess[(host,'admin')][0]
150#print db_admin
151
152# The directory to put things for a tape backup (should already exist)
153backupdir = '/disks/glue1/oaiBackup/'
154
155# Create/clear the 'in' directory pristine copy of the discovery records
156if os.path.isdir("/usr/local/WSClients/OAIBatch/data/" + datacentre +"/oai/originals"):
157    commandline = "ls -1 /usr/local/WSClients/OAIBatch/data/" + datacentre +"/oai/originals/ | xargs -i rm /usr/local/WSClients/OAIBatch/data/" + datacentre +"/oai/originals/{\}"
158    print "INFO: Executing : " + commandline
159    status = os.system(commandline)
160else:
161    commandline = "mkdir /usr/local/WSClients/OAIBatch/data/" + datacentre +"/oai/originals"
162    print "INFO: Executing : " + commandline
163    status= os.system(commandline)
164
165if status != 0:
166    sys.exit("Failed at creating copy dir stage")
167
168# make the 'in' pristine copy. Cope with there being lots of files in the directory.
169
170commandline = "ls -1 " + harvest_home + "/ | xargs -i cp " + harvest_home + "/{\} /usr/local/WSClients/OAIBatch/data/" + datacentre + "/oai/originals"
171print "INFO: Executing : " + commandline
172status = os.system(commandline)
173if status !=0:
174    sys.exit("Failed at making pristine copy stage")
175
176# Create/clear the directory for the 'out' processed copy of the discovery records.
177if os.path.isdir("/usr/local/WSClients/OAIBatch/data/" + datacentre +"/discovery"):
178    commandline = "ls -1 /usr/local/WSClients/OAIBatch/data/" + datacentre +"/discovery/ | xargs -i rm /usr/local/WSClients/OAIBatch/data/" + datacentre +"/discovery/{\}"
179    print "INFO: Executing : " + commandline
180    status = os.system(commandline)
181else:
182    commandline = "mkdir /usr/local/WSClients/OAIBatch/data/" + datacentre +"/discovery"
183    print "INFO: Executing : " + commandline
184    status= os.system(commandline)
185
186# Create/clear the directory for the 'out' namespace corrected copy of the discovery records.
187if os.path.isdir("/usr/local/WSClients/OAIBatch/data/" + datacentre +"/discovery_corrected"):
188    commandline = "ls -1 /usr/local/WSClients/OAIBatch/data/" + datacentre +"/discovery_corrected/ | xargs -i rm /usr/local/WSClients/OAIBatch/data/" + datacentre +"/discovery_corrected/{\}"
189    print "INFO: Executing : " + commandline
190    status = os.system(commandline)
191else:
192    commandline = "mkdir /usr/local/WSClients/OAIBatch/data/" + datacentre +"/discovery_corrected"
193    print "INFO: Executing : " + commandline
194    status= os.system(commandline)
195
196# The file config.properties contains the location of the particular datacentres harvested records.
197# Copy the datacentre specific version of config to config.properties file.
198commandline = "cp /usr/local/WSClients/OAIBatch/" + datacentre +"_config.properties /usr/local/WSClients/OAIBatch/config.properties"
199print "INFO: Executing : " + commandline
200status = os.system(commandline)
201if status !=0:
202    sys.exit("Failed at copying config file stage")
203
204#Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
205indir="/usr/local/WSClients/OAIBatch/data/" + datacentre +"/oai/originals"
206outdir="/usr/local/WSClients/OAIBatch/data/" + datacentre +"/discovery"
207#wrapFlag=False
208filenames = os.listdir(indir)
209for filename in filenames:
210        if filename.find('.xml') != -1:
211                original_filename = indir + "/" + filename
212                ident=getID(original_filename)
213                print "INFO: ID extracted from the discovery record = %s" %ident
214                if NDG_dataProvider:
215                    new_filename = outdir + "/"+ ident.replace(":","__")+".xml"
216                else:
217                    ident = ident.replace(":","-")
218                    ident = ident.replace("/","-")
219                    new_filename = outdir + "/" +datacentre_namespace+ "__"+datacentre_format+ "__"+ ident +".xml"
220                print "INFO: original file = %s, newfile = %s" %(original_filename, new_filename)
221                commandline = "cp "+original_filename+ " " +new_filename
222                #print "Executing : " + commandline
223                status = os.system(commandline)
224                if status !=0:
225                    sys.exit("ERROR: Failed at re-naming file stage")
226                numfilesproc += 1
227        else:
228                print 'WARNING: File %s is not xml format. Not processed'  %(filename)
229
230#replace any namespace declarations with a standard one which we know works in NDG
231indir="/usr/local/WSClients/OAIBatch/data/" + datacentre +"/discovery"
232outdir="/usr/local/WSClients/OAIBatch/data/" + datacentre +"/discovery_corrected"
233filenames = os.listdir(indir)
234for filename in filenames:
235        if filename.find('.xml') != -1:
236                    in_filename = indir + "/" + filename
237                    corrected_filename = outdir + "/" + filename
238                    try:
239                       SchemaNameSpace(in_filename, corrected_filename,datacentre_format)
240                    except:
241                       print "ERROR: SchemaNameSpace failed on file %s"%in_filename
242
243# ingest the datacentres records into eXist db (backups of exist happen nightly).
244commandline = "$EXIST_HOME/bin/client.sh -c /db/discovery/original/"+datacentre_format+ "/" +datacentre_namespace+ " -u admin -P "+db_admin+" -p "+outdir
245print "INFO: Executing : actual command to ingest into exist db"
246status = os.system(commandline)
247if status !=0:
248    sys.exit("Failed at ingesting into exist db. Datacentre =  %s. Status = %s" %(datacentre,status))
249
250#are there any old moles records hanging around.If so, remove.
251try:
252   os.stat("./DIF2MOLES")
253except:
254   print "INFO: No old moles records hanging around"
255else:
256   commandline = "ls -1 ./DIF2MOLES | xargs -i rm ./DIF2MOLES/{\}"
257   print "INFO: Executing : " + commandline
258   status = os.system(commandline)
259   if status !=0:
260       sys.exit("ERROR: Failed at clearing out DIF2MOLES area.")
261
262# Then run the minimum moles creator for each discovery record
263# Put records in ./DIF2MOLES with same filename
264
265# First get the list of discovery record ids from the db collection
266targetCollection = "/db/discovery/original/"+datacentre_format+ "/" +datacentre_namespace
267if datacentre_format == 'DIF':
268    ndgDir=ndgDirectory(targetCollection,host,docType='DIF')
269    #print ndgDir.members
270else:
271    print 'ERROR: mini-moles creation does not handle MDIP yet! So this WILL FAIL (probably)'
272
273
274#create the mini-moles for each Discovery record in the collection
275for member in ndgDir.members:
276    #print member
277    filename= member['fileName']
278    disc_id = member['EntryID']
279    print "INFO: internal id = %s" %disc_id
280    print "INFO: discovery filename = %s" %filename
281    # now create the xquery
282    # sort out the output ID stuff ...
283    if NDG_dataProvider:
284        discObj=ndgObject(disc_id)
285        xquery=xq.actual('dif2moles',targetCollection,discObj.repository,discObj.localID)
286    else:
287        xquery=xq.actual('dif2moles',targetCollection,datacentre_namespace,disc_id)
288    # and then sort out the input ID stuff
289    xquery=xquery.replace('Input_Entry_ID',disc_id)
290    xquery=xquery.replace('repository_localid', datacentre_namespace )
291    #print xq.help('dif2moles')
292    molesid,s=xmldb.executeQuery(xquery)
293    moles_from_dif=xmldb.retrieve(molesid,0)
294    #print moles_from_dif
295    # now write out xml to file
296    outdir= './DIF2MOLES'
297    f=open(outdir+"/"+filename,'w')
298    f.write(moles_from_dif)
299    f.close()
300
301#There should be some records now
302try:
303    os.stat("./DIF2MOLES")
304except:
305    print "ERROR: couldn't create any minimum moles records for %s" %datacentre
306    sys.exit()
307
308#Add keywords if necessary
309if datacentre_groups == "":
310    commandline = "ls -1 ./DIF2MOLES/ | xargs -i mv ./DIF2MOLES/{\} ./FINALMOLES/"
311    print "INFO: Executing : " + commandline
312    status = os.system(commandline)
313    if status !=0:
314        sys.exit("Failed at moving MOLES to FINAL directory")
315else:
316    keywordAdder.main('./DIF2MOLES', './FINALMOLES', datacentre_groups)
317
318# ingest the created discovery minimum molesrecords into eXist db.
319commandline = "$EXIST_HOME/bin/client.sh -c /db/discovery/moles -u admin -P "+db_admin+" -p ./FINALMOLES"
320print "INFO: Executing : actual command to ingest into exist db."
321status = os.system(commandline)
322if status !=0:
323    sys.exit("Failed at ingesting into exist db. Datacentre =  %s. Status = %s" %(datacentre,status))
324
325#Extract the spatiotemporal info from created moles and put in Postgres db
326SpaceTimeIngestFromMOLES.main("./FINALMOLES")
327
328#Extract the spatiotemporal info and put into NEW postgis tables
329#SpaceTimeIngestPostgisFromMOLES.main("./FINALMOLES")
330
331#Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
332this_backupdir = backupdir + datacentre + "_" + date_string + "_originals"
333commandline = "mkdir " + this_backupdir
334print "INFO: Executing : " + commandline
335status = os.system(commandline)
336if status !=0:
337    sys.exit("Failed at creating backup directory %s" %this_backupdir)
338
339commandline = "ls -1 /usr/local/WSClients/OAIBatch/data/" + datacentre +"/oai/originals/ | xargs -i cp /usr/local/WSClients/OAIBatch/data/" + datacentre +"/oai/originals/{\} " + this_backupdir
340print "INFO: Executing : " + commandline
341status = os.system(commandline)
342if status !=0:
343    sys.exit("Failed at copying to backup directory %s" %this_backupdir)
344
345this_backupdir = backupdir + datacentre + "_" + date_string + "_discovery"
346commandline = "mkdir " + this_backupdir
347print "INFO: Executing : " + commandline
348status = os.system(commandline)
349if status !=0:
350    sys.exit("Failed at creating backup directory %s" %this_backupdir)
351
352commandline = "ls -1 /usr/local/WSClients/OAIBatch/data/" + datacentre +"/discovery/ | xargs -i cp /usr/local/WSClients/OAIBatch/data/" + datacentre +"/discovery/{\} " + this_backupdir
353print "INFO: Executing : " + commandline
354status = os.system(commandline)
355if status !=0:
356    sys.exit("Failed at copying to backup directory %s" %this_backupdir)
357
358this_backupdir = backupdir + datacentre + "_" + date_string + "_FINALMOLES"
359commandline = "mkdir " + this_backupdir
360print "INFO: Executing : " + commandline
361status = os.system(commandline)
362if status !=0:
363    sys.exit("Failed at creating backup directory %s" %this_backupdir)
364
365commandline = "ls -1 ./FINALMOLES | xargs -i cp ./FINALMOLES/{\} " + this_backupdir
366print "INFO: Executing : " + commandline
367status = os.system(commandline)
368if status !=0:
369    sys.exit("Failed at copying to backup directory %s" %this_backupdir)
370
371#Clear out the original harvest records area and FINALMOLES
372commandline = "ls -1 ./FINALMOLES | xargs -i rm ./FINALMOLES/{\}"
373print "INFO: Executing : " + commandline
374status = os.system(commandline)
375if status !=0:
376    sys.exit("Failed at clearing out FINALMOLES area %s" %harvest_home)
377
378commandline = "ls -1 " + harvest_home + " | xargs -i rm " + harvest_home + "/{\}"
379print "INFO: Executing : " + commandline
380status = os.system(commandline)
381if status !=0:
382    sys.exit("Failed at clearing out original harvest records area %s" %harvest_home)
383
384print "======================================================"
385print "No. of files pre-processed = %s" %numfilesproc
386if status == 0:
387    print " Procedure oai_ingest.py ran to end"
388else:
389    print "ERROR: Procedure oai_ingest.py FAILED with status %s" %status
390
391print "======================================================"
Note: See TracBrowser for help on using the repository browser.