source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new2.py @ 3857

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new2.py@3857
Revision 3857, 13.3 KB checked in by cbyrom, 11 years ago (diff)

Add 'debug' mode to ingest script - to allow more verbose logging.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2""" Script oai_ingest.py takes parameters <datacentre> <dbinfofile>.
3The /usr/local/WSClients/OAIBatch directory contains:-
4 - this python script, plus some other modules eg ndgUtils for parts of the process.
5 - a DataProvider specific config file,
6 - the python module for extracting spatiotemporal information and adding to postgres db.
7Under this directory the following structure should be maintained:
8 ./data
9 - /DATACENTRE/
10                - discovery/:         Re-named documents.
11        - discovery_corrected Documents with schema namespaces corrected, ready to ingest in the discovery service.
12                - oai/difYYYYMMDD/    Documents as harvested from OAI
13 Where  /DATACENTRE  varies for the different data providers
14"""
15#History:
16# 12/05/06 SEL spelling correction
17# 30/05/06 SEL cope with many files for processing."Argument list too long" problem.
18# 31/05/06 SEL need to use passwords now. (replace xxxxxx in real version).
19# 16/10/06 SEL Changed to using python oaiClean.py module instead of java code.
20# 16/10/06 SEL exist db upgraded and deployed at different location, java upgrade.
21# 17/10/06 SEL cope with different discovery formats - not just DIF.
22# 23/10/06 SEL keywords not mandatory in config file.
23# 24/10/06 SEL fix bug where 'in' directory not being cleared initially. display more information when running.
24#  December 2007 SEL rewrite to use Bryans' python XQuery stuff to create mini-moles instead of java.
25#                    Also extracted hard coded pwds into a file.
26# 11/04/08 CByrom Tidy up script by organising code into reusable functions + variables
27# + remove dependency on eXist DB
28
29import os, sys, string, getopt, logging
30from time import strftime
31from SchemaNameSpace import SchemaNameSpace
32from DIF import DIF
33from MDIP import MDIP
34import ndgUtils
35from ndgUtils.ndgXqueries import ndgXqueries
36from FileUtilities import FileUtilities
37from PostgresRecord import PostgresRecord
38from PostgresDAO import PostgresDAO
39import db_funcs
40
41class oai_ingest:
42        '''
43        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
44        - including running the various transforms and parsings to get all doc types and spatiotemporal data
45        in the correct form in the DB
46        '''
47
48        def getID(self, filename):
49                '''
50                Gets the identifier out of an input metadata xml record.
51                Copes with DIF and MDIP currently.
52                @param filename - name of document file being processed
53                @return: ID - id to use to refer to the document
54                '''
55                logging.info("Retrieving identifier for metadata record " + filename)
56                xml=file(filename).read()
57                if self._datacentre_format == "DIF":
58                    d=DIF(xml)
59                    ID=d.entryID
60                elif self._datacentre_format == "MDIP":
61                    d=MDIP(xml)
62                    ID=d.id
63                else:
64                    sys.exit("Only handles DIF or MDIP here.")
65       
66                logging.info("Found identifier: " + ID)
67                return ID
68       
69       
70        def addFileToPostgresDB(self, filename):
71                '''
72                Add a file to the postgres DB - extracting and storing all the required
73                data in the process
74                @param filename: full path of file to add to postgres DB
75                '''
76                if not os.path.isfile(filename):
77                        logging.info("Skipping, %s - not a valid file" %filename)
78                        return
79               
80                logging.info("Adding file, " + filename + ", to postgres DB")
81                discoveryID = self.getID(filename)
82               
83                # first of all create a PostgresRecord - this object represents all the data required
84                # for a DB entry
85                record = PostgresRecord(filename, self._NDG_dataProvider, \
86                                                            self._datacentre_groups, self._datacentre_namespace, \
87                                                            discoveryID, self._xq, self._datacentre_format)
88       
89                # Now create the data access object to interface to the DB
90                dao = PostgresDAO(record, self._dbConnection)
91               
92                # Finally, write the new record
93                if dao.createOrUpdateRecord():
94                        self._no_files_ingested += 1
95                       
96       
97       
98        def getConfigDetails(self, datacentre):
99                '''
100                Get the harvested records directory and groups for this datacentre from the
101                datacentre specific config file.  The harvested records directory depends on the
102                datacentres OAI base url, the set and format. These have to be know up-front.
103                The groups denote which 'portal groups' they belong to - for limiting searches to
104                say NERC-only datacentres records.
105                Groups are added to the intermediate MOLES when it is created.
106                @param datacentre: datacentre to use when looking up config file
107                '''
108                self._datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties"
109                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
110               
111                # Check this file exists; if not, assume an invalid datacentre has been specified
112                if not os.path.isfile(self._datacentre_config_filename):
113                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
114                        "specified (%s) is invalid\n" %datacentre)
115                   
116                datacentre_config_file = open(self._datacentre_config_filename, "r")
117               
118                for line in datacentre_config_file.readlines():
119                    words  = string.split(line)
120                    if len(words) == 0:
121                        continue
122                    if words[0] == 'host_path':
123                        self._harvest_home = string.rstrip(words[1])
124                    if words[0] == 'groups':
125                        self._datacentre_groups = words[1:]
126                    if words[0] == 'format':
127                        self._datacentre_format = words[1]
128                    if words[0] == 'namespace':
129                        self._datacentre_namespace = words[1]
130                    if words[0] == 'self._NDG_dataProvider':
131                        self._NDG_dataProvider = True
132               
133                datacentre_config_file.close()
134               
135                if self._harvest_home == "":
136                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename)
137               
138                logging.info("harvested records are in " + self._harvest_home)
139               
140                if self._datacentre_groups == "":
141                    logging.info("No groups/keywords set for datacentre " + datacentre)
142                else:
143                    logging.info("datacentre groups/keywords: " + self._datacentre_groups)
144               
145                if self._datacentre_format == "":
146                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename)
147               
148                logging.info("format being harvested: " + self._datacentre_format)
149               
150                if self._datacentre_namespace == "":
151                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename)
152               
153                logging.info("datacentre namespace: " + self._datacentre_namespace)
154               
155                if self._NDG_dataProvider:
156                        logging.info("Datacentre classified as an NDG data provider")
157                else:
158                        logging.info("Datacentre is not classificied as an NDG data provider")
159                print self.lineSeparator
160
161               
162        def _getDBConnection(self):
163                '''
164                Get the default DB connection - by reading in data from the db config file
165                '''
166                logging.info("Setting up connection to postgres DB")
167                dbinfo_file=open('ingest.config', "r")
168                dbinfo = dbinfo_file.read().split()
169                if len(dbinfo) < 4:
170                        raise ValueError, 'Incorrect data in config file'
171               
172                # if port specified in file, use this, otherwise use default
173                if len(dbinfo) > 4:
174                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3], dbinfo[4])
175                else:
176                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3])
177                logging.info("Postgres DB connection now set up")
178
179       
180        def usage(self):
181                '''
182                Display input params for the script
183                '''
184                print "Usage: python -v oai_ingest.py <datacentre>"
185                print " - where:\n   <datacentre> is the data centre to ingest data from; and"
186                print " -v - verbose mode for output logging"
187                print " -d - debug mode for output logging"
188                sys.exit(2)
189
190               
191        def __init__(self, datacentre=None):
192                '''
193                Main entry point for script
194                '''
195                self.lineSeparator = "-----------------------------"
196                print self.lineSeparator
197                print "RUNNING: oai_ingest.py"         
198               
199                # check for verbose option
200                try:
201                    opts, args = getopt.getopt(sys.argv[1:], "vd")
202                except getopt.GetoptError, err:
203                    # print help information and exit:
204                    print str(err) # will print something like "option -a not recognized"
205                    self.usage()
206                   
207                loggingLevel = logging.WARNING
208                for o, a in opts:
209                    if o == "-v":
210                        print " - Verbose mode ON"
211                        loggingLevel = logging.INFO
212                    elif o == "-d":
213                        print " - Debug mode ON"
214                        loggingLevel = logging.DEBUG
215               
216                logging.basicConfig(level=loggingLevel,
217                                                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
218               
219                print self.lineSeparator
220               
221                if datacentre is None:
222                        self.usage()
223               
224                # create file utils object to do various file related stuff
225                fileUtils = FileUtilities()
226               
227                status = 0
228                numfilesproc = 0
229                self._no_files_ingested = 0
230                self._base_dir = os.getcwd() + "/"# this is the base dir that the script is ran from
231                       
232                data_dir = self._base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs
233               
234                #Change os directory to that with the harvested documents in it.
235                os.chdir(self._base_dir)
236               
237                # - to run on Windows under cygwin, use the following
238                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
239               
240                # set the global variables to retrieve from the config file
241                self._harvest_home = ""
242                self._datacentre_groups = ""
243                self._datacentre_format = ""
244                self._datacentre_namespace = ""
245                self._NDG_dataProvider = False
246                self.getConfigDetails(datacentre)
247               
248                #any records to harvest?
249                if len( os.listdir(self._harvest_home)) == 0:
250                    logging.info("Nothing to harvest this time from " + datacentre)
251                    sys.exit()
252               
253                # The directory to put things for a tape backup (should already exist)
254                #backupdir = '/disks/glue1/oaiBackup/'
255                # TODO: uncomment above on live system
256                backupdir = data_dir + "/backups/"
257               
258                # the following dirs define where the specific documents should go
259                originals_dir = data_dir + "/oai/originals/"
260                discovery_dir = data_dir + "/discovery/"
261               
262                # Create/clear the 'in' directory pristine copy of the discovery records
263                fileUtils.setUpDir(originals_dir)
264                commandline = "ls -1 " + self._harvest_home + "/ | xargs -i cp " + self._harvest_home + "/{\} " + originals_dir
265                #commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir
266                logging.info("Executing : " + commandline)
267                status = os.system(commandline)
268               
269                if status !=0:
270                    sys.exit("Failed at making pristine copy stage")
271               
272                # Create/clear the directory for the 'out' processed copy of the discovery records.
273                fileUtils.setUpDir(discovery_dir)
274                   
275                #Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
276                # - also replace any namespace declarations with a standard one which we know works in NDG
277                # NB, this copies files from the original dir to the discovery dir
278                logging.info(self.lineSeparator)
279                logging.info("Renaming files:")
280                for filename in os.listdir(originals_dir):
281                        if filename.endswith('.xml'):
282                                original_filename = originals_dir + filename
283                                ident=self.getID(original_filename)
284                               
285                                if self._NDG_dataProvider:
286                                        new_filename = discovery_dir + ident.replace(":","__")+".xml"
287                                else:
288                                                ident = ident.replace(":","-")
289                                                ident = ident.replace("/","-")
290                                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
291                                                logging.info("original file = " + original_filename)
292                                                logging.info("newfile = " + new_filename)
293                               
294                                # now correct any namespace issues
295                                try:
296                                    SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
297                                except:
298                                        sys.exit("ERROR: SchemaNameSpace failed on file %s" %original_filename)
299                                numfilesproc += 1
300                        else:
301                                logging.warning('File %s is not xml format. Not processed'  %(filename))
302               
303                logging.info(self.lineSeparator)
304               
305                # now set up the required XQueries
306                # - NB, extract the xquery libraries locally for easy reference
307                self._xq=ndgXqueries()
308                for libFile in self._xq.xqlib:
309                        fileUtils.createFile(libFile, self._xq.xqlib[libFile])
310               
311                # Process the resulting files and put the data into the postgres DB
312                # - firstly set up a db connection to use
313                self._dbConnection = None
314                self._getDBConnection()
315               
316                filenames = os.listdir(discovery_dir)
317                for filename in filenames:
318                        self.addFileToPostgresDB(discovery_dir + filename)
319               
320                #Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
321                backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M")
322               
323                this_backupdir = backupdir_base + "_originals/"
324                fileUtils.makeBackUp(originals_dir, this_backupdir)
325               
326                #Clear out the original harvest records area and FINALMOLES
327                fileUtils.cleanDir(originals_dir)
328                fileUtils.cleanDir(discovery_dir)
329                # TODO: uncomment following line when live on system
330                #fileUtils.cleanDir(self._harvest_home)
331               
332                print self.lineSeparator
333                print "INFO: Number of files processed = %s" %numfilesproc
334                print "INFO: Number of files ingested = %s" %self._no_files_ingested
335                if status == 0:
336                    print "INFO: Procedure oai_ingest.py completed"
337                else:
338                    print "ERROR: Procedure oai_ingest.py FAILED with status %s" %status
339                print self.lineSeparator
340               
341       
342if __name__=="__main__":
343        opts, args = getopt.getopt(sys.argv[1:], "v")
344        if len(args) < 1:
345                oai_ingest()
346       
347        oai_ingest(args[0])
Note: See TracBrowser for help on using the repository browser.