source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 6099

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@6099
Revision 6099, 19.4 KB checked in by sdonegan, 10 years ago (diff)

Updated reporting

  • Property svn:executable set to *
RevLine 
[4854]1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
[5414]8import os, sys, string, getopt, logging, re, pkg_resources
[4854]9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
[4948]11from ndg.common.src.lib.ndgresources import ndgResources
[4854]12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
[5414]17from Utilities import DatasetBasicParameters,ndgRedirectTransform,redirectUrlChanger
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
[4854]22
23class AbstractDocumentIngester(object):
24        '''
25        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
26        - including running the various transforms and parsings to get all doc types and spatiotemporal
27        data in the correct form in the DB
28        '''
[5737]29        lineSeparator = "------------------------------"
[4854]30                       
31        # The directory to put things for a tape backup (should already exist)
32        BACKUP_DIR = '/disks/glue1/oaiBackup/'
[5414]33       
[6031]34        #NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
35       
36        #Use PRODUCTION NDG redirect service on triton now (20/11/09)
37        NDG_redirect_URL = 'http://triton.badc.rl.ac.uk/NDGredirection/ndgURLredirect/redirect?url='
[4854]38
39        def _setupCmdLineOptions(self):
40                '''
41                Determine the logging level to use and configure this appropriately
42                @return args: any input arguments - excluding options
43                '''
[5414]44               
[4854]45                # check for verbose option
[5414]46               
[4854]47                try:
[5600]48                        opts, args = getopt.getopt(sys.argv[1:], "vd")
[4854]49                except getopt.GetoptError, err:
50                    # print help information and exit:
51                    print str(err) # will print something like "option -a not recognized"
52                    sys.exit(2)
[5414]53                   
[4854]54                if len(args) < 1:
55                        self.usage()
56                   
57                loggingLevel = logging.WARNING
58                for o, a in opts:
59                    if o == "-v":
60                        print " - Verbose mode ON"
61                        loggingLevel = logging.INFO
62                    elif o == "-d":
63                        print " - Debug mode ON"
64                        loggingLevel = logging.DEBUG
[5414]65                   
[4854]66               
67                # set up any keywords on the object
[5976]68                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
[4854]69                for arg in args:
[5414]70                       
[5976]71                        keywordArgs=['ingestFromDate','interval','individualFile','interval','ingestFromDate','eXistDBHostname','eXistPortNo','dataCentrePoll']
[5846]72                       
[5976]73                       
74                       
[4854]75                        bits = arg.split('=')
76                        if len(bits) == 2:
[5846]77                                if bits[0] == keywordArgs[0]:
[4854]78                                        self.setIngestFromDate(bits[1])
[5846]79                                elif bits[0] == keywordArgs[1]:
[4854]80                                        self.setPollInterval(bits[1])
[5846]81                                elif bits[0] == keywordArgs[2]:
[5414]82                                        print " - Running in single file ingestion mode!"
83                                        self.setIndFileToIngest(bits[1])
[5846]84                                elif bits[0] not in keywordArgs:
85                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
86                                        sys.exit(2)
[4854]87                                else:
88                                        setattr(self, bits[0], bits[1])
[5846]89                       
[4854]90                print self.lineSeparator
[5600]91               
[4854]92                # NB, this is a slight fudge as cannot get the detailed logging to work
93                # without setting up a new logger here - which means we get two loggers
94                # outputing data. The initial call to logging needs to be tracked down
95                # and configured correctly, so this can be avoided...
[5243]96#               self.logger = logging.getLogger()
97#               self.logger.setLevel(loggingLevel)
[4854]98
99                # create console handler and set level to debug
[5243]100#               ch = logging.StreamHandler()
101#               ch.setLevel(loggingLevel)
[4854]102                # create formatter
[5243]103#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
[4854]104                # add formatter to ch
[5243]105#               ch.setFormatter(formatter)
[4854]106                # add ch to logger
[5243]107#               self.logger.addHandler(ch)
[4854]108                return args
109
110
111        def getID(self, filename):
112                '''
113                Gets the identifier out of an input metadata xml record.
114                @param filename - name of document file being processed
115                @return: ID - id to use to refer to the document
116                '''
117                logging.info("Retrieving identifier for metadata record " + filename)
118                xml=file(filename).read()
119                ID = idget(xml)
120                return ID
121       
122       
123        def addFileToPostgresDB(self, filename):
124                '''
125                Add a file to the postgres DB - extracting and storing all the required
126                data in the process
127                @param filename: full path of file to add to postgres DB
128                '''
129                logging.info("Adding file, " + filename + ", to postgres DB")
130               
[5414]131                numSlash = len(filename.split('/'))
132               
133                shortFilename = filename.split('/')[numSlash - 1]
134               
135                self.recordAttemptToIngestDiscoveryID = ""                             
136               
[4854]137                # first of all create a PostgresRecord - this object represents all the data required
138                # for a DB entry
139                dao = None
[5414]140               
[4854]141                try:
[5414]142                        #discoveryID = self.getID(filename)
[5167]143                       
[5414]144                        basicParameters=DatasetBasicParameters(filename,self._datacentre_format)
145                                               
146                        discoveryID = basicParameters.datasetID                 
147                        datasetName = basicParameters.datasetName
148                        datacentreName = basicParameters.datacentreName
149                        datasetLastEditUpdateDate = basicParameters.metadataCreationDate
[5464]150                        datasetStartDateNom = basicParameters.datasetStartDateNom
[5524]151                        datasetEndDateNom = basicParameters.datasetEndDateNom
[5414]152                       
153                        #record whats attempting to be ingested
154                        self.recordAttemptToIngestDiscoveryID = discoveryID
155                       
156                                                                                                           
[4854]157                        record = PostgresRecord(filename, self._NDG_dataProvider, \
158                                                            self._datacentre_groups, self._datacentre_namespace, \
[5524]159                                                            discoveryID,datasetName,datacentreName,datasetLastEditUpdateDate,datasetStartDateNom,datasetEndDateNom, \
[5414]160                                                            self._xq, self._datacentre_format)
[5737]161                        #import pdb
162                        #pdb.set_trace()
[5167]163                       
[4854]164                        # Now create the data access object to interface to the DB
165                        dao = PostgresDAO(record, pgClient = self.pgc)
[5414]166                       
[4854]167                        # Finally, write the new record
[5414]168                        # 0 if failed, 1 if update, 2 if create
169                        returnCode = dao.createOrUpdateRecord() 
170                        if returnCode == 2:
[4854]171                                self._no_files_ingested += 1
[5414]172                        elif returnCode == 1:
173                                self._no_files_changed += 1
174                        #if 0 nothing incremented
175                       
[4854]176                except:
[5414]177                       
178                        #if error encountered, add to failure lisr
179                        logging.error("Could not update: " + filename)                 
180                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
181                        self.updateFailList.append(originalRecordFilename)
182                       
[4854]183                        logging.error("Exception thrown - detail: ")
[5243]184                        errors = sys.exc_info()
185                        logging.error(errors)
[5252]186                        self._error_messages += "%s\n" %str(errors[1])
[4854]187                       
188                        if dao:
189                                logging.info("Removing record and its associated info from DB")
190                                logging.info("- to allow clean ingestion on rerun")
191                                try:
192                                        dao.deleteOriginalRecord()
193                                except:
194                                        logging.error("Problem encountered when removing record: ")
195                                        logging.error(sys.exc_info())
196                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
197
198                        self._no_problem_files += 1
[5414]199                       
[4854]200                        logging.info("Continue processing other files")
[5414]201                       
202                return self.recordAttemptToIngestDiscoveryID
203                       
[4854]204       
205        def getConfigDetails(self, datacentre):
206                '''
207                Get the harvested records directory and groups for this datacentre from the
208                datacentre specific config file.  The harvested records directory depends on the
209                datacentres OAI base url, the set and format. These have to be know up-front.
210                The groups denote which 'portal groups' they belong to - for limiting searches to
211                say NERC-only datacentres records.
212                Groups are added to the intermediate MOLES when it is created.
213                @param datacentre: datacentre to use when looking up config file
214                '''
215                # initialise the variables to retrieve from the config file
216                self._harvest_home = ""
217                self._datacentre_groups = ""
218                self._datacentre_format = ""
219                self._datacentre_namespace = ""
220                self._NDG_dataProvider = False
221
[5248]222                datacentre_config_filename = 'datacentre_config/' + datacentre + "_config.properties"
[5218]223                logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename)
[5297]224               
[5523]225
[5252]226                file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
[5523]227               
[5248]228                for line in file.split('\n'):
229                        words = line.split()
230                        if len(words) == 0:
231                                continue
232                        elif words[0] == 'host_path':
233                                self._harvest_home = words[1].rstrip()
234                        elif words[0] == 'groups':
235                                self._datacentre_groups = words[1:]
236                        elif words[0] == 'format':
237                                self._datacentre_format = words[1]
238                        elif words[0] == 'namespace':
239                                self._datacentre_namespace = words[1]
240                        elif words[0] == 'NDG_dataProvider':
241                                self._NDG_dataProvider = True
242                       
[4854]243                if self._harvest_home == "":
[5218]244                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
[4854]245               
246                logging.info("harvested records are in " + self._harvest_home)
247               
248                if self._datacentre_groups == "":
249                    logging.info("No groups/keywords set for datacentre " + datacentre)
250                else:
251                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
252               
253                if self._datacentre_format == "":
[5218]254                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
[4854]255               
256                logging.info("format being harvested: " + self._datacentre_format)
257               
258                if self._datacentre_namespace == "":
[5218]259                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
[4854]260               
261                logging.info("datacentre namespace: " + self._datacentre_namespace)
262               
263                if self._NDG_dataProvider:
264                        logging.info("Datacentre classified as an NDG data provider")
265                else:
266                        logging.info("Datacentre is not classified as an NDG data provider")
267                logging.info(self.lineSeparator)
268
269
270        def _convertIngestFiles(self, originals_dir, discovery_dir):
271                '''
272                Processes/renames the files (changed 08/01/07 to get id from inside file)
273                 - also replace any namespace declarations with a standard one which we know works in NDG
274                 NB, this copies files from the original dir to the discovery dir
275                 @param originals_dir: directory to convert files from
276                 @param discovery_dir: directory in which converted files will end up
277                 @return numfilesproc: counter of number of files processed
278                '''
279                numfilesproc = 0
[5414]280               
281                self.inputFileOrigFinal = {}
282               
[4854]283                logging.info(self.lineSeparator)
284                logging.info("Renaming files:")
[5414]285               
[4854]286                for filename in os.listdir(originals_dir):
[5737]287                       
[4854]288                        if not filename.endswith('.xml'):
289                                logging.warning('File %s is not xml format. Not processed'  %(filename))
290                                continue
291                       
292                        original_filename = originals_dir + filename
[5414]293                       
294                       
[5737]295                       
[5414]296                        #convert urls within original xml input file to NDG redirect URLS
297                       
298                        ''' This xquery method superceded by new python class in Utilities.py that uses
299                        elementtree to do the same job but with better handling of character sets...'''
300                        '''
301                        if self._datacentre_format == 'DIF':           
302                                xQueryType = 'transform_DIF_URL'
303                               
304                        elif self._datacentre_format == 'MDIP':
305                                xQueryType = 'transform_MDIP_URL'
306                               
307                        redirect_url = 'http://ndgRedirect.nerc.ac.uk/'
308                       
309                        #filename_rd = filename.replace(".xml","_redirect.xml")
310                        #original_filename_urlRedirect = originals_dir + filename_rd
311                       
312                        ndgRedirectTransform(xQueryType,redirect_url,original_filename)
313                        '''
314                       
315                        #call new class in Utilities.py --will replace original file...
316                        redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self.NDG_redirect_URL)
[5737]317                       
318               
319                                               
[5414]320                        #Use new class to get basic parameters from input xml doc to pass around (supplants getID method)
321                        basicParameters=DatasetBasicParameters(original_filename,self._datacentre_format)
322                                   
[4854]323                        try:
[5414]324                                #ident=self.getID(original_filename)
325                                ident=basicParameters.datasetID
326                                logging.info("Dataset ID is " + ident)
327                                       
[4854]328                        except Exception, detail:
329                                logging.error("Could not retrieve ID from file, %s" %filename)
330                                logging.error("Detail: %s" %detail)
331                                logging.info("Continue with next file")
332                                continue
333                       
334                        if self._NDG_dataProvider:
[5414]335                               
[5737]336                                #new_filename = discovery_dir + ident.replace(":", "__")+".xml"
337                                #new_filename_short = ident.replace(":", "__")+".xml"
338                               
339                                #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
340                                new_filename = discovery_dir + self._datacentre_namespace + "__" +self._datacentre_format + "__"+ ident.replace(":", "-")+".xml"
341                                new_filename_short = self._datacentre_namespace + "__" +self._datacentre_format + "__" + ident.replace(":", "-")+".xml"
342                               
[4854]343                        else:
[5737]344                               
[4854]345                                ident = ident.replace(":", "-")
346                                ident = ident.replace("/", "-")
[5737]347                               
[4854]348                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
[5414]349                                new_filename_short = self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
350                               
351                               
352                        logging.info("original file = " + original_filename)
353                        logging.info("newfile = " + new_filename)
354                               
355                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
356                        #this links a derived filename in processing dir with original filename
357                        #get basic filename from the path+filename passed to this function to use as key               
358                        self.inputFileOrigFinal[new_filename_short]=filename
[4854]359                       
360                        # now correct any namespace issues
361                        try:
362                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
363                        except Exception, detail:
364                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
365                                logging.error("Detail: %s" %detail)
366                                logging.info("Continue with next file")
367                                continue
368                        numfilesproc += 1
369               
370                logging.info("File renaming and converting completed")
371                logging.info(self.lineSeparator)
[5737]372               
373                #sys.exit()
374               
[4854]375                return numfilesproc
376
377               
378        def _getPostgresDBConnection(self):
379                '''
380                Get the default postgres DB connection - by reading in data from the db config file
381                '''
[5414]382               
[4854]383                logging.debug("Setting up connection to postgres DB")
[5542]384                self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
[5414]385                logging.info("Postgres DB connection now set up")
[4854]386
387
[5414]388
[4854]389        def     _backupAndCleanData(self):
390                '''
391                Back up ingested data for specified data centre, then clean any of
392                the ingest dirs
393                '''
394                logging.info("Backing up ingested data")
395                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
396                  strftime("%y%m%d_%H%M") + "_originals/"
397               
[5167]398                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
[4854]399                logging.info("Data backed up - now clearing ingest directories")
400                #Clear out the original harvest records area and discovery dir
[5414]401                #FileUtilities.cleanDir(self.originals_dir)
[5542]402                #FileUtilities.cleanDir(self.discovery_dir)
[4854]403                logging.info("Ingest directories cleared")
404
405
406        def     _setupDataCentreDirs(self):
407                '''
408                Set up directories appropriate for the current data centre
409                '''
410                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
411                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
412               
413                # the following dirs define where the specific documents should go
414                self.originals_dir = data_dir + "/oai/originals/"
415                self.discovery_dir = data_dir + "/discovery/"
416               
417                # Create/clear the 'in' and 'out' directories
418                FileUtilities.setUpDir(self.originals_dir)
419                FileUtilities.setUpDir(self.discovery_dir)
420               
421                logging.info("Ingest directories for data centre set up")
422
423
[5600]424        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed):
[4854]425                '''
426                Convert files from originals dir to discovery one, then
427                ingest, backup and clean
428                @param originals_dir: directory to ingest docs from
429                @param discovery_dir: directory to use to process ingested docs
[5600]430                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
[4854]431                '''
[5414]432               
[4854]433                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir)
434               
435                filenames = os.listdir(self.discovery_dir)
[5414]436               
437                #generate a list of files ALREADY in database so can compare with what has been ingested
438                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
[5600]439               
440               
[5414]441                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
442                filesPresentList=[]
443                if filePresentListArr:
444                        for fileArr in filePresentListArr:
445                                filesPresentList.append(fileArr[0])
[5537]446                               
447                                #TODO - is above relevant - duplicating functionlaity?
[5414]448       
449                #create list to to hold files ingest failed on.
450                self.updateFailList = []
451                self.deletedFailList = []
452               
[5537]453               
[4854]454                for filename in filenames:
455                        fullPath = self.discovery_dir + filename
[5414]456                       
[4854]457                        if os.path.isfile(fullPath):
[5414]458                                thisIngestedID = self.addFileToPostgresDB(fullPath)
459                               
460                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
461                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
[5537]462                                        if thisIngestedID in filesPresentList: 
463                                                filesPresentList.remove(thisIngestedID)                 
[5414]464                                               
465                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
466                #will need to be removed.
[5537]467               
[5600]468                #only do this if not in single file mode (or else wverything else gets deleted!)               
469                if ((self.indFileToIngest == "") & (feed)):
[5537]470                        for item in filesPresentList:
471                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
[5737]472                                DeleteRecord(item)
[5537]473                                self.deletedFailList.append(item)
474                                self._no_files_deleted += 1
[5414]475                       
476                       
477                self._backupAndCleanData()
[4854]478               
[5414]479                #at this stage put the reporting code in (will work for oai or atom feed)
480                #create a summary file for each data centre ingest
481                data_dir = self._base_dir + "data/"
482                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
483                recOpFile = open(recOpFileName,'w')
484               
485                logging.info("oai_document_ingest processing complete:")
486               
[5542]487                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
488                message = "Ingest report for data centre: " + datacentre + "\n"
489                message = message + "Ingest date: " + str(datetime.datetime.now()) + "\n"
490                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
[6099]491                message = message + "TOTAL_PROCESSED " + str(numfilesproc) + "\n"
492                message = message + "INGESTED_Created " + str(self._no_files_changed)  + "\n"
493                message = message + "INGESTED_Updated " + str(self._no_files_ingested)  + "\n"
[5542]494                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
[6099]495                message = message + "PROBLEM_FILES " + str(self._no_problem_files)  + "\n"
[5542]496               
497                '''recOpFile.write("Ingest report for data centre: " + datacentre + "\n")
[5414]498                recOpFile.write("Ingest date: " + str(datetime.datetime.now()) + "\n")
499                recOpFile.write("Original metadata directory: " + self._harvest_home + "\n\n")
500                recOpFile.write("TOTAL PROCESSED " + str(numfilesproc) + "\n")
501                recOpFile.write("INGESTED (Created) " + str(self._no_files_changed)  + "\n")
502                recOpFile.write("INGESTED (Updated) " + str(self._no_files_ingested)  + "\n")
503                recOpFile.write("DELETED " + str(self._no_files_deleted)  + "\n")
[5542]504                recOpFile.write("PROBLEM FILES " + str(self._no_problem_files)  + "\n")'''
[5414]505               
[5542]506               
[5414]507                for badFile in self.updateFailList:
[5542]508                        #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
509                        message = message +"PROBLEM_FILE " + badFile + "\n"
510                         
511                recOpFile.write(message)
512                               
513                return numfilesproc, message
[4854]514
515
516
517        def _setupXQueries(self):
518                '''
519                Set up the required XQueries
520                - NB, extract the xquery libraries locally for easy reference
521                '''
[4948]522                self._xq = ndgResources()
[4854]523                for libFile in self._xq.xqlib:
[4948]524                        # NB, we don't want the full path to the files - just the filename
525                        fileName = libFile.split('/')[-1]
526                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.