source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new.py @ 3800

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new.py@3800
Revision 3800, 17.6 KB checked in by cbyrom, 13 years ago (diff)

Upgraded version of the ingest code branch - including a major refactoring of the ingest
scripts to make them more object-oriented, allowing re-use and simplification of code, plus removal of the reliance
on the eXist DB to store data; this will now all be stored in, and looked up from, the Postgres DB

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2""" Script oai_ingest.py takes parameters <datacentre> <dbinfofile>.
3The /usr/local/WSClients/OAIBatch directory contains:-
4 - this python script, plus some other modules eg ndgUtils for parts of the process.
5 - a DataProvider specific config file,
6 - the python module for extracting spatiotemporal information and adding to postgres db.
7Under this directory the following structure should be maintained:
8 ./data
9 - /DATACENTRE/
10                - discovery/:         Re-named documents.
11        - discovery_corrected Documents with schema namespaces corrected, ready to ingest in the discovery service.
12                - oai/difYYYYMMDD/    Documents as harvested from OAI
13 Where  /DATACENTRE  varies for the different data providers
14"""
15#History:
16# 12/05/06 SEL spelling correction
17# 30/05/06 SEL cope with many files for processing."Argument list too long" problem.
18# 31/05/06 SEL need to use passwords now. (replace xxxxxx in real version).
19# 16/10/06 SEL Changed to using python oaiClean.py module instead of java code.
20# 16/10/06 SEL exist db upgraded and deployed at different location, java upgrade.
21# 17/10/06 SEL cope with different discovery formats - not just DIF.
22# 23/10/06 SEL keywords not mandatory in config file.
23# 24/10/06 SEL fix bug where 'in' directory not being cleared initially. display more information when running.
24#  December 2007 SEL rewrite to use Bryans' python XQuery stuff to create mini-moles instead of java.
25#                    Also extracted hard coded pwds into a file.
26# 11/04/08 CByrom Tidy up script by organising code into reusable functions + variables
27# + remove dependency on eXist DB
28
29import os
30import sys
31import commands
32import string
33import keywordAdder
34from SchemaNameSpace import SchemaNameSpace
35from DIF import DIF
36from MDIP import MDIP
37import ndgUtils
38from ndgUtils.ndgXqueries import ndgXqueries
39from ndgUtils.ndgObject import ndgObject
40import ConfigParser
41from ndgUtils.ndgDirectory import ndgDirectory
42from ndgUtils.eXistInterface import ndg_eXist
43from FileUtilities import setUpDir, cleanDir, makeBackUp
44from PostgresRecord import PostgresRecord
45import PostgresDAL
46
def getID(filename):
    '''
    Gets the identifier out of an input metadata xml record.
    Copes with DIF and MDIP currently (selected via the module-level
    datacentre_format global).
    @param filename: name of document file being processed
    @return: ID - id to use to refer to the document
    '''
    # NB: previous revision used the deprecated file() builtin and never
    # closed the handle; open/close explicitly so the descriptor is released
    # even if parsing raises
    f = open(filename)
    try:
        xml = f.read()
    finally:
        f.close()
    if datacentre_format == "DIF":
        d = DIF(xml)
        ID = d.entryID
    elif datacentre_format == "MDIP":
        d = MDIP(xml)
        ID = d.id
    else:
        sys.exit("Only handles DIF or MDIP here.")
    return ID
64
65def addFileToPostgresDB(filename):
66        '''
67        Add a file to the postgres DB - extracting and storing all the required
68        data in the process
69        '''
70        print "Adding file, %s, to postgres DB" %filename
71       
72        # first of all create a PostgresRecord - this object represents all the data required
73        # for a DB entry
74        record = PostgresRecord(filename, NDG_dataProvider)
75        dal = PostgresDAL(record)
76        dal.createOrUpdateRecord()
77
78def getConfigDetails(datacentre):
79        '''
80        Get the harvested records directory and groups for this datacentre from the
81        datacentre specific config file.  The harvested records directory depends on the
82        datacentres OAI base url, the set and format. These have to be know up-front.
83        The groups denote which 'portal groups' they belong to - for limiting searches to
84        say NERC-only datacentres records.
85        Groups are added to the intermediate MOLES when it is created.
86        @param datacentre: datacentre to use when looking up config file
87        '''
88        # set the variables to use the global copies, not the local ones
89        global harvest_home, datacentre_groups, datacentre_format, datacentre_namespace
90        global datacentre_config_filename, NDG_dataProvider
91        datacentre_config_filename = base_dir + datacentre + "_config.properties"
92        print "INFO: Retrieving data from datacentre config file = %s" %datacentre_config_filename
93       
94        # Check this file exists; if not, assume an invalid datacentre has been specified
95        if not os.path.isfile(datacentre_config_filename):
96            sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
97                "specified (%s) is invalid\n" %datacentre)
98           
99        datacentre_config_file = open(datacentre_config_filename, "r")
100       
101        for line in datacentre_config_file.readlines():
102            words  = string.split(line)
103            if len(words) == 0:
104                continue
105            if words[0] == 'host_path':
106                harvest_home = string.rstrip(words[1])
107            if words[0] == 'groups':
108                datacentre_groups = words[1:]
109            if words[0] == 'format':
110                datacentre_format = words[1]
111            if words[0] == 'namespace':
112                datacentre_namespace = words[1]
113            if words[0] == 'NDG_dataProvider':
114                NDG_dataProvider = True
115       
116        datacentre_config_file.close()
117       
118        if harvest_home == "":
119            sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
120       
121        print "INFO: harvested records are in %s" %harvest_home
122       
123        if datacentre_groups == "":
124            print "INFO: No groups/keywords set for datacentre %s" %datacentre
125        else:
126            print "INFO: datacentre groups/keywords = %s" %datacentre_groups
127       
128        if datacentre_format == "":
129            sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
130       
131        print "INFO: format being harvested = %s" %datacentre_format
132       
133        if datacentre_namespace == "":
134            sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
135       
136        print "INFO: datacentre namespace = %s" %datacentre_namespace
137        print lineSeparator
138       
139
def usage():
    '''
    Print the expected command line arguments for this script, then exit.
    '''
    print("Usage: python oai_ingest.py <datacentre> (<dbInfoFile>)")
    print(" - where:\n   <datacentre> is the data centre to ingest data from; and")
    print("   <dbInfoFile> provides info on the eXist DB to use to do XQuery transforms")
    sys.exit()
148
149
150def ingestOriginalsToeXist():
151        '''
152        Put original docs into eXist DB - NB, this allows for easy transform via XQuery
153        into the other required doc types
154        '''
155        print "INFO: Putting original docs in eXist..."
156        # this command creates the targetCollection in eXist from the contents of the discovery_corrected_dir
157        commandline = "$EXIST_HOME/bin/client.sh -c " + targetCollection + " -u admin -P " + \
158                db_admin + " -p " + discovery_corrected_dir
159        print lineSeparator
160        print "INFO: Executing : actual command to ingest into exist db", commandline
161        status = os.system(commandline)
162        if status !=0:
163            sys.exit("Failed at ingesting into exist db. Datacentre =  %s. Status = %s" %(datacentre,status))
164        print "INFO: Original docs now in eXist"
165        print lineSeparator
166
167
168def createEXistMiniMoles():
169        '''
170        Transform the docs in eXist into the mini moles format - to allow use of the various
171        existing xqueries to do the transforms from this format to others
172        '''
173        # First get the list of discovery record ids from the db collection
174        print "INFO: Creating minimoles documents from original docs in eXist"
175        if datacentre_format == 'DIF':
176            ndgDir=ndgDirectory(targetCollection,host, datacentre_format.upper())
177        else:
178            print 'ERROR: mini-moles creation does not handle MDIP yet! So this WILL FAIL (probably)'
179       
180        #create the mini-moles for each Discovery record in the collection
181        for member in ndgDir.members:
182            filename= member['fileName']
183            disc_id = member['EntryID']
184            print "INFO: internal id = %s" %disc_id
185            print "INFO: discovery filename = %s" %filename
186            # now create the xquery
187            # sort out the output ID stuff ...
188            if NDG_dataProvider:
189                discObj=ndgObject(disc_id)
190                xquery=xq.actual('dif2moles',targetCollection,discObj.repository,discObj.localID)
191            else:
192                xquery=xq.actual('dif2moles',targetCollection,datacentre_namespace,disc_id)
193
194            # and then sort out the input ID stuff
195            xquery=xquery.replace('Input_Entry_ID',disc_id)
196            xquery=xquery.replace('repository_localid', datacentre_namespace )
197
198            # Now do the transform
199            print "INFO: Running XQuery transform to create minimoles document"
200            molesid,s=xmldb.executeQuery(xquery)
201            moles_from_dif=xmldb.retrieve(molesid,0)
202
203            minimoles_file = minimoles_dir + "/" + filename
204            print "INFO: Creating new minimoles file: ", minimoles_file 
205            f=open(minimoles_file,'w')
206            f.write(moles_from_dif)
207            f.close()
208           
209            filecount = 0
210            for root, dirs, files in os.walk(minimoles_dir):
211                filecount += len(files)
212               
213                if filecount == 0:
214                    print "ERROR: couldn't create any minimum moles records for %s" %datacentre
215                    sys.exit()
216               
217                #Add keywords if necessary
218                addKeywords()
219               
220                # ingest the created discovery minimum molesrecords into eXist db.
221                commandline = "$EXIST_HOME/bin/client.sh -c /db/discovery/moles -u admin -P " + db_admin + \
222                " -p " + finalmoles_dir
223                print "INFO: Executing : actual command to ingest into exist db."
224                status = os.system(commandline)
225                if status !=0:
226                    sys.exit("Failed at ingesting into exist db. Datacentre =  %s. Status = %s" %(datacentre,status))
227
228
229def createEXistCollections():
230        '''
231        Ensure all the required collections are set up in the eXist DB
232        '''
233        commandline = "$EXIST_HOME/bin/client.sh -c /db " + \
234                " -u admin -P " + db_admin + " -m "
235        collections = ['/db/xqueryLib', '/db/discovery/moles']
236        print lineSeparator
237        for collection in collections:
238                cmd = commandline + collection + " -p"
239                print "INFO: Executing : actual command to create DB collection ", cmd
240                status = os.system(cmd)
241                print status
242                if status !=0:
243                        sys.exit("Failed to create DB collection. Status = %s" %(status))
244       
245
246       
247def addKeywords():
248        if datacentre_groups == "":
249            commandline = "ls -1 " + minimoles_dir + " | xargs -i mv " + minimoles_dir + \
250                "{\} " + finalmoles_dir
251            print "INFO: Executing : " + commandline
252            status = os.system(commandline)
253            if status !=0:
254                sys.exit("Failed at moving MOLES to FINAL directory")
255        else:
256            keywordAdder.main(minimoles_dir, finalmoles_dir, datacentre_groups)
257
258                       
259def addXQueriesToeXist():
260        '''
261        The current set of XQueries files refer to some xquery libraries - which are stored in eXist
262        - add them to the DB for use elsewhere
263        '''
264        commandline = "$EXIST_HOME/bin/client.sh -c /db " + \
265                " -u admin -P " + db_admin + " -p " + base_dir + 'xquery'
266        print lineSeparator
267        print "INFO: Executing : actual command to ingest into exist db", commandline
268        status = os.system(commandline)
269        print status
270        if status !=0:
271            sys.exit("Failed at ingesting into exist db. Datacentre =  %s. Status = %s" %(datacentre,status))
272       
273
274       
# ---------------------------------------------------------------------------
# Main script body (runs at import time; there is no main() guard):
# harvest dir -> pristine copy -> rename by ID -> namespace correction ->
# eXist collection setup/ingest -> mini-moles transform -> postgres ingest ->
# tape backups -> cleanup.
# ---------------------------------------------------------------------------
lineSeparator = "-----------------------------"
print lineSeparator
print "RUNNING: oai_ingest.py"
print lineSeparator

# one mandatory arg (datacentre) plus an optional DB info file; usage() exits
if (len(sys.argv) < 2 or len(sys.argv) > 3):
        usage()
else:
    datacentre = sys.argv[1]

# set the default password file
dbinfoname = "ingest.txt"
if (len(sys.argv) == 3):
        dbinfoname = sys.argv[2]

status = 0          # exit status of the last os.system call
numfilesproc = 0    # count of xml files successfully renamed/processed
base_dir = "/usr/local/WSClients/OAIBatch/" # this is the base dir that the script is ran from
#base_dir = os.getcwd() + "/"# this is the base dir that the script is ran from

data_dir = base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs

#Change os directory to that with the harvested documents in it.
os.chdir(base_dir)

# Other settings and constants
# NOTE(review): shells out to 'date' for a timestamp - left as-is
date_string = commands.getoutput("date +'%y%m%d_%H%M'")
#os.putenv('EXIST_HOME', '/usr/local/exist-client')
# NOTE(review): EXIST_HOME points at a personal home dir - looks like a
# development setting; confirm before deploying
os.putenv('EXIST_HOME', '/home/users/cbyrom/opt/eXist')
os.putenv('PATH', ':/usr/java/jdk1.5.0_03/bin:/usr/java/jdk1.5.0_03:/usr/java/jdk1.5.0_03/lib/tools.jar:/usr/local/WSClients/OAIBatch:/usr/local/exist-client/bin:/bin:/usr/bin:.')

# set the global variables to retrieve from the config file
# (getConfigDetails() overwrites these via its 'global' declarations)
harvest_home = ""
datacentre_groups = ""
datacentre_format = ""
datacentre_namespace = ""
NDG_dataProvider = False
getConfigDetails(datacentre)

#any records to harvest?
if len( os.listdir(harvest_home)) == 0:
    print "INFO: Nothing to harvest this time from %s" %datacentre
    sys.exit()

# get the db access info from the specified input file
# - each useful line appears to be '<host> <user> <password>', keyed below
#   by (host, user); verify against the ingest.txt format
dbaccess={}
dbinfo_file=open(dbinfoname,"r")
for line in dbinfo_file.readlines():
    words  = string.split(line)
    if len(words) < 2:
        continue
    dbaccess[(words[0],words[1])] = [words[2]]
dbinfo_file.close()
#host ='glue.badc.rl.ac.uk'
host ='localhost'
try:
        db_admin = dbaccess[(host,'admin')][0]
except:
        sys.exit("ERROR: Could not find details for %s DB user 'admin' in DB config file\n" %host)

# The directory to put things for a tape backup (should already exist)
backupdir = '/disks/glue1/oaiBackup/'

# the following dirs define where the specific documents should go
originals_dir = data_dir +"/oai/originals/"
discovery_dir = data_dir +"/discovery/"
discovery_corrected_dir = data_dir +"/discovery_corrected/"

# Create/clear the 'in' directory pristine copy of the discovery records
setUpDir(originals_dir)
commandline = "ls -1 " + harvest_home + "/ | xargs -i cp " + harvest_home + "/{\} " + originals_dir
print "INFO: Executing : " + commandline
status = os.system(commandline)
if status !=0:
    sys.exit("Failed at making pristine copy stage")

# Create/clear the directory for the 'out' processed copy of the discovery records.
setUpDir(discovery_dir)

# Create/clear the directory for the 'out' namespace corrected copy of the discovery records.
setUpDir(discovery_corrected_dir)

# The file config.properties contains the location of the particular datacentres harvested records.
# Copy the datacentre specific version of config to config.properties file.
commandline = "cp " + datacentre_config_filename + " " + base_dir + "config.properties"
print "INFO: Executing : " + commandline
status = os.system(commandline)
if status !=0:
    sys.exit("Failed at copying config file stage")

#Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
# NB, this copies files from the original dir to the discovery dir
print lineSeparator
print "INFO: Renaming files:"
for filename in os.listdir(originals_dir):
        if filename.find('.xml') != -1:
                original_filename = originals_dir + filename
                ident=getID(original_filename)
                print "INFO: ID extracted from the discovery record = %s" %ident
                # NDG data providers keep the full ndg id ('::' -> '__');
                # others get a '<namespace>__<format>__<ident>' name
                if NDG_dataProvider:
                    new_filename = discovery_dir + ident.replace(":","__")+".xml"
                else:
                    ident = ident.replace(":","-")
                    ident = ident.replace("/","-")
                    new_filename = discovery_dir + "/" +datacentre_namespace+ "__"+datacentre_format+ "__"+ ident +".xml"

                print "original file = %s\nnewfile = %s" %(original_filename, new_filename)

                # NOTE(review): os.rename cannot cross filesystems - assumes
                # originals_dir and discovery_dir share one; confirm
                try:
                        os.rename(original_filename, new_filename)
                except:
                        sys.exit("ERROR: Failed to rename file %s to %s" %(original_filename, new_filename))
                numfilesproc += 1
        else:
                print 'WARNING: File %s is not xml format. Not processed'  %(filename)

#replace any namespace declarations with a standard one which we know works in NDG
# NB, this copies files from the discovery dir to the discovery corrected dir
print lineSeparator
print "INFO: Correcting namespaces of files (corrected files in %s)" %discovery_corrected_dir
for filename in os.listdir(discovery_dir):
        if filename.find('.xml') != -1:
                    in_filename = discovery_dir + filename
                    corrected_filename = discovery_corrected_dir + filename
                    # best-effort: a failed correction is logged, not fatal
                    try:
                       SchemaNameSpace(in_filename, corrected_filename,datacentre_format)
                    except:
                       print "ERROR: SchemaNameSpace failed on file %s"%in_filename

# put the original docs in eXist - as the targetCollection
# NB, firstly ensure the appropriate collections are set up
createEXistCollections()

# now set up the required XQueries and check xqueries libs are available from eXist
targetCollection = "/db/discovery/original/" +datacentre_format+ "/" +datacentre_namespace
xq=ndgXqueries()
xmldb=ndg_eXist(db='' + host + '')

addXQueriesToeXist()

ingestOriginalsToeXist()

# now transform these to minimoles format
minimoles_dir = base_dir + 'MINIMOLES/'
finalmoles_dir = base_dir + 'FINALMOLES/'
setUpDir(minimoles_dir)
setUpDir(finalmoles_dir)
createEXistMiniMoles()

# TODO: need to skip the eXist steps below and just populate the postGres DB
# 1. using the discovery_corrected_dir - (do we need the non corrected files?) - import into original column
# 2. use the correct xquery to transform the original doc directly and store t


# Process the resulting files and put the data into the postgres DB
filenames = os.listdir(discovery_corrected_dir)
for filename in filenames:
        addFileToPostgresDB(discovery_corrected_dir + filename)

#SpaceTimeIngestFromMOLES.main("./FINALMOLES")

#Extract the spatiotemporal info and put into NEW postgis tables
#SpaceTimeIngestPostgisFromMOLES.main("./FINALMOLES")

#Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
backupdir_base = backupdir + datacentre + "_" + date_string
this_backupdir = backupdir_base + "_originals"
makeBackUp(originals_dir, this_backupdir)

this_backupdir = backupdir_base + "_discovery"
makeBackUp(discovery_dir, this_backupdir)

this_backupdir = backupdir_base + "_FINALMOLES"
makeBackUp("./FINALMOLES", this_backupdir)

#Clear out the original harvest records area and FINALMOLES
cleanDir("./FINALMOLES")
cleanDir(harvest_home)

# NB: status reflects only the last os.system call; earlier failures all
# sys.exit, so reaching here normally means status == 0
print "======================================================"
print "No. of files pre-processed = %s" %numfilesproc
if status == 0:
    print " Procedure oai_ingest.py ran to end"
else:
    print "ERROR: Procedure oai_ingest.py FAILED with status %s" %status

print "======================================================"
Note: See TracBrowser for help on using the repository browser.