source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py @ 3861

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py@3861
Revision 3861, 12.4 KB checked in by cbyrom, 11 years ago (diff)

Clear out redundant code and rename main script, oai_document_ingester

  • change references to this accordingly.
  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from DIF import DIF
12from MDIP import MDIP
13import ndgUtils
14from ndgUtils.ndgXqueries import ndgXqueries
15from FileUtilities import FileUtilities
16from PostgresRecord import PostgresRecord
17from PostgresDAO import PostgresDAO
18import db_funcs
19
20class oai_document_ingester:
21        '''
22        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
23        - including running the various transforms and parsings to get all doc types and spatiotemporal
24        data in the correct form in the DB
25        '''
26
27        def getID(self, filename):
28                '''
29                Gets the identifier out of an input metadata xml record.
30                Copes with DIF and MDIP currently.
31                @param filename - name of document file being processed
32                @return: ID - id to use to refer to the document
33                '''
34                logging.info("Retrieving identifier for metadata record " + filename)
35                xml=file(filename).read()
36                if self._datacentre_format == "DIF":
37                    d=DIF(xml)
38                    ID=d.entryID
39                elif self._datacentre_format == "MDIP":
40                    d=MDIP(xml)
41                    ID=d.id
42                else:
43                    sys.exit("Only handles DIF or MDIP here.")
44       
45                logging.info("Found identifier: " + ID)
46                return ID
47       
48       
49        def addFileToPostgresDB(self, filename):
50                '''
51                Add a file to the postgres DB - extracting and storing all the required
52                data in the process
53                @param filename: full path of file to add to postgres DB
54                '''
55                if not os.path.isfile(filename):
56                        logging.info("Skipping, %s - not a valid file" %filename)
57                        return
58               
59                logging.info("Adding file, " + filename + ", to postgres DB")
60                discoveryID = self.getID(filename)
61               
62                # first of all create a PostgresRecord - this object represents all the data required
63                # for a DB entry
64                try:
65                        record = PostgresRecord(filename, self._NDG_dataProvider, \
66                                                            self._datacentre_groups, self._datacentre_namespace, \
67                                                            discoveryID, self._xq, self._datacentre_format)
68       
69                        # Now create the data access object to interface to the DB
70                        dao = PostgresDAO(record, self._dbConnection)
71               
72                        # Finally, write the new record
73                        if dao.createOrUpdateRecord():
74                                self._no_files_ingested += 1
75                except:
76                        logging.error("Exception thrown - detail: ")
77                        logging.error(sys.exc_info())
78                       
79       
80       
81        def getConfigDetails(self, datacentre):
82                '''
83                Get the harvested records directory and groups for this datacentre from the
84                datacentre specific config file.  The harvested records directory depends on the
85                datacentres OAI base url, the set and format. These have to be know up-front.
86                The groups denote which 'portal groups' they belong to - for limiting searches to
87                say NERC-only datacentres records.
88                Groups are added to the intermediate MOLES when it is created.
89                @param datacentre: datacentre to use when looking up config file
90                '''
91                self._datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties"
92                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
93               
94                # Check this file exists; if not, assume an invalid datacentre has been specified
95                if not os.path.isfile(self._datacentre_config_filename):
96                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
97                        "specified (%s) is invalid\n" %datacentre)
98                   
99                datacentre_config_file = open(self._datacentre_config_filename, "r")
100               
101                for line in datacentre_config_file.readlines():
102                    words  = string.split(line)
103                    if len(words) == 0:
104                        continue
105                    if words[0] == 'host_path':
106                        self._harvest_home = string.rstrip(words[1])
107                    if words[0] == 'groups':
108                        self._datacentre_groups = words[1:]
109                    if words[0] == 'format':
110                        self._datacentre_format = words[1]
111                    if words[0] == 'namespace':
112                        self._datacentre_namespace = words[1]
113                    if words[0] == 'self._NDG_dataProvider':
114                        self._NDG_dataProvider = True
115               
116                datacentre_config_file.close()
117               
118                if self._harvest_home == "":
119                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename)
120               
121                logging.info("harvested records are in " + self._harvest_home)
122               
123                if self._datacentre_groups == "":
124                    logging.info("No groups/keywords set for datacentre " + datacentre)
125                else:
126                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
127               
128                if self._datacentre_format == "":
129                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename)
130               
131                logging.info("format being harvested: " + self._datacentre_format)
132               
133                if self._datacentre_namespace == "":
134                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename)
135               
136                logging.info("datacentre namespace: " + self._datacentre_namespace)
137               
138                if self._NDG_dataProvider:
139                        logging.info("Datacentre classified as an NDG data provider")
140                else:
141                        logging.info("Datacentre is not classificied as an NDG data provider")
142                print self.lineSeparator
143
144               
145        def _getDBConnection(self):
146                '''
147                Get the default DB connection - by reading in data from the db config file
148                '''
149                logging.info("Setting up connection to postgres DB")
150                dbinfo_file=open('ingest.config', "r")
151                dbinfo = dbinfo_file.read().split()
152                if len(dbinfo) < 4:
153                        raise ValueError, 'Incorrect data in config file'
154               
155                # if port specified in file, use this, otherwise use default
156                if len(dbinfo) > 4:
157                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3], dbinfo[4])
158                else:
159                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3])
160                logging.info("Postgres DB connection now set up")
161
162       
163        def usage(self):
164                '''
165                Display input params for the script
166                '''
167                print "Usage: python oai_document_ingester.py -v|d<datacentre>"
168                print " - where:\n   <datacentre> is the data centre to ingest data from; and"
169                print " -v - verbose mode for output logging"
170                print " -d - debug mode for output logging"
171                sys.exit(2)
172
173               
174        def __init__(self, datacentre=None):
175                '''
176                Main entry point for script
177                '''
178                self.lineSeparator = "-----------------------------"
179                print self.lineSeparator
180                print "RUNNING: oai_document_ingester.py"               
181               
182                # check for verbose option
183                try:
184                    opts, args = getopt.getopt(sys.argv[1:], "vd")
185                except getopt.GetoptError, err:
186                    # print help information and exit:
187                    print str(err) # will print something like "option -a not recognized"
188                    self.usage()
189                   
190                loggingLevel = logging.WARNING
191                for o, a in opts:
192                    if o == "-v":
193                        print " - Verbose mode ON"
194                        loggingLevel = logging.INFO
195                    elif o == "-d":
196                        print " - Debug mode ON"
197                        loggingLevel = logging.DEBUG
198               
199                logging.basicConfig(level=loggingLevel,
200                                                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
201               
202                print self.lineSeparator
203               
204                if datacentre is None:
205                        self.usage()
206               
207                # create file utils object to do various file related stuff
208                fileUtils = FileUtilities()
209               
210                status = 0
211                numfilesproc = 0
212                self._no_files_ingested = 0
213                self._base_dir = os.getcwd() + "/"# this is the base dir that the script is ran from
214                       
215                data_dir = self._base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs
216               
217                #Change os directory to that with the harvested documents in it.
218                os.chdir(self._base_dir)
219               
220                # - to run on Windows under cygwin, use the following
221                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
222               
223                # set the global variables to retrieve from the config file
224                self._harvest_home = ""
225                self._datacentre_groups = ""
226                self._datacentre_format = ""
227                self._datacentre_namespace = ""
228                self._NDG_dataProvider = False
229                self.getConfigDetails(datacentre)
230               
231                # check harvest dir exists and that there are any records to harvest?
232                dpath = os.path.normpath(os.path.dirname(self._harvest_home))
233                if not os.path.exists(dpath):
234                        logging.info("Harvest directory for datacentre %s (%s) could not be found - exiting" \
235                                                 %(datacentre, self._harvest_home))
236                        return
237                elif len(os.listdir(self._harvest_home)) == 0:
238                        logging.info("Nothing to harvest this time from %s" %datacentre)
239                        return
240               
241               
242                # The directory to put things for a tape backup (should already exist)
243                #backupdir = '/disks/glue1/oaiBackup/'
244                # TODO: uncomment above on live system
245                backupdir = data_dir + "/backups/"
246               
247                # the following dirs define where the specific documents should go
248                originals_dir = data_dir + "/oai/originals/"
249                discovery_dir = data_dir + "/discovery/"
250               
251                # Create/clear the 'in' directory pristine copy of the discovery records
252                fileUtils.setUpDir(originals_dir)
253                commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir
254                logging.info("Executing : " + commandline)
255                status = os.system(commandline)
256
257                if status !=0:
258                    sys.exit("Failed at making pristine copy stage")
259               
260                # Create/clear the directory for the 'out' processed copy of the discovery records.
261                fileUtils.setUpDir(discovery_dir)
262                   
263                #Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
264                # - also replace any namespace declarations with a standard one which we know works in NDG
265                # NB, this copies files from the original dir to the discovery dir
266                logging.info(self.lineSeparator)
267                logging.info("Renaming files:")
268                for filename in os.listdir(originals_dir):
269                        if filename.endswith('.xml'):
270                                original_filename = originals_dir + filename
271                                ident=self.getID(original_filename)
272                               
273                                if self._NDG_dataProvider:
274                                        new_filename = discovery_dir + ident.replace(":","__")+".xml"
275                                else:
276                                                ident = ident.replace(":","-")
277                                                ident = ident.replace("/","-")
278                                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
279                                                logging.info("original file = " + original_filename)
280                                                logging.info("newfile = " + new_filename)
281                               
282                                # now correct any namespace issues
283                                try:
284                                    SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
285                                except:
286                                        sys.exit("ERROR: SchemaNameSpace failed on file %s" %original_filename)
287                                numfilesproc += 1
288                        else:
289                                logging.warning('File %s is not xml format. Not processed'  %(filename))
290               
291                logging.info(self.lineSeparator)
292               
293                # now set up the required XQueries
294                # - NB, extract the xquery libraries locally for easy reference
295                self._xq=ndgXqueries()
296                for libFile in self._xq.xqlib:
297                        fileUtils.createFile(libFile, self._xq.xqlib[libFile])
298               
299                # Process the resulting files and put the data into the postgres DB
300                # - firstly set up a db connection to use
301                self._dbConnection = None
302                self._getDBConnection()
303               
304                filenames = os.listdir(discovery_dir)
305                for filename in filenames:
306                        self.addFileToPostgresDB(discovery_dir + filename)
307               
308                #Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
309                backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M")
310               
311                this_backupdir = backupdir_base + "_originals/"
312                fileUtils.makeBackUp(originals_dir, this_backupdir)
313               
314                #Clear out the original harvest records area and FINALMOLES
315                fileUtils.cleanDir(originals_dir)
316                fileUtils.cleanDir(discovery_dir)
317                # TODO: uncomment following line when live on system
318                #fileUtils.cleanDir(self._harvest_home)
319               
320                print self.lineSeparator
321                print "INFO: Number of files processed = %s" %numfilesproc
322                print "INFO: Number of files ingested = %s" %self._no_files_ingested
323                if status == 0:
324                    print "INFO: Procedure oai_document_ingester.py completed"
325                else:
326                    print "ERROR: Procedure oai_document_ingester.py FAILED with status %s" %status
327                print self.lineSeparator
328               
329       
330if __name__=="__main__":
331        opts, args = getopt.getopt(sys.argv[1:], '-vd')
332        if len(args) < 1:
333                oai_document_ingester()
334       
335        oai_document_ingester(args[0])
Note: See TracBrowser for help on using the repository browser.