source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 5523

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@5523
Revision 5523, 17.2 KB checked in by sdonegan, 11 years ago (diff)

synchronising dev with buildouts for info editor operations

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import DatasetBasicParameters,ndgRedirectTransform,redirectUrlChanger
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22
23class AbstractDocumentIngester(object):
24        '''
25        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
26        - including running the various transforms and parsings to get all doc types and spatiotemporal
27        data in the correct form in the DB
28        '''
29        lineSeparator = "-----------------------------"
30                       
31        # The directory to put things for a tape backup (should already exist)
32        BACKUP_DIR = '/disks/glue1/oaiBackup/'
33       
34        NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
35
36        def _setupCmdLineOptions(self):
37                '''
38                Determine the logging level to use and configure this appropriately
39                @return args: any input arguments - excluding options
40                '''
41               
42                # check for verbose option
43               
44                try:
45                        opts, args = getopt.getopt(sys.argv[1:], "vdi")
46                except getopt.GetoptError, err:
47                    # print help information and exit:
48                    print str(err) # will print something like "option -a not recognized"
49                    sys.exit(2)
50                   
51                if len(args) < 1:
52                        self.usage()
53                   
54                loggingLevel = logging.WARNING
55                for o, a in opts:
56                    if o == "-v":
57                        print " - Verbose mode ON"
58                        loggingLevel = logging.INFO
59                    elif o == "-d":
60                        print " - Debug mode ON"
61                        loggingLevel = logging.DEBUG
62                   
63               
64                # set up any keywords on the object
65                # NB, be careful to keep the instance variables the same name as the keywords!
66                for arg in args:
67                       
68                        bits = arg.split('=')
69                        if len(bits) == 2:
70                                if bits[0] == 'ingestFromDate':
71                                        self.setIngestFromDate(bits[1])
72                                elif bits[0] == 'interval':
73                                        self.setPollInterval(bits[1])
74                                elif bits[0] == 'individualFile':
75                                        print " - Running in single file ingestion mode!"
76                                        self.setIndFileToIngest(bits[1])
77                                else:
78                                        setattr(self, bits[0], bits[1])
79               
80                print self.lineSeparator
81                # NB, this is a slight fudge as cannot get the detailed logging to work
82                # without setting up a new logger here - which means we get two loggers
83                # outputing data. The initial call to logging needs to be tracked down
84                # and configured correctly, so this can be avoided...
85#               self.logger = logging.getLogger()
86#               self.logger.setLevel(loggingLevel)
87
88                # create console handler and set level to debug
89#               ch = logging.StreamHandler()
90#               ch.setLevel(loggingLevel)
91                # create formatter
92#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
93                # add formatter to ch
94#               ch.setFormatter(formatter)
95                # add ch to logger
96#               self.logger.addHandler(ch)
97                return args
98
99
100        def getID(self, filename):
101                '''
102                Gets the identifier out of an input metadata xml record.
103                @param filename - name of document file being processed
104                @return: ID - id to use to refer to the document
105                '''
106                logging.info("Retrieving identifier for metadata record " + filename)
107                xml=file(filename).read()
108                ID = idget(xml)
109                return ID
110       
111       
112        def addFileToPostgresDB(self, filename):
113                '''
114                Add a file to the postgres DB - extracting and storing all the required
115                data in the process
116                @param filename: full path of file to add to postgres DB
117                '''
118                logging.info("Adding file, " + filename + ", to postgres DB")
119               
120                numSlash = len(filename.split('/'))
121               
122                shortFilename = filename.split('/')[numSlash - 1]
123               
124                self.recordAttemptToIngestDiscoveryID = ""                             
125               
126                # first of all create a PostgresRecord - this object represents all the data required
127                # for a DB entry
128                dao = None
129               
130                try:
131                        #discoveryID = self.getID(filename)
132                       
133                        basicParameters=DatasetBasicParameters(filename,self._datacentre_format)
134                                               
135                        discoveryID = basicParameters.datasetID                 
136                        datasetName = basicParameters.datasetName
137                        datacentreName = basicParameters.datacentreName
138                        datasetLastEditUpdateDate = basicParameters.metadataCreationDate
139                        datasetStartDateNom = basicParameters.datasetStartDateNom
140                       
141                        #record whats attempting to be ingested
142                        self.recordAttemptToIngestDiscoveryID = discoveryID
143                       
144                                                                                                           
145                        record = PostgresRecord(filename, self._NDG_dataProvider, \
146                                                            self._datacentre_groups, self._datacentre_namespace, \
147                                                            discoveryID,datasetName,datacentreName,datasetLastEditUpdateDate,datasetStartDateNom, \
148                                                            self._xq, self._datacentre_format)
149                       
150                       
151                        # Now create the data access object to interface to the DB
152                        dao = PostgresDAO(record, pgClient = self.pgc)
153                       
154                        # Finally, write the new record
155                        # 0 if failed, 1 if update, 2 if create
156                        returnCode = dao.createOrUpdateRecord() 
157                        if returnCode == 2:
158                                self._no_files_ingested += 1
159                        elif returnCode == 1:
160                                self._no_files_changed += 1
161                        #if 0 nothing incremented
162                       
163                except:
164                       
165                        #if error encountered, add to failure lisr
166                        logging.error("Could not update: " + filename)                 
167                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
168                        self.updateFailList.append(originalRecordFilename)
169                       
170                        logging.error("Exception thrown - detail: ")
171                        errors = sys.exc_info()
172                        logging.error(errors)
173                        self._error_messages += "%s\n" %str(errors[1])
174                       
175                        if dao:
176                                logging.info("Removing record and its associated info from DB")
177                                logging.info("- to allow clean ingestion on rerun")
178                                try:
179                                        dao.deleteOriginalRecord()
180                                except:
181                                        logging.error("Problem encountered when removing record: ")
182                                        logging.error(sys.exc_info())
183                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
184
185                        self._no_problem_files += 1
186                       
187                        logging.info("Continue processing other files")
188                       
189                return self.recordAttemptToIngestDiscoveryID
190                       
191       
192        def getConfigDetails(self, datacentre):
193                '''
194                Get the harvested records directory and groups for this datacentre from the
195                datacentre specific config file.  The harvested records directory depends on the
196                datacentres OAI base url, the set and format. These have to be know up-front.
197                The groups denote which 'portal groups' they belong to - for limiting searches to
198                say NERC-only datacentres records.
199                Groups are added to the intermediate MOLES when it is created.
200                @param datacentre: datacentre to use when looking up config file
201                '''
202                # initialise the variables to retrieve from the config file
203                self._harvest_home = ""
204                self._datacentre_groups = ""
205                self._datacentre_format = ""
206                self._datacentre_namespace = ""
207                self._NDG_dataProvider = False
208
209                datacentre_config_filename = 'datacentre_config/' + datacentre + "_config.properties"
210                logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename)
211               
212
213                file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
214               
215                for line in file.split('\n'):
216                        words = line.split()
217                        if len(words) == 0:
218                                continue
219                        elif words[0] == 'host_path':
220                                self._harvest_home = words[1].rstrip()
221                        elif words[0] == 'groups':
222                                self._datacentre_groups = words[1:]
223                        elif words[0] == 'format':
224                                self._datacentre_format = words[1]
225                        elif words[0] == 'namespace':
226                                self._datacentre_namespace = words[1]
227                        elif words[0] == 'NDG_dataProvider':
228                                self._NDG_dataProvider = True
229                       
230                if self._harvest_home == "":
231                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
232               
233                logging.info("harvested records are in " + self._harvest_home)
234               
235                if self._datacentre_groups == "":
236                    logging.info("No groups/keywords set for datacentre " + datacentre)
237                else:
238                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
239               
240                if self._datacentre_format == "":
241                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
242               
243                logging.info("format being harvested: " + self._datacentre_format)
244               
245                if self._datacentre_namespace == "":
246                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
247               
248                logging.info("datacentre namespace: " + self._datacentre_namespace)
249               
250                if self._NDG_dataProvider:
251                        logging.info("Datacentre classified as an NDG data provider")
252                else:
253                        logging.info("Datacentre is not classified as an NDG data provider")
254                logging.info(self.lineSeparator)
255
256
257        def _convertIngestFiles(self, originals_dir, discovery_dir):
258                '''
259                Processes/renames the files (changed 08/01/07 to get id from inside file)
260                 - also replace any namespace declarations with a standard one which we know works in NDG
261                 NB, this copies files from the original dir to the discovery dir
262                 @param originals_dir: directory to convert files from
263                 @param discovery_dir: directory in which converted files will end up
264                 @return numfilesproc: counter of number of files processed
265                '''
266                numfilesproc = 0
267               
268                self.inputFileOrigFinal = {}
269               
270                logging.info(self.lineSeparator)
271                logging.info("Renaming files:")
272               
273                for filename in os.listdir(originals_dir):
274                        if not filename.endswith('.xml'):
275                                logging.warning('File %s is not xml format. Not processed'  %(filename))
276                                continue
277                       
278                        original_filename = originals_dir + filename
279                       
280                       
281                        #convert urls within original xml input file to NDG redirect URLS
282                       
283                        ''' This xquery method superceded by new python class in Utilities.py that uses
284                        elementtree to do the same job but with better handling of character sets...'''
285                        '''
286                        if self._datacentre_format == 'DIF':           
287                                xQueryType = 'transform_DIF_URL'
288                               
289                        elif self._datacentre_format == 'MDIP':
290                                xQueryType = 'transform_MDIP_URL'
291                               
292                        redirect_url = 'http://ndgRedirect.nerc.ac.uk/'
293                       
294                        #filename_rd = filename.replace(".xml","_redirect.xml")
295                        #original_filename_urlRedirect = originals_dir + filename_rd
296                       
297                        ndgRedirectTransform(xQueryType,redirect_url,original_filename)
298                        '''
299                       
300                        #call new class in Utilities.py --will replace original file...
301                        redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self.NDG_redirect_URL)
302                       
303               
304                                               
305                        #Use new class to get basic parameters from input xml doc to pass around (supplants getID method)
306                        basicParameters=DatasetBasicParameters(original_filename,self._datacentre_format)
307                                   
308                        try:
309                                #ident=self.getID(original_filename)
310                                ident=basicParameters.datasetID
311                                logging.info("Dataset ID is " + ident)
312                                       
313                        except Exception, detail:
314                                logging.error("Could not retrieve ID from file, %s" %filename)
315                                logging.error("Detail: %s" %detail)
316                                logging.info("Continue with next file")
317                                continue
318                       
319                        if self._NDG_dataProvider:
320                                new_filename = discovery_dir + ident.replace(":", "__")+".xml"
321                                new_filename_short = ident.replace(":", "__")+".xml"
322                               
323                        else:
324                                ident = ident.replace(":", "-")
325                                ident = ident.replace("/", "-")
326                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
327                                new_filename_short = self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
328                               
329                               
330                        logging.info("original file = " + original_filename)
331                        logging.info("newfile = " + new_filename)
332                               
333                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
334                        #this links a derived filename in processing dir with original filename
335                        #get basic filename from the path+filename passed to this function to use as key               
336                        self.inputFileOrigFinal[new_filename_short]=filename
337                       
338                        # now correct any namespace issues
339                        try:
340                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
341                        except Exception, detail:
342                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
343                                logging.error("Detail: %s" %detail)
344                                logging.info("Continue with next file")
345                                continue
346                        numfilesproc += 1
347               
348                logging.info("File renaming and converting completed")
349                logging.info(self.lineSeparator)
350                return numfilesproc
351
352               
353        def _getPostgresDBConnection(self):
354                '''
355                Get the default postgres DB connection - by reading in data from the db config file
356                '''
357               
358                logging.debug("Setting up connection to postgres DB")
359                self.pgc = pgc(configFile = 'ingest.config')
360                logging.info("Postgres DB connection now set up")
361
362
363
364        def     _backupAndCleanData(self):
365                '''
366                Back up ingested data for specified data centre, then clean any of
367                the ingest dirs
368                '''
369                logging.info("Backing up ingested data")
370                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
371                  strftime("%y%m%d_%H%M") + "_originals/"
372               
373                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
374                logging.info("Data backed up - now clearing ingest directories")
375                #Clear out the original harvest records area and discovery dir
376                #FileUtilities.cleanDir(self.originals_dir)
377                FileUtilities.cleanDir(self.discovery_dir)
378                logging.info("Ingest directories cleared")
379
380
381        def     _setupDataCentreDirs(self):
382                '''
383                Set up directories appropriate for the current data centre
384                '''
385                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
386                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
387               
388                # the following dirs define where the specific documents should go
389                self.originals_dir = data_dir + "/oai/originals/"
390                self.discovery_dir = data_dir + "/discovery/"
391               
392                # Create/clear the 'in' and 'out' directories
393                FileUtilities.setUpDir(self.originals_dir)
394                FileUtilities.setUpDir(self.discovery_dir)
395               
396                logging.info("Ingest directories for data centre set up")
397
398
399        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre):
400                '''
401                Convert files from originals dir to discovery one, then
402                ingest, backup and clean
403                @param originals_dir: directory to ingest docs from
404                @param discovery_dir: directory to use to process ingested docs
405                '''
406               
407                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir)
408               
409                filenames = os.listdir(self.discovery_dir)
410               
411                #generate a list of files ALREADY in database so can compare with what has been ingested
412                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
413                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
414                filesPresentList=[]
415                if filePresentListArr:
416                        for fileArr in filePresentListArr:
417                                filesPresentList.append(fileArr[0])
418       
419                #create list to to hold files ingest failed on.
420                self.updateFailList = []
421                self.deletedFailList = []
422               
423                for filename in filenames:
424                        fullPath = self.discovery_dir + filename
425                       
426                        if os.path.isfile(fullPath):
427                                thisIngestedID = self.addFileToPostgresDB(fullPath)
428                               
429                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
430                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
431                                        if thisIngestedID in filesPresentList:                                         
432                                                filesPresentList.remove(thisIngestedID)                                 
433                                               
434                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
435                #will need to be removed.
436                for item in filesPresentList:
437                        logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")
438                        DeleteRecord(item)
439                        self.deletedFailList.append(item)
440                        self._no_files_deleted += 1
441                       
442                       
443                self._backupAndCleanData()
444               
445                #at this stage put the reporting code in (will work for oai or atom feed)
446                #create a summary file for each data centre ingest
447                data_dir = self._base_dir + "data/"
448                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
449                recOpFile = open(recOpFileName,'w')
450               
451                logging.info("oai_document_ingest processing complete:")
452               
453                recOpFile.write("Ingest report for data centre: " + datacentre + "\n")
454                recOpFile.write("Ingest date: " + str(datetime.datetime.now()) + "\n")
455                recOpFile.write("Original metadata directory: " + self._harvest_home + "\n\n")
456                recOpFile.write("TOTAL PROCESSED " + str(numfilesproc) + "\n")
457                recOpFile.write("INGESTED (Created) " + str(self._no_files_changed)  + "\n")
458                recOpFile.write("INGESTED (Updated) " + str(self._no_files_ingested)  + "\n")
459                recOpFile.write("DELETED " + str(self._no_files_deleted)  + "\n")
460                recOpFile.write("PROBLEM FILES " + str(self._no_problem_files)  + "\n")
461               
462                for badFile in self.updateFailList:
463                        recOpFile.write("PROBLEM_FILE " + badFile + "\n")
464               
465                return numfilesproc
466
467
468
469        def _setupXQueries(self):
470                '''
471                Set up the required XQueries
472                - NB, extract the xquery libraries locally for easy reference
473                '''
474                self._xq = ndgResources()
475                for libFile in self._xq.xqlib:
476                        # NB, we don't want the full path to the files - just the filename
477                        fileName = libFile.split('/')[-1]
478                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.