source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 4854

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@4854
Revision 4854, 12.0 KB checked in by cbyrom, 11 years ago (diff)

Add new ingest script - to allow ingest of DIF docs from eXist hosted
atom feed. NB, this required restructure of original OAI harvester
to allow re-use of shared code - by abstracting this out into new class,
absstractdocumentingester.

Add new documentation and tidy up codebase removing dependencies where possible to simplify things.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgXqueries import ndgXqueries
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17
18class AbstractDocumentIngester(object):
19        '''
20        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
21        - including running the various transforms and parsings to get all doc types and spatiotemporal
22        data in the correct form in the DB
23        '''
24        lineSeparator = "-----------------------------"
25                       
26        # The directory to put things for a tape backup (should already exist)
27        BACKUP_DIR = '/disks/glue1/oaiBackup/'
28
29        def _setupCmdLineOptions(self):
30                '''
31                Determine the logging level to use and configure this appropriately
32                @return args: any input arguments - excluding options
33                '''
34                # check for verbose option
35                try:
36                        opts, args = getopt.getopt(sys.argv[1:], "vd")
37                except getopt.GetoptError, err:
38                    # print help information and exit:
39                    print str(err) # will print something like "option -a not recognized"
40                    sys.exit(2)
41
42                if len(args) < 1:
43                        self.usage()
44                   
45                loggingLevel = logging.WARNING
46                for o, a in opts:
47                    if o == "-v":
48                        print " - Verbose mode ON"
49                        loggingLevel = logging.INFO
50                    elif o == "-d":
51                        print " - Debug mode ON"
52                        loggingLevel = logging.DEBUG
53               
54                # set up any keywords on the object
55                # NB, be careful to keep the instance variables the same name as the keywords!
56                for arg in args:
57                        bits = arg.split('=')
58                        if len(bits) == 2:
59                                if bits[0] == 'ingestFromDate':
60                                        self.setIngestFromDate(bits[1])
61                                elif bits[0] == 'interval':
62                                        self.setPollInterval(bits[1])
63                                else:
64                                        setattr(self, bits[0], bits[1])
65               
66                print self.lineSeparator
67                # NB, this is a slight fudge as cannot get the detailed logging to work
68                # without setting up a new logger here - which means we get two loggers
69                # outputing data. The initial call to logging needs to be tracked down
70                # and configured correctly, so this can be avoided...
71                self.logger = logging.getLogger()
72                self.logger.setLevel(loggingLevel)
73
74                # create console handler and set level to debug
75                ch = logging.StreamHandler()
76                ch.setLevel(loggingLevel)
77                # create formatter
78                formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
79                # add formatter to ch
80                ch.setFormatter(formatter)
81                # add ch to logger
82                self.logger.addHandler(ch)
83                return args
84
85
86        def getID(self, filename):
87                '''
88                Gets the identifier out of an input metadata xml record.
89                @param filename - name of document file being processed
90                @return: ID - id to use to refer to the document
91                '''
92                logging.info("Retrieving identifier for metadata record " + filename)
93                xml=file(filename).read()
94                ID = idget(xml)
95                return ID
96       
97       
98        def addFileToPostgresDB(self, filename):
99                '''
100                Add a file to the postgres DB - extracting and storing all the required
101                data in the process
102                @param filename: full path of file to add to postgres DB
103                '''
104                logging.info("Adding file, " + filename + ", to postgres DB")
105               
106                # first of all create a PostgresRecord - this object represents all the data required
107                # for a DB entry
108                dao = None
109                try:
110                        discoveryID = self.getID(filename)
111                        record = PostgresRecord(filename, self._NDG_dataProvider, \
112                                                            self._datacentre_groups, self._datacentre_namespace, \
113                                                            discoveryID, self._xq, self._datacentre_format)
114       
115                        # Now create the data access object to interface to the DB
116                        dao = PostgresDAO(record, pgClient = self.pgc)
117               
118                        # Finally, write the new record
119                        if dao.createOrUpdateRecord():
120                                self._no_files_ingested += 1
121                except:
122                        logging.error("Exception thrown - detail: ")
123                        logging.error(sys.exc_info())
124                       
125                        if dao:
126                                logging.info("Removing record and its associated info from DB")
127                                logging.info("- to allow clean ingestion on rerun")
128                                try:
129                                        dao.deleteOriginalRecord()
130                                except:
131                                        logging.error("Problem encountered when removing record: ")
132                                        logging.error(sys.exc_info())
133                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
134
135                        self._no_problem_files += 1
136                        logging.info("Continue processing other files")
137       
138       
139        def getConfigDetails(self, datacentre):
140                '''
141                Get the harvested records directory and groups for this datacentre from the
142                datacentre specific config file.  The harvested records directory depends on the
143                datacentres OAI base url, the set and format. These have to be know up-front.
144                The groups denote which 'portal groups' they belong to - for limiting searches to
145                say NERC-only datacentres records.
146                Groups are added to the intermediate MOLES when it is created.
147                @param datacentre: datacentre to use when looking up config file
148                '''
149                # initialise the variables to retrieve from the config file
150                self._harvest_home = ""
151                self._datacentre_groups = ""
152                self._datacentre_format = ""
153                self._datacentre_namespace = ""
154                self._NDG_dataProvider = False
155
156                self._datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties"
157                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
158               
159                # Check this file exists; if not, assume an invalid datacentre has been specified
160                if not os.path.isfile(self._datacentre_config_filename):
161                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
162                        "specified (%s) is invalid\n" %self.datacentre)
163                   
164                datacentre_config_file = open(self._datacentre_config_filename, "r")
165               
166                for line in datacentre_config_file.readlines():
167                    words  = string.split(line)
168                    if len(words) == 0:
169                        continue
170                    if words[0] == 'host_path':
171                        self._harvest_home = string.rstrip(words[1])
172                    if words[0] == 'groups':
173                        self._datacentre_groups = words[1:]
174                    if words[0] == 'format':
175                        self._datacentre_format = words[1]
176                    if words[0] == 'namespace':
177                        self._datacentre_namespace = words[1]
178                    if words[0] == 'NDG_dataProvider':
179                        self._NDG_dataProvider = True
180               
181                datacentre_config_file.close()
182               
183                if self._harvest_home == "":
184                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename)
185               
186                logging.info("harvested records are in " + self._harvest_home)
187               
188                if self._datacentre_groups == "":
189                    logging.info("No groups/keywords set for datacentre " + datacentre)
190                else:
191                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
192               
193                if self._datacentre_format == "":
194                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename)
195               
196                logging.info("format being harvested: " + self._datacentre_format)
197               
198                if self._datacentre_namespace == "":
199                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename)
200               
201                logging.info("datacentre namespace: " + self._datacentre_namespace)
202               
203                if self._NDG_dataProvider:
204                        logging.info("Datacentre classified as an NDG data provider")
205                else:
206                        logging.info("Datacentre is not classified as an NDG data provider")
207                logging.info(self.lineSeparator)
208
209
210        def _convertIngestFiles(self, originals_dir, discovery_dir):
211                '''
212                Processes/renames the files (changed 08/01/07 to get id from inside file)
213                 - also replace any namespace declarations with a standard one which we know works in NDG
214                 NB, this copies files from the original dir to the discovery dir
215                 @param originals_dir: directory to convert files from
216                 @param discovery_dir: directory in which converted files will end up
217                 @return numfilesproc: counter of number of files processed
218                '''
219                numfilesproc = 0
220                logging.info(self.lineSeparator)
221                logging.info("Renaming files:")
222                for filename in os.listdir(originals_dir):
223                        if not filename.endswith('.xml'):
224                                logging.warning('File %s is not xml format. Not processed'  %(filename))
225                                continue
226                       
227                        original_filename = originals_dir + filename
228                        try:
229                                ident=self.getID(original_filename)
230                        except Exception, detail:
231                                logging.error("Could not retrieve ID from file, %s" %filename)
232                                logging.error("Detail: %s" %detail)
233                                logging.info("Continue with next file")
234                                continue
235                       
236                        if self._NDG_dataProvider:
237                                new_filename = discovery_dir + ident.replace(":", "__")+".xml"
238                        else:
239                                ident = ident.replace(":", "-")
240                                ident = ident.replace("/", "-")
241                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
242                                logging.info("original file = " + original_filename)
243                                logging.info("newfile = " + new_filename)
244                       
245                        # now correct any namespace issues
246                        try:
247                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
248                        except Exception, detail:
249                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
250                                logging.error("Detail: %s" %detail)
251                                logging.info("Continue with next file")
252                                continue
253                        numfilesproc += 1
254               
255                logging.info("File renaming and converting completed")
256                logging.info(self.lineSeparator)
257                return numfilesproc
258
259               
260        def _getPostgresDBConnection(self):
261                '''
262                Get the default postgres DB connection - by reading in data from the db config file
263                '''
264                logging.debug("Setting up connection to postgres DB")
265                self.pgc = pgc(configFile = 'ingest.config')
266        logging.info("Postgres DB connection now set up")
267
268
269        def     _backupAndCleanData(self):
270                '''
271                Back up ingested data for specified data centre, then clean any of
272                the ingest dirs
273                '''
274                logging.info("Backing up ingested data")
275                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
276                  strftime("%y%m%d_%H%M") + "_originals/"
277               
278                FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
279                logging.info("Data backed up - now clearing ingest directories")
280                #Clear out the original harvest records area and discovery dir
281                FileUtilities.cleanDir(self.originals_dir)
282                FileUtilities.cleanDir(self.discovery_dir)
283                logging.info("Ingest directories cleared")
284
285
286        def     _setupDataCentreDirs(self):
287                '''
288                Set up directories appropriate for the current data centre
289                '''
290                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
291                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
292               
293                # the following dirs define where the specific documents should go
294                self.originals_dir = data_dir + "/oai/originals/"
295                self.discovery_dir = data_dir + "/discovery/"
296               
297                # Create/clear the 'in' and 'out' directories
298                FileUtilities.setUpDir(self.originals_dir)
299                FileUtilities.setUpDir(self.discovery_dir)
300               
301                logging.info("Ingest directories for data centre set up")
302
303
304        def _convertAndIngestFiles(self, originals_dir, discovery_dir):
305                '''
306                Convert files from originals dir to discovery one, then
307                ingest, backup and clean
308                @param originals_dir: directory to ingest docs from
309                @param discovery_dir: directory to use to process ingested docs
310                '''
311                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir)
312               
313                filenames = os.listdir(self.discovery_dir)
314                for filename in filenames:
315                        fullPath = self.discovery_dir + filename
316                        if os.path.isfile(fullPath):
317                                self.addFileToPostgresDB(fullPath)
318               
319                import pdb
320                pdb.set_trace()
321                self._backupAndCleanData()
322                return numfilesproc
323
324
325
326        def _setupXQueries(self):
327                '''
328                Set up the required XQueries
329                - NB, extract the xquery libraries locally for easy reference
330                '''
331                self._xq=ndgXqueries()
332                for libFile in self._xq.xqlib:
333                        FileUtilities.createFile(libFile, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.