source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 5737

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@5737
Revision 5737, 18.9 KB checked in by sdonegan, 10 years ago (diff)

Adjusted filename handling and id's to fix problem with presenting Discovery view in service

  • Property svn:executable set to *
RevLine 
[4854]1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
[5414]8import os, sys, string, getopt, logging, re, pkg_resources
[4854]9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
[4948]11from ndg.common.src.lib.ndgresources import ndgResources
[4854]12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
[5414]17from Utilities import DatasetBasicParameters,ndgRedirectTransform,redirectUrlChanger
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
[4854]22
23class AbstractDocumentIngester(object):
24        '''
25        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
26        - including running the various transforms and parsings to get all doc types and spatiotemporal
27        data in the correct form in the DB
28        '''
[5737]29        lineSeparator = "------------------------------"
[4854]30                       
31        # The directory to put things for a tape backup (should already exist)
32        BACKUP_DIR = '/disks/glue1/oaiBackup/'
[5414]33       
34        NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
[4854]35
36        def _setupCmdLineOptions(self):
37                '''
38                Determine the logging level to use and configure this appropriately
39                @return args: any input arguments - excluding options
40                '''
[5414]41               
[4854]42                # check for verbose option
[5414]43               
[4854]44                try:
[5600]45                        opts, args = getopt.getopt(sys.argv[1:], "vd")
[4854]46                except getopt.GetoptError, err:
47                    # print help information and exit:
48                    print str(err) # will print something like "option -a not recognized"
49                    sys.exit(2)
[5414]50                   
[4854]51                if len(args) < 1:
52                        self.usage()
53                   
54                loggingLevel = logging.WARNING
55                for o, a in opts:
56                    if o == "-v":
57                        print " - Verbose mode ON"
58                        loggingLevel = logging.INFO
59                    elif o == "-d":
60                        print " - Debug mode ON"
61                        loggingLevel = logging.DEBUG
[5414]62                   
[4854]63               
64                # set up any keywords on the object
65                # NB, be careful to keep the instance variables the same name as the keywords!
66                for arg in args:
[5414]67                       
[4854]68                        bits = arg.split('=')
69                        if len(bits) == 2:
70                                if bits[0] == 'ingestFromDate':
71                                        self.setIngestFromDate(bits[1])
72                                elif bits[0] == 'interval':
73                                        self.setPollInterval(bits[1])
[5414]74                                elif bits[0] == 'individualFile':
75                                        print " - Running in single file ingestion mode!"
76                                        self.setIndFileToIngest(bits[1])
[4854]77                                else:
78                                        setattr(self, bits[0], bits[1])
79               
80                print self.lineSeparator
[5600]81               
[4854]82                # NB, this is a slight fudge as cannot get the detailed logging to work
83                # without setting up a new logger here - which means we get two loggers
84                # outputing data. The initial call to logging needs to be tracked down
85                # and configured correctly, so this can be avoided...
[5243]86#               self.logger = logging.getLogger()
87#               self.logger.setLevel(loggingLevel)
[4854]88
89                # create console handler and set level to debug
[5243]90#               ch = logging.StreamHandler()
91#               ch.setLevel(loggingLevel)
[4854]92                # create formatter
[5243]93#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
[4854]94                # add formatter to ch
[5243]95#               ch.setFormatter(formatter)
[4854]96                # add ch to logger
[5243]97#               self.logger.addHandler(ch)
[4854]98                return args
99
100
101        def getID(self, filename):
102                '''
103                Gets the identifier out of an input metadata xml record.
104                @param filename - name of document file being processed
105                @return: ID - id to use to refer to the document
106                '''
107                logging.info("Retrieving identifier for metadata record " + filename)
108                xml=file(filename).read()
109                ID = idget(xml)
110                return ID
111       
112       
113        def addFileToPostgresDB(self, filename):
114                '''
115                Add a file to the postgres DB - extracting and storing all the required
116                data in the process
117                @param filename: full path of file to add to postgres DB
118                '''
119                logging.info("Adding file, " + filename + ", to postgres DB")
120               
[5414]121                numSlash = len(filename.split('/'))
122               
123                shortFilename = filename.split('/')[numSlash - 1]
124               
125                self.recordAttemptToIngestDiscoveryID = ""                             
126               
[4854]127                # first of all create a PostgresRecord - this object represents all the data required
128                # for a DB entry
129                dao = None
[5414]130               
[4854]131                try:
[5414]132                        #discoveryID = self.getID(filename)
[5167]133                       
[5414]134                        basicParameters=DatasetBasicParameters(filename,self._datacentre_format)
135                                               
136                        discoveryID = basicParameters.datasetID                 
137                        datasetName = basicParameters.datasetName
138                        datacentreName = basicParameters.datacentreName
139                        datasetLastEditUpdateDate = basicParameters.metadataCreationDate
[5464]140                        datasetStartDateNom = basicParameters.datasetStartDateNom
[5524]141                        datasetEndDateNom = basicParameters.datasetEndDateNom
[5414]142                       
143                        #record whats attempting to be ingested
144                        self.recordAttemptToIngestDiscoveryID = discoveryID
145                       
146                                                                                                           
[4854]147                        record = PostgresRecord(filename, self._NDG_dataProvider, \
148                                                            self._datacentre_groups, self._datacentre_namespace, \
[5524]149                                                            discoveryID,datasetName,datacentreName,datasetLastEditUpdateDate,datasetStartDateNom,datasetEndDateNom, \
[5414]150                                                            self._xq, self._datacentre_format)
[5737]151                        #import pdb
152                        #pdb.set_trace()
[5167]153                       
[4854]154                        # Now create the data access object to interface to the DB
155                        dao = PostgresDAO(record, pgClient = self.pgc)
[5414]156                       
[4854]157                        # Finally, write the new record
[5414]158                        # 0 if failed, 1 if update, 2 if create
159                        returnCode = dao.createOrUpdateRecord() 
160                        if returnCode == 2:
[4854]161                                self._no_files_ingested += 1
[5414]162                        elif returnCode == 1:
163                                self._no_files_changed += 1
164                        #if 0 nothing incremented
165                       
[4854]166                except:
[5414]167                       
168                        #if error encountered, add to failure lisr
169                        logging.error("Could not update: " + filename)                 
170                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
171                        self.updateFailList.append(originalRecordFilename)
172                       
[4854]173                        logging.error("Exception thrown - detail: ")
[5243]174                        errors = sys.exc_info()
175                        logging.error(errors)
[5252]176                        self._error_messages += "%s\n" %str(errors[1])
[4854]177                       
178                        if dao:
179                                logging.info("Removing record and its associated info from DB")
180                                logging.info("- to allow clean ingestion on rerun")
181                                try:
182                                        dao.deleteOriginalRecord()
183                                except:
184                                        logging.error("Problem encountered when removing record: ")
185                                        logging.error(sys.exc_info())
186                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
187
188                        self._no_problem_files += 1
[5414]189                       
[4854]190                        logging.info("Continue processing other files")
[5414]191                       
192                return self.recordAttemptToIngestDiscoveryID
193                       
[4854]194       
195        def getConfigDetails(self, datacentre):
196                '''
197                Get the harvested records directory and groups for this datacentre from the
198                datacentre specific config file.  The harvested records directory depends on the
199                datacentres OAI base url, the set and format. These have to be know up-front.
200                The groups denote which 'portal groups' they belong to - for limiting searches to
201                say NERC-only datacentres records.
202                Groups are added to the intermediate MOLES when it is created.
203                @param datacentre: datacentre to use when looking up config file
204                '''
205                # initialise the variables to retrieve from the config file
206                self._harvest_home = ""
207                self._datacentre_groups = ""
208                self._datacentre_format = ""
209                self._datacentre_namespace = ""
210                self._NDG_dataProvider = False
211
[5248]212                datacentre_config_filename = 'datacentre_config/' + datacentre + "_config.properties"
[5218]213                logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename)
[5297]214               
[5523]215
[5252]216                file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
[5523]217               
[5248]218                for line in file.split('\n'):
219                        words = line.split()
220                        if len(words) == 0:
221                                continue
222                        elif words[0] == 'host_path':
223                                self._harvest_home = words[1].rstrip()
224                        elif words[0] == 'groups':
225                                self._datacentre_groups = words[1:]
226                        elif words[0] == 'format':
227                                self._datacentre_format = words[1]
228                        elif words[0] == 'namespace':
229                                self._datacentre_namespace = words[1]
230                        elif words[0] == 'NDG_dataProvider':
231                                self._NDG_dataProvider = True
232                       
[4854]233                if self._harvest_home == "":
[5218]234                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
[4854]235               
236                logging.info("harvested records are in " + self._harvest_home)
237               
238                if self._datacentre_groups == "":
239                    logging.info("No groups/keywords set for datacentre " + datacentre)
240                else:
241                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
242               
243                if self._datacentre_format == "":
[5218]244                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
[4854]245               
246                logging.info("format being harvested: " + self._datacentre_format)
247               
248                if self._datacentre_namespace == "":
[5218]249                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
[4854]250               
251                logging.info("datacentre namespace: " + self._datacentre_namespace)
252               
253                if self._NDG_dataProvider:
254                        logging.info("Datacentre classified as an NDG data provider")
255                else:
256                        logging.info("Datacentre is not classified as an NDG data provider")
257                logging.info(self.lineSeparator)
258
259
260        def _convertIngestFiles(self, originals_dir, discovery_dir):
261                '''
262                Processes/renames the files (changed 08/01/07 to get id from inside file)
263                 - also replace any namespace declarations with a standard one which we know works in NDG
264                 NB, this copies files from the original dir to the discovery dir
265                 @param originals_dir: directory to convert files from
266                 @param discovery_dir: directory in which converted files will end up
267                 @return numfilesproc: counter of number of files processed
268                '''
269                numfilesproc = 0
[5414]270               
271                self.inputFileOrigFinal = {}
272               
[4854]273                logging.info(self.lineSeparator)
274                logging.info("Renaming files:")
[5414]275               
[4854]276                for filename in os.listdir(originals_dir):
[5737]277                       
[4854]278                        if not filename.endswith('.xml'):
279                                logging.warning('File %s is not xml format. Not processed'  %(filename))
280                                continue
281                       
282                        original_filename = originals_dir + filename
[5414]283                       
284                       
[5737]285                       
[5414]286                        #convert urls within original xml input file to NDG redirect URLS
287                       
288                        ''' This xquery method superceded by new python class in Utilities.py that uses
289                        elementtree to do the same job but with better handling of character sets...'''
290                        '''
291                        if self._datacentre_format == 'DIF':           
292                                xQueryType = 'transform_DIF_URL'
293                               
294                        elif self._datacentre_format == 'MDIP':
295                                xQueryType = 'transform_MDIP_URL'
296                               
297                        redirect_url = 'http://ndgRedirect.nerc.ac.uk/'
298                       
299                        #filename_rd = filename.replace(".xml","_redirect.xml")
300                        #original_filename_urlRedirect = originals_dir + filename_rd
301                       
302                        ndgRedirectTransform(xQueryType,redirect_url,original_filename)
303                        '''
304                       
305                        #call new class in Utilities.py --will replace original file...
306                        redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self.NDG_redirect_URL)
[5737]307                       
308               
309                                               
[5414]310                        #Use new class to get basic parameters from input xml doc to pass around (supplants getID method)
311                        basicParameters=DatasetBasicParameters(original_filename,self._datacentre_format)
312                                   
[4854]313                        try:
[5414]314                                #ident=self.getID(original_filename)
315                                ident=basicParameters.datasetID
316                                logging.info("Dataset ID is " + ident)
317                                       
[4854]318                        except Exception, detail:
319                                logging.error("Could not retrieve ID from file, %s" %filename)
320                                logging.error("Detail: %s" %detail)
321                                logging.info("Continue with next file")
322                                continue
323                       
324                        if self._NDG_dataProvider:
[5414]325                               
[5737]326                                #new_filename = discovery_dir + ident.replace(":", "__")+".xml"
327                                #new_filename_short = ident.replace(":", "__")+".xml"
328                               
329                                #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
330                                new_filename = discovery_dir + self._datacentre_namespace + "__" +self._datacentre_format + "__"+ ident.replace(":", "-")+".xml"
331                                new_filename_short = self._datacentre_namespace + "__" +self._datacentre_format + "__" + ident.replace(":", "-")+".xml"
332                               
[4854]333                        else:
[5737]334                               
[4854]335                                ident = ident.replace(":", "-")
336                                ident = ident.replace("/", "-")
[5737]337                               
[4854]338                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
[5414]339                                new_filename_short = self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
340                               
341                               
342                        logging.info("original file = " + original_filename)
343                        logging.info("newfile = " + new_filename)
344                               
345                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
346                        #this links a derived filename in processing dir with original filename
347                        #get basic filename from the path+filename passed to this function to use as key               
348                        self.inputFileOrigFinal[new_filename_short]=filename
[4854]349                       
350                        # now correct any namespace issues
351                        try:
352                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
353                        except Exception, detail:
354                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
355                                logging.error("Detail: %s" %detail)
356                                logging.info("Continue with next file")
357                                continue
358                        numfilesproc += 1
359               
360                logging.info("File renaming and converting completed")
361                logging.info(self.lineSeparator)
[5737]362               
363                #sys.exit()
364               
[4854]365                return numfilesproc
366
367               
368        def _getPostgresDBConnection(self):
369                '''
370                Get the default postgres DB connection - by reading in data from the db config file
371                '''
[5414]372               
[4854]373                logging.debug("Setting up connection to postgres DB")
[5542]374                self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
[5414]375                logging.info("Postgres DB connection now set up")
[4854]376
377
[5414]378
[4854]379        def     _backupAndCleanData(self):
380                '''
381                Back up ingested data for specified data centre, then clean any of
382                the ingest dirs
383                '''
384                logging.info("Backing up ingested data")
385                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
386                  strftime("%y%m%d_%H%M") + "_originals/"
387               
[5167]388                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
[4854]389                logging.info("Data backed up - now clearing ingest directories")
390                #Clear out the original harvest records area and discovery dir
[5414]391                #FileUtilities.cleanDir(self.originals_dir)
[5542]392                #FileUtilities.cleanDir(self.discovery_dir)
[4854]393                logging.info("Ingest directories cleared")
394
395
396        def     _setupDataCentreDirs(self):
397                '''
398                Set up directories appropriate for the current data centre
399                '''
400                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
401                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
402               
403                # the following dirs define where the specific documents should go
404                self.originals_dir = data_dir + "/oai/originals/"
405                self.discovery_dir = data_dir + "/discovery/"
406               
407                # Create/clear the 'in' and 'out' directories
408                FileUtilities.setUpDir(self.originals_dir)
409                FileUtilities.setUpDir(self.discovery_dir)
410               
411                logging.info("Ingest directories for data centre set up")
412
413
[5600]414        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed):
[4854]415                '''
416                Convert files from originals dir to discovery one, then
417                ingest, backup and clean
418                @param originals_dir: directory to ingest docs from
419                @param discovery_dir: directory to use to process ingested docs
[5600]420                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
[4854]421                '''
[5414]422               
[4854]423                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir)
424               
425                filenames = os.listdir(self.discovery_dir)
[5414]426               
427                #generate a list of files ALREADY in database so can compare with what has been ingested
428                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
[5600]429               
430               
[5414]431                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
432                filesPresentList=[]
433                if filePresentListArr:
434                        for fileArr in filePresentListArr:
435                                filesPresentList.append(fileArr[0])
[5537]436                               
437                                #TODO - is above relevant - duplicating functionlaity?
[5414]438       
439                #create list to to hold files ingest failed on.
440                self.updateFailList = []
441                self.deletedFailList = []
442               
[5537]443               
[4854]444                for filename in filenames:
445                        fullPath = self.discovery_dir + filename
[5414]446                       
[4854]447                        if os.path.isfile(fullPath):
[5414]448                                thisIngestedID = self.addFileToPostgresDB(fullPath)
449                               
450                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
451                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
[5537]452                                        if thisIngestedID in filesPresentList: 
453                                                filesPresentList.remove(thisIngestedID)                 
[5414]454                                               
455                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
456                #will need to be removed.
[5537]457               
[5600]458                #only do this if not in single file mode (or else wverything else gets deleted!)               
459                if ((self.indFileToIngest == "") & (feed)):
[5537]460                        for item in filesPresentList:
461                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
[5737]462                                DeleteRecord(item)
[5537]463                                self.deletedFailList.append(item)
464                                self._no_files_deleted += 1
[5414]465                       
466                       
467                self._backupAndCleanData()
[4854]468               
[5414]469                #at this stage put the reporting code in (will work for oai or atom feed)
470                #create a summary file for each data centre ingest
471                data_dir = self._base_dir + "data/"
472                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
473                recOpFile = open(recOpFileName,'w')
474               
475                logging.info("oai_document_ingest processing complete:")
476               
[5542]477                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
478                message = "Ingest report for data centre: " + datacentre + "\n"
479                message = message + "Ingest date: " + str(datetime.datetime.now()) + "\n"
480                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
481                message = message + "TOTAL PROCESSED " + str(numfilesproc) + "\n"
[5600]482                message = message + "INGESTED (Created) " + str(self._no_files_changed)  + "\n"
483                message = message + "INGESTED (Updated) " + str(self._no_files_ingested)  + "\n"
[5542]484                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
485                message = message + "PROBLEM FILES " + str(self._no_problem_files)  + "\n"
486               
487                '''recOpFile.write("Ingest report for data centre: " + datacentre + "\n")
[5414]488                recOpFile.write("Ingest date: " + str(datetime.datetime.now()) + "\n")
489                recOpFile.write("Original metadata directory: " + self._harvest_home + "\n\n")
490                recOpFile.write("TOTAL PROCESSED " + str(numfilesproc) + "\n")
491                recOpFile.write("INGESTED (Created) " + str(self._no_files_changed)  + "\n")
492                recOpFile.write("INGESTED (Updated) " + str(self._no_files_ingested)  + "\n")
493                recOpFile.write("DELETED " + str(self._no_files_deleted)  + "\n")
[5542]494                recOpFile.write("PROBLEM FILES " + str(self._no_problem_files)  + "\n")'''
[5414]495               
[5542]496               
[5414]497                for badFile in self.updateFailList:
[5542]498                        #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
499                        message = message +"PROBLEM_FILE " + badFile + "\n"
500                         
501                recOpFile.write(message)
502                               
503                return numfilesproc, message
[4854]504
505
506
507        def _setupXQueries(self):
508                '''
509                Set up the required XQueries
510                - NB, extract the xquery libraries locally for easy reference
511                '''
[4948]512                self._xq = ndgResources()
[4854]513                for libFile in self._xq.xqlib:
[4948]514                        # NB, we don't want the full path to the files - just the filename
515                        fileName = libFile.split('/')[-1]
516                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.