source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 5297

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@5297
Revision 5297, 11.8 KB checked in by cbyrom, 11 years ago (diff)

Add additional instructions for setting up ingester + improve the
setup script to include more dependencies.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17
18class AbstractDocumentIngester(object):
19        '''
20        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
21        - including running the various transforms and parsings to get all doc types and spatiotemporal
22        data in the correct form in the DB
23        '''
24        lineSeparator = "-----------------------------"
25                       
26        # The directory to put things for a tape backup (should already exist)
27        BACKUP_DIR = '/disks/glue1/oaiBackup/'
28
29        def _setupCmdLineOptions(self):
30                '''
31                Determine the logging level to use and configure this appropriately
32                @return args: any input arguments - excluding options
33                '''
34                # check for verbose option
35                try:
36                        opts, args = getopt.getopt(sys.argv[1:], "vd")
37                except getopt.GetoptError, err:
38                    # print help information and exit:
39                    print str(err) # will print something like "option -a not recognized"
40                    sys.exit(2)
41
42                if len(args) < 1:
43                        self.usage()
44                   
45                loggingLevel = logging.WARNING
46                for o, a in opts:
47                    if o == "-v":
48                        print " - Verbose mode ON"
49                        loggingLevel = logging.INFO
50                    elif o == "-d":
51                        print " - Debug mode ON"
52                        loggingLevel = logging.DEBUG
53               
54                # set up any keywords on the object
55                # NB, be careful to keep the instance variables the same name as the keywords!
56                for arg in args:
57                        bits = arg.split('=')
58                        if len(bits) == 2:
59                                if bits[0] == 'ingestFromDate':
60                                        self.setIngestFromDate(bits[1])
61                                elif bits[0] == 'interval':
62                                        self.setPollInterval(bits[1])
63                                else:
64                                        setattr(self, bits[0], bits[1])
65               
66                print self.lineSeparator
67                # NB, this is a slight fudge as cannot get the detailed logging to work
68                # without setting up a new logger here - which means we get two loggers
69                # outputing data. The initial call to logging needs to be tracked down
70                # and configured correctly, so this can be avoided...
71#               self.logger = logging.getLogger()
72#               self.logger.setLevel(loggingLevel)
73
74                # create console handler and set level to debug
75#               ch = logging.StreamHandler()
76#               ch.setLevel(loggingLevel)
77                # create formatter
78#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
79                # add formatter to ch
80#               ch.setFormatter(formatter)
81                # add ch to logger
82#               self.logger.addHandler(ch)
83                return args
84
85
86        def getID(self, filename):
87                '''
88                Gets the identifier out of an input metadata xml record.
89                @param filename - name of document file being processed
90                @return: ID - id to use to refer to the document
91                '''
92                logging.info("Retrieving identifier for metadata record " + filename)
93                xml=file(filename).read()
94                ID = idget(xml)
95                return ID
96       
97       
98        def addFileToPostgresDB(self, filename):
99                '''
100                Add a file to the postgres DB - extracting and storing all the required
101                data in the process
102                @param filename: full path of file to add to postgres DB
103                '''
104                logging.info("Adding file, " + filename + ", to postgres DB")
105               
106                # first of all create a PostgresRecord - this object represents all the data required
107                # for a DB entry
108                dao = None
109                try:
110                        discoveryID = self.getID(filename)
111                       
112                        record = PostgresRecord(filename, self._NDG_dataProvider, \
113                                                            self._datacentre_groups, self._datacentre_namespace, \
114                                                            discoveryID, self._xq, self._datacentre_format)
115                       
116                        print self._xq
117                        # Now create the data access object to interface to the DB
118                        dao = PostgresDAO(record, pgClient = self.pgc)
119               
120                        # Finally, write the new record
121                        if dao.createOrUpdateRecord():
122                                self._no_files_ingested += 1
123                except:
124                        logging.error("Exception thrown - detail: ")
125                        errors = sys.exc_info()
126                        logging.error(errors)
127                        self._error_messages += "%s\n" %str(errors[1])
128                       
129                        if dao:
130                                logging.info("Removing record and its associated info from DB")
131                                logging.info("- to allow clean ingestion on rerun")
132                                try:
133                                        dao.deleteOriginalRecord()
134                                except:
135                                        logging.error("Problem encountered when removing record: ")
136                                        logging.error(sys.exc_info())
137                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
138
139                        self._no_problem_files += 1
140                        logging.info("Continue processing other files")
141       
142       
143        def getConfigDetails(self, datacentre):
144                '''
145                Get the harvested records directory and groups for this datacentre from the
146                datacentre specific config file.  The harvested records directory depends on the
147                datacentres OAI base url, the set and format. These have to be know up-front.
148                The groups denote which 'portal groups' they belong to - for limiting searches to
149                say NERC-only datacentres records.
150                Groups are added to the intermediate MOLES when it is created.
151                @param datacentre: datacentre to use when looking up config file
152                '''
153                # initialise the variables to retrieve from the config file
154                self._harvest_home = ""
155                self._datacentre_groups = ""
156                self._datacentre_format = ""
157                self._datacentre_namespace = ""
158                self._NDG_dataProvider = False
159
160                datacentre_config_filename = 'datacentre_config/' + datacentre + "_config.properties"
161                logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename)
162               
163                file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
164
165                for line in file.split('\n'):
166                        words = line.split()
167                        if len(words) == 0:
168                                continue
169                        elif words[0] == 'host_path':
170                                self._harvest_home = words[1].rstrip()
171                        elif words[0] == 'groups':
172                                self._datacentre_groups = words[1:]
173                        elif words[0] == 'format':
174                                self._datacentre_format = words[1]
175                        elif words[0] == 'namespace':
176                                self._datacentre_namespace = words[1]
177                        elif words[0] == 'NDG_dataProvider':
178                                self._NDG_dataProvider = True
179                       
180                if self._harvest_home == "":
181                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
182               
183                logging.info("harvested records are in " + self._harvest_home)
184               
185                if self._datacentre_groups == "":
186                    logging.info("No groups/keywords set for datacentre " + datacentre)
187                else:
188                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
189               
190                if self._datacentre_format == "":
191                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
192               
193                logging.info("format being harvested: " + self._datacentre_format)
194               
195                if self._datacentre_namespace == "":
196                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
197               
198                logging.info("datacentre namespace: " + self._datacentre_namespace)
199               
200                if self._NDG_dataProvider:
201                        logging.info("Datacentre classified as an NDG data provider")
202                else:
203                        logging.info("Datacentre is not classified as an NDG data provider")
204                logging.info(self.lineSeparator)
205
206
207        def _convertIngestFiles(self, originals_dir, discovery_dir):
208                '''
209                Processes/renames the files (changed 08/01/07 to get id from inside file)
210                 - also replace any namespace declarations with a standard one which we know works in NDG
211                 NB, this copies files from the original dir to the discovery dir
212                 @param originals_dir: directory to convert files from
213                 @param discovery_dir: directory in which converted files will end up
214                 @return numfilesproc: counter of number of files processed
215                '''
216                numfilesproc = 0
217                logging.info(self.lineSeparator)
218                logging.info("Renaming files:")
219                for filename in os.listdir(originals_dir):
220                        if not filename.endswith('.xml'):
221                                logging.warning('File %s is not xml format. Not processed'  %(filename))
222                                continue
223                       
224                        original_filename = originals_dir + filename
225                        try:
226                                ident=self.getID(original_filename)
227                        except Exception, detail:
228                                logging.error("Could not retrieve ID from file, %s" %filename)
229                                logging.error("Detail: %s" %detail)
230                                logging.info("Continue with next file")
231                                continue
232                       
233                        if self._NDG_dataProvider:
234                                new_filename = discovery_dir + ident.replace(":", "__")+".xml"
235                        else:
236                                ident = ident.replace(":", "-")
237                                ident = ident.replace("/", "-")
238                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
239                                logging.info("original file = " + original_filename)
240                                logging.info("newfile = " + new_filename)
241                       
242                        # now correct any namespace issues
243                        try:
244                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
245                        except Exception, detail:
246                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
247                                logging.error("Detail: %s" %detail)
248                                logging.info("Continue with next file")
249                                continue
250                        numfilesproc += 1
251               
252                logging.info("File renaming and converting completed")
253                logging.info(self.lineSeparator)
254                return numfilesproc
255
256               
257        def _getPostgresDBConnection(self):
258                '''
259                Get the default postgres DB connection - by reading in data from the db config file
260                '''
261                logging.debug("Setting up connection to postgres DB")
262                self.pgc = pgc(configFile = 'ingest.config')
263        logging.info("Postgres DB connection now set up")
264
265
266        def     _backupAndCleanData(self):
267                '''
268                Back up ingested data for specified data centre, then clean any of
269                the ingest dirs
270                '''
271                logging.info("Backing up ingested data")
272                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
273                  strftime("%y%m%d_%H%M") + "_originals/"
274               
275                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
276                logging.info("Data backed up - now clearing ingest directories")
277                #Clear out the original harvest records area and discovery dir
278                FileUtilities.cleanDir(self.originals_dir)
279                FileUtilities.cleanDir(self.discovery_dir)
280                logging.info("Ingest directories cleared")
281
282
283        def     _setupDataCentreDirs(self):
284                '''
285                Set up directories appropriate for the current data centre
286                '''
287                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
288                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
289               
290                # the following dirs define where the specific documents should go
291                self.originals_dir = data_dir + "/oai/originals/"
292                self.discovery_dir = data_dir + "/discovery/"
293               
294                # Create/clear the 'in' and 'out' directories
295                FileUtilities.setUpDir(self.originals_dir)
296                FileUtilities.setUpDir(self.discovery_dir)
297               
298                logging.info("Ingest directories for data centre set up")
299
300
301        def _convertAndIngestFiles(self, originals_dir, discovery_dir):
302                '''
303                Convert files from originals dir to discovery one, then
304                ingest, backup and clean
305                @param originals_dir: directory to ingest docs from
306                @param discovery_dir: directory to use to process ingested docs
307                '''
308                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir)
309               
310                filenames = os.listdir(self.discovery_dir)
311                for filename in filenames:
312                        fullPath = self.discovery_dir + filename
313                        if os.path.isfile(fullPath):
314                                self.addFileToPostgresDB(fullPath)
315               
316                self._backupAndCleanData()
317                return numfilesproc
318
319
320
321        def _setupXQueries(self):
322                '''
323                Set up the required XQueries
324                - NB, extract the xquery libraries locally for easy reference
325                '''
326                self._xq = ndgResources()
327                for libFile in self._xq.xqlib:
328                        # NB, we don't want the full path to the files - just the filename
329                        fileName = libFile.split('/')[-1]
330                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.