source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new2.py @ 3845

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new2.py@3845
Revision 3845, 12.9 KB checked in by cbyrom, 13 years ago (diff)

Fix creation of backup dirs.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2""" Script oai_ingest.py takes parameters <datacentre> <dbinfofile>.
3The /usr/local/WSClients/OAIBatch directory contains:-
4 - this python script, plus some other modules eg ndgUtils for parts of the process.
5 - a DataProvider specific config file,
6 - the python module for extracting spatiotemporal information and adding to postgres db.
7Under this directory the following structure should be maintained:
8 ./data
9 - /DATACENTRE/
10                - discovery/:         Re-named documents.
11        - discovery_corrected Documents with schema namespaces corrected, ready to ingest in the discovery service.
12                - oai/difYYYYMMDD/    Documents as harvested from OAI
13 Where  /DATACENTRE  varies for the different data providers
14"""
15#History:
16# 12/05/06 SEL spelling correction
17# 30/05/06 SEL cope with many files for processing."Argument list too long" problem.
18# 31/05/06 SEL need to use passwords now. (replace xxxxxx in real version).
19# 16/10/06 SEL Changed to using python oaiClean.py module instead of java code.
20# 16/10/06 SEL exist db upgraded and deployed at different location, java upgrade.
21# 17/10/06 SEL cope with different discovery formats - not just DIF.
22# 23/10/06 SEL keywords not mandatory in config file.
23# 24/10/06 SEL fix bug where 'in' directory not being cleared initially. display more information when running.
24#  December 2007 SEL rewrite to use Bryans' python XQuery stuff to create mini-moles instead of java.
25#                    Also extracted hard coded pwds into a file.
26# 11/04/08 CByrom Tidy up script by organising code into reusable functions + variables
27# + remove dependency on eXist DB
28
29import os, sys, string, getopt, logging
30from time import strftime
31from SchemaNameSpace import SchemaNameSpace
32from DIF import DIF
33from MDIP import MDIP
34import ndgUtils
35from ndgUtils.ndgXqueries import ndgXqueries
36from FileUtilities import FileUtilities
37from PostgresRecord import PostgresRecord
38from PostgresDAO import PostgresDAO
39import db_funcs
40
41class oai_ingest:
42        '''
43        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
44        - including running the various transforms and parsings to get all doc types and spatiotemporal data
45        in the correct form in the DB
46        '''
47
48        def getID(self, filename):
49                '''
50                Gets the identifier out of an input metadata xml record.
51                Copes with DIF and MDIP currently.
52                @param filename - name of document file being processed
53                @return: ID - id to use to refer to the document
54                '''
55                logging.info("Retrieving identifier for metadata record " + filename)
56                xml=file(filename).read()
57                if self._datacentre_format == "DIF":
58                    d=DIF(xml)
59                    ID=d.entryID
60                elif self._datacentre_format == "MDIP":
61                    d=MDIP(xml)
62                    ID=d.id
63                else:
64                    sys.exit("Only handles DIF or MDIP here.")
65       
66                logging.info("Found identifier: " + ID)
67                return ID
68       
69       
70        def addFileToPostgresDB(self, filename):
71                '''
72                Add a file to the postgres DB - extracting and storing all the required
73                data in the process
74                @param filename: full path of file to add to postgres DB
75                '''
76                if not os.path.isfile(filename):
77                        logging.info("Skipping, %s - not a valid file" %filename)
78                        return
79               
80                logging.info("Adding file, " + filename + ", to postgres DB")
81                discoveryID = self.getID(filename)
82               
83                # first of all create a PostgresRecord - this object represents all the data required
84                # for a DB entry
85                record = PostgresRecord(filename, self._NDG_dataProvider, \
86                                                            self._datacentre_groups, self._datacentre_namespace, \
87                                                            discoveryID, self._xq, self._datacentre_format)
88       
89                # Now create the data access object to interface to the DB
90                dao = PostgresDAO(record, self._dbConnection)
91               
92                # Finally, write the new record
93                dao.createOrUpdateRecord()
94       
95       
96        def getConfigDetails(self, datacentre):
97                '''
98                Get the harvested records directory and groups for this datacentre from the
99                datacentre specific config file.  The harvested records directory depends on the
100                datacentres OAI base url, the set and format. These have to be know up-front.
101                The groups denote which 'portal groups' they belong to - for limiting searches to
102                say NERC-only datacentres records.
103                Groups are added to the intermediate MOLES when it is created.
104                @param datacentre: datacentre to use when looking up config file
105                '''
106                self._datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties"
107                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
108               
109                # Check this file exists; if not, assume an invalid datacentre has been specified
110                if not os.path.isfile(self._datacentre_config_filename):
111                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
112                        "specified (%s) is invalid\n" %datacentre)
113                   
114                datacentre_config_file = open(self._datacentre_config_filename, "r")
115               
116                for line in datacentre_config_file.readlines():
117                    words  = string.split(line)
118                    if len(words) == 0:
119                        continue
120                    if words[0] == 'host_path':
121                        self._harvest_home = string.rstrip(words[1])
122                    if words[0] == 'groups':
123                        self._datacentre_groups = words[1:]
124                    if words[0] == 'format':
125                        self._datacentre_format = words[1]
126                    if words[0] == 'namespace':
127                        self._datacentre_namespace = words[1]
128                    if words[0] == 'self._NDG_dataProvider':
129                        self._NDG_dataProvider = True
130               
131                datacentre_config_file.close()
132               
133                if self._harvest_home == "":
134                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename)
135               
136                logging.info("harvested records are in " + self._harvest_home)
137               
138                if self._datacentre_groups == "":
139                    logging.info("No groups/keywords set for datacentre " + datacentre)
140                else:
141                    logging.info("datacentre groups/keywords: " + self._datacentre_groups)
142               
143                if self._datacentre_format == "":
144                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename)
145               
146                logging.info("format being harvested: " + self._datacentre_format)
147               
148                if self._datacentre_namespace == "":
149                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename)
150               
151                logging.info("datacentre namespace: " + self._datacentre_namespace)
152               
153                if self._NDG_dataProvider:
154                        logging.info("Datacentre classified as an NDG data provider")
155                else:
156                        logging.info("Datacentre is not classificied as an NDG data provider")
157                print self.lineSeparator
158
159               
160        def _getDBConnection(self):
161                '''
162                Get the default DB connection - by reading in data from the db config file
163                '''
164                logging.info("Setting up connection to postgres DB")
165                dbinfo_file=open('ingest.config', "r")
166                dbinfo = dbinfo_file.read().split()
167                if len(dbinfo) != 4:
168                        raise ValueError, 'Incorrect data in config file'
169               
170                self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3])
171                logging.info("Postgres DB connection now set up")
172
173       
174        def usage(self):
175                '''
176                Display input params for the script
177                '''
178                print "Usage: python -v oai_ingest.py <datacentre>"
179                print " - where:\n   <datacentre> is the data centre to ingest data from; and"
180                print " -v - verbose mode for output logging"
181                sys.exit(2)
182
183               
184        def __init__(self, datacentre=None):
185                '''
186                Main entry point for script
187                '''
188                self.lineSeparator = "-----------------------------"
189                print self.lineSeparator
190                print "RUNNING: oai_ingest.py"         
191               
192                # check for verbose option
193                try:
194                    opts, args = getopt.getopt(sys.argv[1:], "v")
195                except getopt.GetoptError, err:
196                    # print help information and exit:
197                    print str(err) # will print something like "option -a not recognized"
198                    self.usage()
199                   
200                loggingLevel = logging.WARNING
201                for o, a in opts:
202                    if o == "-v":
203                        print " - Verbose mode ON"
204                        loggingLevel = logging.DEBUG
205               
206                logging.basicConfig(level=loggingLevel,
207                                                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
208               
209                print self.lineSeparator
210               
211                if datacentre is None:
212                        self.usage()
213               
214                # create file utils object to do various file related stuff
215                fileUtils = FileUtilities()
216               
217                status = 0
218                numfilesproc = 0
219                self._base_dir = os.getcwd() + "/"# this is the base dir that the script is ran from
220                       
221                data_dir = self._base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs
222               
223                #Change os directory to that with the harvested documents in it.
224                os.chdir(self._base_dir)
225               
226                # - to run on Windows under cygwin, use the following
227                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
228               
229                # set the global variables to retrieve from the config file
230                self._harvest_home = ""
231                self._datacentre_groups = ""
232                self._datacentre_format = ""
233                self._datacentre_namespace = ""
234                self._NDG_dataProvider = False
235                self.getConfigDetails(datacentre)
236               
237                #any records to harvest?
238                if len( os.listdir(self._harvest_home)) == 0:
239                    logging.info("Nothing to harvest this time from " + datacentre)
240                    sys.exit()
241               
242                # The directory to put things for a tape backup (should already exist)
243                #backupdir = '/disks/glue1/oaiBackup/'
244                # TODO: uncomment above on live system
245                backupdir = data_dir + "/backups/"
246               
247                # the following dirs define where the specific documents should go
248                originals_dir = data_dir + "/oai/originals/"
249                discovery_dir = data_dir + "/discovery/"
250               
251                # Create/clear the 'in' directory pristine copy of the discovery records
252                fileUtils.setUpDir(originals_dir)
253                commandline = "ls -1 " + self._harvest_home + "/ | xargs -i cp " + self._harvest_home + "/{\} " + originals_dir
254                #commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir
255                logging.info("Executing : " + commandline)
256                status = os.system(commandline)
257               
258                if status !=0:
259                    sys.exit("Failed at making pristine copy stage")
260               
261                # Create/clear the directory for the 'out' processed copy of the discovery records.
262                fileUtils.setUpDir(discovery_dir)
263                   
264                #Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
265                # - also replace any namespace declarations with a standard one which we know works in NDG
266                # NB, this copies files from the original dir to the discovery dir
267                logging.info(self.lineSeparator)
268                logging.info("Renaming files:")
269                for filename in os.listdir(originals_dir):
270                        if filename.endswith('.xml'):
271                                original_filename = originals_dir + filename
272                                ident=self.getID(original_filename)
273                               
274                                if self._NDG_dataProvider:
275                                        new_filename = discovery_dir + ident.replace(":","__")+".xml"
276                                else:
277                                                ident = ident.replace(":","-")
278                                                ident = ident.replace("/","-")
279                                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
280                                                logging.info("original file = " + original_filename)
281                                                logging.info("newfile = " + new_filename)
282                               
283                                # now correct any namespace issues
284                                try:
285                                    SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
286                                except:
287                                        sys.exit("ERROR: SchemaNameSpace failed on file %s" %original_filename)
288                                numfilesproc += 1
289                        else:
290                                logging.warning('File %s is not xml format. Not processed'  %(filename))
291               
292                logging.info(self.lineSeparator)
293               
294                # now set up the required XQueries
295                # - NB, extract the xquery libraries locally for easy reference
296                self._xq=ndgXqueries()
297                for libFile in self._xq.xqlib:
298                        fileUtils.createFile(libFile, self._xq.xqlib[libFile])
299               
300                # Process the resulting files and put the data into the postgres DB
301                # - firstly set up a db connection to use
302                self._dbConnection = None
303                self._getDBConnection()
304               
305                filenames = os.listdir(discovery_dir)
306                for filename in filenames:
307                        self.addFileToPostgresDB(discovery_dir + filename)
308               
309                #Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
310                backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M")
311               
312                this_backupdir = backupdir_base + "_originals/"
313                fileUtils.makeBackUp(originals_dir, this_backupdir)
314               
315                #Clear out the original harvest records area and FINALMOLES
316                fileUtils.cleanDir(originals_dir)
317                fileUtils.cleanDir(discovery_dir)
318                # TODO: uncomment following line when live on system
319                #fileUtils.cleanDir(self._harvest_home)
320               
321                print self.lineSeparator
322                print "INFO: No. of files pre-processed = %s" %numfilesproc
323                if status == 0:
324                    print "INFO: Procedure oai_ingest.py completed"
325                else:
326                    print "ERROR: Procedure oai_ingest.py FAILED with status %s" %status
327                print self.lineSeparator
328               
329       
330if __name__=="__main__":
331        opts, args = getopt.getopt(sys.argv[1:], "v")
332        if len(args) < 1:
333                oai_ingest()
334       
335        oai_ingest(args[0])
Note: See TracBrowser for help on using the repository browser.