source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 5252

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@5252
Revision 5252, 11.8 KB checked in by cbyrom, 10 years ago (diff)

Simplify error handling, improve output logging + standardise use of
upper case doc formats + switch off MDIP again since this mostly
breaks things.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17
18class AbstractDocumentIngester(object):
19        '''
20        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
21        - including running the various transforms and parsings to get all doc types and spatiotemporal
22        data in the correct form in the DB
23        '''
24        lineSeparator = "-----------------------------"
25                       
26        # The directory to put things for a tape backup (should already exist)
27        BACKUP_DIR = '/disks/glue1/oaiBackup/'
28
29        def _setupCmdLineOptions(self):
30                '''
31                Determine the logging level to use and configure this appropriately
32                @return args: any input arguments - excluding options
33                '''
34                # check for verbose option
35                try:
36                        opts, args = getopt.getopt(sys.argv[1:], "vd")
37                except getopt.GetoptError, err:
38                    # print help information and exit:
39                    print str(err) # will print something like "option -a not recognized"
40                    sys.exit(2)
41
42                if len(args) < 1:
43                        self.usage()
44                   
45                loggingLevel = logging.WARNING
46                for o, a in opts:
47                    if o == "-v":
48                        print " - Verbose mode ON"
49                        loggingLevel = logging.INFO
50                    elif o == "-d":
51                        print " - Debug mode ON"
52                        loggingLevel = logging.DEBUG
53               
54                # set up any keywords on the object
55                # NB, be careful to keep the instance variables the same name as the keywords!
56                for arg in args:
57                        bits = arg.split('=')
58                        if len(bits) == 2:
59                                if bits[0] == 'ingestFromDate':
60                                        self.setIngestFromDate(bits[1])
61                                elif bits[0] == 'interval':
62                                        self.setPollInterval(bits[1])
63                                else:
64                                        setattr(self, bits[0], bits[1])
65               
66                print self.lineSeparator
67                # NB, this is a slight fudge as cannot get the detailed logging to work
68                # without setting up a new logger here - which means we get two loggers
69                # outputing data. The initial call to logging needs to be tracked down
70                # and configured correctly, so this can be avoided...
71#               self.logger = logging.getLogger()
72#               self.logger.setLevel(loggingLevel)
73
74                # create console handler and set level to debug
75#               ch = logging.StreamHandler()
76#               ch.setLevel(loggingLevel)
77                # create formatter
78#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
79                # add formatter to ch
80#               ch.setFormatter(formatter)
81                # add ch to logger
82#               self.logger.addHandler(ch)
83                return args
84
85
86        def getID(self, filename):
87                '''
88                Gets the identifier out of an input metadata xml record.
89                @param filename - name of document file being processed
90                @return: ID - id to use to refer to the document
91                '''
92                logging.info("Retrieving identifier for metadata record " + filename)
93                xml=file(filename).read()
94                ID = idget(xml)
95                return ID
96       
97       
98        def addFileToPostgresDB(self, filename):
99                '''
100                Add a file to the postgres DB - extracting and storing all the required
101                data in the process
102                @param filename: full path of file to add to postgres DB
103                '''
104                logging.info("Adding file, " + filename + ", to postgres DB")
105               
106                # first of all create a PostgresRecord - this object represents all the data required
107                # for a DB entry
108                dao = None
109                try:
110                        discoveryID = self.getID(filename)
111                       
112                        record = PostgresRecord(filename, self._NDG_dataProvider, \
113                                                            self._datacentre_groups, self._datacentre_namespace, \
114                                                            discoveryID, self._xq, self._datacentre_format)
115                       
116                        print self._xq
117                        # Now create the data access object to interface to the DB
118                        dao = PostgresDAO(record, pgClient = self.pgc)
119               
120                        # Finally, write the new record
121                        if dao.createOrUpdateRecord():
122                                self._no_files_ingested += 1
123                except:
124                        logging.error("Exception thrown - detail: ")
125                        errors = sys.exc_info()
126                        logging.error(errors)
127                        self._error_messages += "%s\n" %str(errors[1])
128                       
129                        if dao:
130                                logging.info("Removing record and its associated info from DB")
131                                logging.info("- to allow clean ingestion on rerun")
132                                try:
133                                        dao.deleteOriginalRecord()
134                                except:
135                                        logging.error("Problem encountered when removing record: ")
136                                        logging.error(sys.exc_info())
137                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
138
139                        self._no_problem_files += 1
140                        logging.info("Continue processing other files")
141       
142       
143        def getConfigDetails(self, datacentre):
144                '''
145                Get the harvested records directory and groups for this datacentre from the
146                datacentre specific config file.  The harvested records directory depends on the
147                datacentres OAI base url, the set and format. These have to be know up-front.
148                The groups denote which 'portal groups' they belong to - for limiting searches to
149                say NERC-only datacentres records.
150                Groups are added to the intermediate MOLES when it is created.
151                @param datacentre: datacentre to use when looking up config file
152                '''
153                # initialise the variables to retrieve from the config file
154                self._harvest_home = ""
155                self._datacentre_groups = ""
156                self._datacentre_format = ""
157                self._datacentre_namespace = ""
158                self._NDG_dataProvider = False
159
160                datacentre_config_filename = 'datacentre_config/' + datacentre + "_config.properties"
161                logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename)
162                file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
163
164                for line in file.split('\n'):
165                        words = line.split()
166                        if len(words) == 0:
167                                continue
168                        elif words[0] == 'host_path':
169                                self._harvest_home = words[1].rstrip()
170                        elif words[0] == 'groups':
171                                self._datacentre_groups = words[1:]
172                        elif words[0] == 'format':
173                                self._datacentre_format = words[1]
174                        elif words[0] == 'namespace':
175                                self._datacentre_namespace = words[1]
176                        elif words[0] == 'NDG_dataProvider':
177                                self._NDG_dataProvider = True
178                       
179                if self._harvest_home == "":
180                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
181               
182                logging.info("harvested records are in " + self._harvest_home)
183               
184                if self._datacentre_groups == "":
185                    logging.info("No groups/keywords set for datacentre " + datacentre)
186                else:
187                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
188               
189                if self._datacentre_format == "":
190                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
191               
192                logging.info("format being harvested: " + self._datacentre_format)
193               
194                if self._datacentre_namespace == "":
195                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
196               
197                logging.info("datacentre namespace: " + self._datacentre_namespace)
198               
199                if self._NDG_dataProvider:
200                        logging.info("Datacentre classified as an NDG data provider")
201                else:
202                        logging.info("Datacentre is not classified as an NDG data provider")
203                logging.info(self.lineSeparator)
204
205
206        def _convertIngestFiles(self, originals_dir, discovery_dir):
207                '''
208                Processes/renames the files (changed 08/01/07 to get id from inside file)
209                 - also replace any namespace declarations with a standard one which we know works in NDG
210                 NB, this copies files from the original dir to the discovery dir
211                 @param originals_dir: directory to convert files from
212                 @param discovery_dir: directory in which converted files will end up
213                 @return numfilesproc: counter of number of files processed
214                '''
215                numfilesproc = 0
216                logging.info(self.lineSeparator)
217                logging.info("Renaming files:")
218                for filename in os.listdir(originals_dir):
219                        if not filename.endswith('.xml'):
220                                logging.warning('File %s is not xml format. Not processed'  %(filename))
221                                continue
222                       
223                        original_filename = originals_dir + filename
224                        try:
225                                ident=self.getID(original_filename)
226                        except Exception, detail:
227                                logging.error("Could not retrieve ID from file, %s" %filename)
228                                logging.error("Detail: %s" %detail)
229                                logging.info("Continue with next file")
230                                continue
231                       
232                        if self._NDG_dataProvider:
233                                new_filename = discovery_dir + ident.replace(":", "__")+".xml"
234                        else:
235                                ident = ident.replace(":", "-")
236                                ident = ident.replace("/", "-")
237                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
238                                logging.info("original file = " + original_filename)
239                                logging.info("newfile = " + new_filename)
240                       
241                        # now correct any namespace issues
242                        try:
243                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
244                        except Exception, detail:
245                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
246                                logging.error("Detail: %s" %detail)
247                                logging.info("Continue with next file")
248                                continue
249                        numfilesproc += 1
250               
251                logging.info("File renaming and converting completed")
252                logging.info(self.lineSeparator)
253                return numfilesproc
254
255               
256        def _getPostgresDBConnection(self):
257                '''
258                Get the default postgres DB connection - by reading in data from the db config file
259                '''
260                logging.debug("Setting up connection to postgres DB")
261                self.pgc = pgc(configFile = 'ingest.config')
262        logging.info("Postgres DB connection now set up")
263
264
265        def     _backupAndCleanData(self):
266                '''
267                Back up ingested data for specified data centre, then clean any of
268                the ingest dirs
269                '''
270                logging.info("Backing up ingested data")
271                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
272                  strftime("%y%m%d_%H%M") + "_originals/"
273               
274                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
275                logging.info("Data backed up - now clearing ingest directories")
276                #Clear out the original harvest records area and discovery dir
277                FileUtilities.cleanDir(self.originals_dir)
278                FileUtilities.cleanDir(self.discovery_dir)
279                logging.info("Ingest directories cleared")
280
281
282        def     _setupDataCentreDirs(self):
283                '''
284                Set up directories appropriate for the current data centre
285                '''
286                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
287                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
288               
289                # the following dirs define where the specific documents should go
290                self.originals_dir = data_dir + "/oai/originals/"
291                self.discovery_dir = data_dir + "/discovery/"
292               
293                # Create/clear the 'in' and 'out' directories
294                FileUtilities.setUpDir(self.originals_dir)
295                FileUtilities.setUpDir(self.discovery_dir)
296               
297                logging.info("Ingest directories for data centre set up")
298
299
300        def _convertAndIngestFiles(self, originals_dir, discovery_dir):
301                '''
302                Convert files from originals dir to discovery one, then
303                ingest, backup and clean
304                @param originals_dir: directory to ingest docs from
305                @param discovery_dir: directory to use to process ingested docs
306                '''
307                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir)
308               
309                filenames = os.listdir(self.discovery_dir)
310                for filename in filenames:
311                        fullPath = self.discovery_dir + filename
312                        if os.path.isfile(fullPath):
313                                self.addFileToPostgresDB(fullPath)
314               
315                self._backupAndCleanData()
316                return numfilesproc
317
318
319
320        def _setupXQueries(self):
321                '''
322                Set up the required XQueries
323                - NB, extract the xquery libraries locally for easy reference
324                '''
325                self._xq = ndgResources()
326                for libFile in self._xq.xqlib:
327                        # NB, we don't want the full path to the files - just the filename
328                        fileName = libFile.split('/')[-1]
329                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.