source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 5248

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@5248
Revision 5248, 11.9 KB checked in by cbyrom, 10 years ago (diff)

Update ingest scripts for use with the OAIInfoEditor harvest
functionality - to allow config and jar file resources to be
retrieved even if not running in ingest package + allow different
harvest directory and format, compared with that of the config
file, to be specified.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17
18class AbstractDocumentIngester(object):
19        '''
20        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
21        - including running the various transforms and parsings to get all doc types and spatiotemporal
22        data in the correct form in the DB
23        '''
24        lineSeparator = "-----------------------------"
25                       
26        # The directory to put things for a tape backup (should already exist)
27        BACKUP_DIR = '/disks/glue1/oaiBackup/'
28
29        def _setupCmdLineOptions(self):
30                '''
31                Determine the logging level to use and configure this appropriately
32                @return args: any input arguments - excluding options
33                '''
34                # check for verbose option
35                try:
36                        opts, args = getopt.getopt(sys.argv[1:], "vd")
37                except getopt.GetoptError, err:
38                    # print help information and exit:
39                    print str(err) # will print something like "option -a not recognized"
40                    sys.exit(2)
41
42                if len(args) < 1:
43                        self.usage()
44                   
45                loggingLevel = logging.WARNING
46                for o, a in opts:
47                    if o == "-v":
48                        print " - Verbose mode ON"
49                        loggingLevel = logging.INFO
50                    elif o == "-d":
51                        print " - Debug mode ON"
52                        loggingLevel = logging.DEBUG
53               
54                # set up any keywords on the object
55                # NB, be careful to keep the instance variables the same name as the keywords!
56                for arg in args:
57                        bits = arg.split('=')
58                        if len(bits) == 2:
59                                if bits[0] == 'ingestFromDate':
60                                        self.setIngestFromDate(bits[1])
61                                elif bits[0] == 'interval':
62                                        self.setPollInterval(bits[1])
63                                else:
64                                        setattr(self, bits[0], bits[1])
65               
66                print self.lineSeparator
67                # NB, this is a slight fudge as cannot get the detailed logging to work
68                # without setting up a new logger here - which means we get two loggers
69                # outputing data. The initial call to logging needs to be tracked down
70                # and configured correctly, so this can be avoided...
71#               self.logger = logging.getLogger()
72#               self.logger.setLevel(loggingLevel)
73
74                # create console handler and set level to debug
75#               ch = logging.StreamHandler()
76#               ch.setLevel(loggingLevel)
77                # create formatter
78#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
79                # add formatter to ch
80#               ch.setFormatter(formatter)
81                # add ch to logger
82#               self.logger.addHandler(ch)
83                return args
84
85
86        def getID(self, filename):
87                '''
88                Gets the identifier out of an input metadata xml record.
89                @param filename - name of document file being processed
90                @return: ID - id to use to refer to the document
91                '''
92                logging.info("Retrieving identifier for metadata record " + filename)
93                xml=file(filename).read()
94                ID = idget(xml)
95                return ID
96       
97       
98        def addFileToPostgresDB(self, filename):
99                '''
100                Add a file to the postgres DB - extracting and storing all the required
101                data in the process
102                @param filename: full path of file to add to postgres DB
103                '''
104                logging.info("Adding file, " + filename + ", to postgres DB")
105               
106                # first of all create a PostgresRecord - this object represents all the data required
107                # for a DB entry
108                dao = None
109                try:
110                        discoveryID = self.getID(filename)
111                       
112                        record = PostgresRecord(filename, self._NDG_dataProvider, \
113                                                            self._datacentre_groups, self._datacentre_namespace, \
114                                                            discoveryID, self._xq, self._datacentre_format)
115                       
116                        print self._xq
117                        # Now create the data access object to interface to the DB
118                        dao = PostgresDAO(record, pgClient = self.pgc)
119               
120                        # Finally, write the new record
121                        if dao.createOrUpdateRecord():
122                                self._no_files_ingested += 1
123                except:
124                        logging.error("Exception thrown - detail: ")
125                        errors = sys.exc_info()
126                        logging.error(errors)
127                        self._error_messages += str(errors[1])
128                       
129                        if dao:
130                                logging.info("Removing record and its associated info from DB")
131                                logging.info("- to allow clean ingestion on rerun")
132                                try:
133                                        dao.deleteOriginalRecord()
134                                except:
135                                        logging.error("Problem encountered when removing record: ")
136                                        logging.error(sys.exc_info())
137                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
138
139                        self._no_problem_files += 1
140                        logging.info("Continue processing other files")
141       
142       
143        def getConfigDetails(self, datacentre):
144                '''
145                Get the harvested records directory and groups for this datacentre from the
146                datacentre specific config file.  The harvested records directory depends on the
147                datacentres OAI base url, the set and format. These have to be know up-front.
148                The groups denote which 'portal groups' they belong to - for limiting searches to
149                say NERC-only datacentres records.
150                Groups are added to the intermediate MOLES when it is created.
151                @param datacentre: datacentre to use when looking up config file
152                '''
153                # initialise the variables to retrieve from the config file
154                self._harvest_home = ""
155                self._datacentre_groups = ""
156                self._datacentre_format = ""
157                self._datacentre_namespace = ""
158                self._NDG_dataProvider = False
159
160                datacentre_config_filename = 'datacentre_config/' + datacentre + "_config.properties"
161                logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename)
162                try:
163                        file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
164                except IOError:
165                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
166                        "specified (%s) is invalid\n" %datacentre)
167
168                for line in file.split('\n'):
169                        words = line.split()
170                        if len(words) == 0:
171                                continue
172                        elif words[0] == 'host_path':
173                                self._harvest_home = words[1].rstrip()
174                        elif words[0] == 'groups':
175                                self._datacentre_groups = words[1:]
176                        elif words[0] == 'format':
177                                self._datacentre_format = words[1]
178                        elif words[0] == 'namespace':
179                                self._datacentre_namespace = words[1]
180                        elif words[0] == 'NDG_dataProvider':
181                                self._NDG_dataProvider = True
182                       
183                if self._harvest_home == "":
184                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
185               
186                logging.info("harvested records are in " + self._harvest_home)
187               
188                if self._datacentre_groups == "":
189                    logging.info("No groups/keywords set for datacentre " + datacentre)
190                else:
191                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
192               
193                if self._datacentre_format == "":
194                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
195               
196                logging.info("format being harvested: " + self._datacentre_format)
197               
198                if self._datacentre_namespace == "":
199                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
200               
201                logging.info("datacentre namespace: " + self._datacentre_namespace)
202               
203                if self._NDG_dataProvider:
204                        logging.info("Datacentre classified as an NDG data provider")
205                else:
206                        logging.info("Datacentre is not classified as an NDG data provider")
207                logging.info(self.lineSeparator)
208
209
210        def _convertIngestFiles(self, originals_dir, discovery_dir):
211                '''
212                Processes/renames the files (changed 08/01/07 to get id from inside file)
213                 - also replace any namespace declarations with a standard one which we know works in NDG
214                 NB, this copies files from the original dir to the discovery dir
215                 @param originals_dir: directory to convert files from
216                 @param discovery_dir: directory in which converted files will end up
217                 @return numfilesproc: counter of number of files processed
218                '''
219                numfilesproc = 0
220                logging.info(self.lineSeparator)
221                logging.info("Renaming files:")
222                for filename in os.listdir(originals_dir):
223                        if not filename.endswith('.xml'):
224                                logging.warning('File %s is not xml format. Not processed'  %(filename))
225                                continue
226                       
227                        original_filename = originals_dir + filename
228                        try:
229                                ident=self.getID(original_filename)
230                        except Exception, detail:
231                                logging.error("Could not retrieve ID from file, %s" %filename)
232                                logging.error("Detail: %s" %detail)
233                                logging.info("Continue with next file")
234                                continue
235                       
236                        if self._NDG_dataProvider:
237                                new_filename = discovery_dir + ident.replace(":", "__")+".xml"
238                        else:
239                                ident = ident.replace(":", "-")
240                                ident = ident.replace("/", "-")
241                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
242                                logging.info("original file = " + original_filename)
243                                logging.info("newfile = " + new_filename)
244                       
245                        # now correct any namespace issues
246                        try:
247                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
248                        except Exception, detail:
249                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
250                                logging.error("Detail: %s" %detail)
251                                logging.info("Continue with next file")
252                                continue
253                        numfilesproc += 1
254               
255                logging.info("File renaming and converting completed")
256                logging.info(self.lineSeparator)
257                return numfilesproc
258
259               
260        def _getPostgresDBConnection(self):
261                '''
262                Get the default postgres DB connection - by reading in data from the db config file
263                '''
264                logging.debug("Setting up connection to postgres DB")
265                self.pgc = pgc(configFile = 'ingest.config')
266        logging.info("Postgres DB connection now set up")
267
268
269        def     _backupAndCleanData(self):
270                '''
271                Back up ingested data for specified data centre, then clean any of
272                the ingest dirs
273                '''
274                logging.info("Backing up ingested data")
275                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
276                  strftime("%y%m%d_%H%M") + "_originals/"
277               
278                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
279                logging.info("Data backed up - now clearing ingest directories")
280                #Clear out the original harvest records area and discovery dir
281                FileUtilities.cleanDir(self.originals_dir)
282                FileUtilities.cleanDir(self.discovery_dir)
283                logging.info("Ingest directories cleared")
284
285
286        def     _setupDataCentreDirs(self):
287                '''
288                Set up directories appropriate for the current data centre
289                '''
290                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
291                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
292               
293                # the following dirs define where the specific documents should go
294                self.originals_dir = data_dir + "/oai/originals/"
295                self.discovery_dir = data_dir + "/discovery/"
296               
297                # Create/clear the 'in' and 'out' directories
298                FileUtilities.setUpDir(self.originals_dir)
299                FileUtilities.setUpDir(self.discovery_dir)
300               
301                logging.info("Ingest directories for data centre set up")
302
303
304        def _convertAndIngestFiles(self, originals_dir, discovery_dir):
305                '''
306                Convert files from originals dir to discovery one, then
307                ingest, backup and clean
308                @param originals_dir: directory to ingest docs from
309                @param discovery_dir: directory to use to process ingested docs
310                '''
311                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir)
312               
313                filenames = os.listdir(self.discovery_dir)
314                for filename in filenames:
315                        fullPath = self.discovery_dir + filename
316                        if os.path.isfile(fullPath):
317                                self.addFileToPostgresDB(fullPath)
318               
319                self._backupAndCleanData()
320                return numfilesproc
321
322
323
324        def _setupXQueries(self):
325                '''
326                Set up the required XQueries
327                - NB, extract the xquery libraries locally for easy reference
328                '''
329                self._xq = ndgResources()
330                for libFile in self._xq.xqlib:
331                        # NB, we don't want the full path to the files - just the filename
332                        fileName = libFile.split('/')[-1]
333                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.