source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 5243

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@5243
Revision 5243, 12.1 KB checked in by cbyrom, 11 years ago (diff)

Adjust logging and output of error in ingest scripts.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17
18class AbstractDocumentIngester(object):
19        '''
20        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
21        - including running the various transforms and parsings to get all doc types and spatiotemporal
22        data in the correct form in the DB
23        '''
24        lineSeparator = "-----------------------------"
25                       
26        # The directory to put things for a tape backup (should already exist)
27        BACKUP_DIR = '/disks/glue1/oaiBackup/'
28
29        def _setupCmdLineOptions(self):
30                '''
31                Determine the logging level to use and configure this appropriately
32                @return args: any input arguments - excluding options
33                '''
34                # check for verbose option
35                try:
36                        opts, args = getopt.getopt(sys.argv[1:], "vd")
37                except getopt.GetoptError, err:
38                    # print help information and exit:
39                    print str(err) # will print something like "option -a not recognized"
40                    sys.exit(2)
41
42                if len(args) < 1:
43                        self.usage()
44                   
45                loggingLevel = logging.WARNING
46                for o, a in opts:
47                    if o == "-v":
48                        print " - Verbose mode ON"
49                        loggingLevel = logging.INFO
50                    elif o == "-d":
51                        print " - Debug mode ON"
52                        loggingLevel = logging.DEBUG
53               
54                # set up any keywords on the object
55                # NB, be careful to keep the instance variables the same name as the keywords!
56                for arg in args:
57                        bits = arg.split('=')
58                        if len(bits) == 2:
59                                if bits[0] == 'ingestFromDate':
60                                        self.setIngestFromDate(bits[1])
61                                elif bits[0] == 'interval':
62                                        self.setPollInterval(bits[1])
63                                else:
64                                        setattr(self, bits[0], bits[1])
65               
66                print self.lineSeparator
67                # NB, this is a slight fudge as cannot get the detailed logging to work
68                # without setting up a new logger here - which means we get two loggers
69                # outputing data. The initial call to logging needs to be tracked down
70                # and configured correctly, so this can be avoided...
71#               self.logger = logging.getLogger()
72#               self.logger.setLevel(loggingLevel)
73
74                # create console handler and set level to debug
75#               ch = logging.StreamHandler()
76#               ch.setLevel(loggingLevel)
77                # create formatter
78#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
79                # add formatter to ch
80#               ch.setFormatter(formatter)
81                # add ch to logger
82#               self.logger.addHandler(ch)
83                return args
84
85
86        def getID(self, filename):
87                '''
88                Gets the identifier out of an input metadata xml record.
89                @param filename - name of document file being processed
90                @return: ID - id to use to refer to the document
91                '''
92                logging.info("Retrieving identifier for metadata record " + filename)
93                xml=file(filename).read()
94                ID = idget(xml)
95                return ID
96       
97       
98        def addFileToPostgresDB(self, filename):
99                '''
100                Add a file to the postgres DB - extracting and storing all the required
101                data in the process
102                @param filename: full path of file to add to postgres DB
103                '''
104                logging.info("Adding file, " + filename + ", to postgres DB")
105               
106                # first of all create a PostgresRecord - this object represents all the data required
107                # for a DB entry
108                dao = None
109                try:
110                        discoveryID = self.getID(filename)
111                       
112                        record = PostgresRecord(filename, self._NDG_dataProvider, \
113                                                            self._datacentre_groups, self._datacentre_namespace, \
114                                                            discoveryID, self._xq, self._datacentre_format)
115                       
116                        print self._xq
117                        # Now create the data access object to interface to the DB
118                        dao = PostgresDAO(record, pgClient = self.pgc)
119               
120                        # Finally, write the new record
121                        if dao.createOrUpdateRecord():
122                                self._no_files_ingested += 1
123                except:
124                        logging.error("Exception thrown - detail: ")
125                        errors = sys.exc_info()
126                        logging.error(errors)
127                        self._error_messages += str(errors)
128                       
129                        if dao:
130                                logging.info("Removing record and its associated info from DB")
131                                logging.info("- to allow clean ingestion on rerun")
132                                try:
133                                        dao.deleteOriginalRecord()
134                                except:
135                                        logging.error("Problem encountered when removing record: ")
136                                        logging.error(sys.exc_info())
137                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
138
139                        self._no_problem_files += 1
140                        logging.info("Continue processing other files")
141       
142       
143        def getConfigDetails(self, datacentre):
144                '''
145                Get the harvested records directory and groups for this datacentre from the
146                datacentre specific config file.  The harvested records directory depends on the
147                datacentres OAI base url, the set and format. These have to be know up-front.
148                The groups denote which 'portal groups' they belong to - for limiting searches to
149                say NERC-only datacentres records.
150                Groups are added to the intermediate MOLES when it is created.
151                @param datacentre: datacentre to use when looking up config file
152                '''
153                # initialise the variables to retrieve from the config file
154                self._harvest_home = ""
155                self._datacentre_groups = ""
156                self._datacentre_format = ""
157                self._datacentre_namespace = ""
158                self._NDG_dataProvider = False
159
160                datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties"
161                logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename)
162               
163                # Check this file exists; if not, assume an invalid datacentre has been specified
164                if not os.path.isfile(datacentre_config_filename):
165                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
166                        "specified (%s) is invalid\n" %datacentre)
167                   
168                datacentre_config_file = open(datacentre_config_filename, "r")
169               
170                for line in datacentre_config_file.readlines():
171                    words  = string.split(line)
172                    if len(words) == 0:
173                        continue
174                    if words[0] == 'host_path':
175                        self._harvest_home = string.rstrip(words[1])
176                    if words[0] == 'groups':
177                        self._datacentre_groups = words[1:]
178                    if words[0] == 'format':
179                        self._datacentre_format = words[1]
180                    if words[0] == 'namespace':
181                        self._datacentre_namespace = words[1]
182                    if words[0] == 'NDG_dataProvider':
183                        self._NDG_dataProvider = True
184               
185                datacentre_config_file.close()
186               
187                if self._harvest_home == "":
188                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
189               
190                logging.info("harvested records are in " + self._harvest_home)
191               
192                if self._datacentre_groups == "":
193                    logging.info("No groups/keywords set for datacentre " + datacentre)
194                else:
195                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
196               
197                if self._datacentre_format == "":
198                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
199               
200                logging.info("format being harvested: " + self._datacentre_format)
201               
202                if self._datacentre_namespace == "":
203                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
204               
205                logging.info("datacentre namespace: " + self._datacentre_namespace)
206               
207                if self._NDG_dataProvider:
208                        logging.info("Datacentre classified as an NDG data provider")
209                else:
210                        logging.info("Datacentre is not classified as an NDG data provider")
211                logging.info(self.lineSeparator)
212
213
214        def _convertIngestFiles(self, originals_dir, discovery_dir):
215                '''
216                Processes/renames the files (changed 08/01/07 to get id from inside file)
217                 - also replace any namespace declarations with a standard one which we know works in NDG
218                 NB, this copies files from the original dir to the discovery dir
219                 @param originals_dir: directory to convert files from
220                 @param discovery_dir: directory in which converted files will end up
221                 @return numfilesproc: counter of number of files processed
222                '''
223                numfilesproc = 0
224                logging.info(self.lineSeparator)
225                logging.info("Renaming files:")
226                for filename in os.listdir(originals_dir):
227                        if not filename.endswith('.xml'):
228                                logging.warning('File %s is not xml format. Not processed'  %(filename))
229                                continue
230                       
231                        original_filename = originals_dir + filename
232                        try:
233                                ident=self.getID(original_filename)
234                        except Exception, detail:
235                                logging.error("Could not retrieve ID from file, %s" %filename)
236                                logging.error("Detail: %s" %detail)
237                                logging.info("Continue with next file")
238                                continue
239                       
240                        if self._NDG_dataProvider:
241                                new_filename = discovery_dir + ident.replace(":", "__")+".xml"
242                        else:
243                                ident = ident.replace(":", "-")
244                                ident = ident.replace("/", "-")
245                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
246                                logging.info("original file = " + original_filename)
247                                logging.info("newfile = " + new_filename)
248                       
249                        # now correct any namespace issues
250                        try:
251                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
252                        except Exception, detail:
253                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
254                                logging.error("Detail: %s" %detail)
255                                logging.info("Continue with next file")
256                                continue
257                        numfilesproc += 1
258               
259                logging.info("File renaming and converting completed")
260                logging.info(self.lineSeparator)
261                return numfilesproc
262
263               
264        def _getPostgresDBConnection(self):
265                '''
266                Get the default postgres DB connection - by reading in data from the db config file
267                '''
268                logging.debug("Setting up connection to postgres DB")
269                self.pgc = pgc(configFile = 'ingest.config')
270        logging.info("Postgres DB connection now set up")
271
272
273        def     _backupAndCleanData(self):
274                '''
275                Back up ingested data for specified data centre, then clean any of
276                the ingest dirs
277                '''
278                logging.info("Backing up ingested data")
279                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
280                  strftime("%y%m%d_%H%M") + "_originals/"
281               
282                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
283                logging.info("Data backed up - now clearing ingest directories")
284                #Clear out the original harvest records area and discovery dir
285                FileUtilities.cleanDir(self.originals_dir)
286                FileUtilities.cleanDir(self.discovery_dir)
287                logging.info("Ingest directories cleared")
288
289
290        def     _setupDataCentreDirs(self):
291                '''
292                Set up directories appropriate for the current data centre
293                '''
294                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
295                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
296               
297                # the following dirs define where the specific documents should go
298                self.originals_dir = data_dir + "/oai/originals/"
299                self.discovery_dir = data_dir + "/discovery/"
300               
301                # Create/clear the 'in' and 'out' directories
302                FileUtilities.setUpDir(self.originals_dir)
303                FileUtilities.setUpDir(self.discovery_dir)
304               
305                logging.info("Ingest directories for data centre set up")
306
307
308        def _convertAndIngestFiles(self, originals_dir, discovery_dir):
309                '''
310                Convert files from originals dir to discovery one, then
311                ingest, backup and clean
312                @param originals_dir: directory to ingest docs from
313                @param discovery_dir: directory to use to process ingested docs
314                '''
315                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir)
316               
317                filenames = os.listdir(self.discovery_dir)
318                for filename in filenames:
319                        fullPath = self.discovery_dir + filename
320                        if os.path.isfile(fullPath):
321                                self.addFileToPostgresDB(fullPath)
322               
323                self._backupAndCleanData()
324                return numfilesproc
325
326
327
328        def _setupXQueries(self):
329                '''
330                Set up the required XQueries
331                - NB, extract the xquery libraries locally for easy reference
332                '''
333                self._xq = ndgResources()
334                for libFile in self._xq.xqlib:
335                        # NB, we don't want the full path to the files - just the filename
336                        fileName = libFile.split('/')[-1]
337                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.