source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 6187

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@6187
Revision 6187, 21.9 KB checked in by sdonegan, 11 years ago (diff)

Added ability to take processing info from a config file (not in svn for obvious reasons!). - debug

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import DatasetBasicParameters,ndgRedirectTransform,redirectUrlChanger
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22
23class AbstractDocumentIngester(object):
24        '''
25        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
26        - including running the various transforms and parsings to get all doc types and spatiotemporal
27        data in the correct form in the DB
28        '''
29        lineSeparator = "------------------------------"
30                       
31        # The directory to put things for a tape backup (should already exist)
32        BACKUP_DIR = '/home/badc/discovery_docs/ingestDocs/backup/'
33       
34        #NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
35       
36        #Use PRODUCTION NDG redirect service on triton now (20/11/09)
37        NDG_redirect_URL = 'http://triton.badc.rl.ac.uk/NDGredirection/ndgURLredirect/redirect?url='
38
39        def _setupCmdLineOptions(self):
40                '''
41                Determine the logging level to use and configure this appropriately
42                @return args: any input arguments - excluding options
43                '''
44               
45                # check for verbose option
46               
47                try:
48                        opts, args = getopt.getopt(sys.argv[1:], "vd")
49                except getopt.GetoptError, err:
50                    # print help information and exit:
51                    print str(err) # will print something like "option -a not recognized"
52                    sys.exit(2)
53                   
54                if len(args) < 1:
55                        self.usage()
56                   
57                loggingLevel = logging.WARNING
58                for o, a in opts:
59                    if o == "-v":
60                        print " - Verbose mode ON"
61                        loggingLevel = logging.INFO
62                    elif o == "-d":
63                        print " - Debug mode ON"
64                        loggingLevel = logging.DEBUG
65                   
66               
67                # set up any keywords on the object
68                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
69                for arg in args:
70                       
71                        keywordArgs=['ingestFromDate','interval','individualFile','interval','ingestFromDate','eXistDBHostname','eXistPortNo','dataCentrePoll']
72                       
73                       
74                       
75                        bits = arg.split('=')
76                        if len(bits) == 2:
77                                if bits[0] == keywordArgs[0]:
78                                        self.setIngestFromDate(bits[1])
79                                elif bits[0] == keywordArgs[1]:
80                                        self.setPollInterval(bits[1])
81                                elif bits[0] == keywordArgs[2]:
82                                        print " - Running in single file ingestion mode!"
83                                        self.setIndFileToIngest(bits[1])
84                                elif bits[0] not in keywordArgs:
85                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
86                                        sys.exit(2)
87                                else:
88                                        setattr(self, bits[0], bits[1])
89                       
90                print self.lineSeparator
91               
92                # NB, this is a slight fudge as cannot get the detailed logging to work
93                # without setting up a new logger here - which means we get two loggers
94                # outputing data. The initial call to logging needs to be tracked down
95                # and configured correctly, so this can be avoided...
96#               self.logger = logging.getLogger()
97#               self.logger.setLevel(loggingLevel)
98
99                # create console handler and set level to debug
100#               ch = logging.StreamHandler()
101#               ch.setLevel(loggingLevel)
102                # create formatter
103#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
104                # add formatter to ch
105#               ch.setFormatter(formatter)
106                # add ch to logger
107#               self.logger.addHandler(ch)
108                return args
109
110
111        def getID(self, filename):
112                '''
113                Gets the identifier out of an input metadata xml record.
114                @param filename - name of document file being processed
115                @return: ID - id to use to refer to the document
116                '''
117                logging.info("Retrieving identifier for metadata record " + filename)
118                xml=file(filename).read()
119                ID = idget(xml)
120                return ID
121       
122       
123        def addFileToPostgresDB(self, filename):
124                '''
125                Add a file to the postgres DB - extracting and storing all the required
126                data in the process
127                @param filename: full path of file to add to postgres DB
128                '''
129                logging.info("Adding file, " + filename + ", to postgres DB")
130               
131                numSlash = len(filename.split('/'))
132               
133                shortFilename = filename.split('/')[numSlash - 1]
134               
135                self.recordAttemptToIngestDiscoveryID = ""                             
136               
137                # first of all create a PostgresRecord - this object represents all the data required
138                # for a DB entry
139                dao = None
140               
141                try:
142                        #discoveryID = self.getID(filename)
143                       
144                        basicParameters=DatasetBasicParameters(filename,self._datacentre_format)
145                                               
146                        discoveryID = basicParameters.datasetID                 
147                        datasetName = basicParameters.datasetName
148                        datacentreName = basicParameters.datacentreName
149                        datasetLastEditUpdateDate = basicParameters.metadataCreationDate
150                        datasetStartDateNom = basicParameters.datasetStartDateNom
151                        datasetEndDateNom = basicParameters.datasetEndDateNom
152                       
153                        #record whats attempting to be ingested
154                        self.recordAttemptToIngestDiscoveryID = discoveryID
155                       
156                                                                                                           
157                        record = PostgresRecord(filename, self._NDG_dataProvider, \
158                                                            self._datacentre_groups, self._datacentre_namespace, \
159                                                            discoveryID,datasetName,datacentreName,datasetLastEditUpdateDate,datasetStartDateNom,datasetEndDateNom, \
160                                                            self._xq, self._datacentre_format,self._base_dir,self._code_dir)
161                        #import pdb
162                        #pdb.set_trace()
163                       
164                        # Now create the data access object to interface to the DB
165                        dao = PostgresDAO(record, pgClient = self.pgc)
166                       
167                        # Finally, write the new record
168                        # 0 if failed, 1 if update, 2 if create
169                        returnCode = dao.createOrUpdateRecord() 
170                        if returnCode == 2:
171                                self._no_files_ingested += 1
172                        elif returnCode == 1:
173                                self._no_files_changed += 1
174                        #if 0 nothing incremented
175                       
176                except:
177                       
178                        #if error encountered, add to failure lisr
179                        logging.error("Could not update: " + filename)                 
180                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
181                        self.updateFailList.append(originalRecordFilename)
182                       
183                        logging.error("Exception thrown - detail: ")
184                        errors = sys.exc_info()
185                        logging.error(errors)
186                        self._error_messages += "%s\n" %str(errors[1])
187                       
188                        if dao:
189                                logging.info("Removing record and its associated info from DB")
190                                logging.info("- to allow clean ingestion on rerun")
191                                try:
192                                        dao.deleteOriginalRecord()
193                                except:
194                                        logging.error("Problem encountered when removing record: ")
195                                        logging.error(sys.exc_info())
196                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
197
198                        self._no_problem_files += 1
199                       
200                        logging.info("Continue processing other files")
201                       
202                return self.recordAttemptToIngestDiscoveryID
203                       
204       
205        def getProcessingConfig(self,configFilePath):
206               
207                '''
208                Fed up with messing about with hardcoded directory paths. 
209                This method to get relevant values out of the oai_document_ingester.config file
210               
211                Returns a dictionary of values with keys:
212               
213                #directory in which the code resides
214                code_directory /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/
215
216                #base directory in which metadata is extracted to and converted to
217                base_directory /home/badc/discovery_docs/ingestDocs/
218
219                #directory in which to write reports to
220                reporting_directory /home/badc/discovery_docs/ingestDocs/data/
221
222                #path to the passwords file
223                passwords_file /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/passwords.txt
224
225                #datacentre config file directory path
226                datcentre_configs /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/
227                '''
228               
229                # Check this file exists; if not, assume an invalid datacentre has been specified
230                if not os.path.isfile(configFilePath):
231                    sys.exit("ERROR: Could not find the processing config file")
232                   
233                processingConfig = {}
234                   
235                processing_config_file = open(configFilePath, "r")
236               
237                for line in processing_config_file.readlines():
238                        words  = string.split(line)
239                        if len(words) == 0:
240                                continue
241                        elif words[0] == 'code_directory':
242                                processingConfig['code_directory'] = words[1]                                                           
243                        elif words[0] == 'base_directory':
244                                processingConfig['base_directory'] = words[1]
245                        elif words[0] == 'reporting_directory':
246                                processingConfig['reporting_directory'] = words[1]
247                        elif words[0] == 'passwords_file':
248                                processingConfig['passwords_file'] = words[1]
249                        elif words[0] == 'datcentre_configs':
250                                processingConfig['datcentre_configs'] = words[1]                               
251                       
252                return processingConfig
253       
254       
255        def getConfigDetails(self, datacentre):
256                '''
257                Get the harvested records directory and groups for this datacentre from the
258                datacentre specific config file.  The harvested records directory depends on the
259                datacentres OAI base url, the set and format. These have to be know up-front.
260                The groups denote which 'portal groups' they belong to - for limiting searches to
261                say NERC-only datacentres records.
262                Groups are added to the intermediate MOLES when it is created.
263                @param datacentre: datacentre to use when looking up config file
264                '''
265                # initialise the variables to retrieve from the config file
266                self._harvest_home = ""
267                self._datacentre_groups = ""
268                self._datacentre_format = ""
269                self._datacentre_namespace = ""
270                self._NDG_dataProvider = False
271
272                #note changed to production buildout path
273                #self._datacentre_config_filename = '/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/' + datacentre + "_config.properties"
274               
275                #fed up with this rubbish causing problems - use a normal file opener.
276                #file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
277               
278                self._datacentre_config_filename = self._code_dir + 'datacentre_config/' + datacentre + "_config.properties"
279                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
280               
281                # Check this file exists; if not, assume an invalid datacentre has been specified
282                if not os.path.isfile(self._datacentre_config_filename):
283                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
284                        "specified (%s) is invalid\n" %datacentre)
285                   
286               
287                datacentre_config_file = open(self._datacentre_config_filename, "r")
288               
289                for line in datacentre_config_file.readlines():
290                        words  = string.split(line)
291                        if len(words) == 0:
292                                continue
293                        elif words[0] == 'host_path':
294                                self._harvest_home = words[1].rstrip()
295                        elif words[0] == 'groups':
296                                self._datacentre_groups = words[1:]
297                        elif words[0] == 'format':
298                                self._datacentre_format = words[1]
299                        elif words[0] == 'namespace':
300                                self._datacentre_namespace = words[1]
301                        elif words[0] == 'NDG_dataProvider':
302                                self._NDG_dataProvider = True
303               
304                datacentre_config_file.close()
305                       
306                if self._harvest_home == "":
307                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
308               
309                logging.info("harvested records are in " + self._harvest_home)
310               
311                if self._datacentre_groups == "":
312                    logging.info("No groups/keywords set for datacentre " + datacentre)
313                else:
314                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
315               
316                if self._datacentre_format == "":
317                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
318               
319                logging.info("format being harvested: " + self._datacentre_format)
320               
321                if self._datacentre_namespace == "":
322                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
323               
324                logging.info("datacentre namespace: " + self._datacentre_namespace)
325               
326                if self._NDG_dataProvider:
327                        logging.info("Datacentre classified as an NDG data provider")
328                else:
329                        logging.info("Datacentre is not classified as an NDG data provider")
330                logging.info(self.lineSeparator)
331
332
333        def _convertIngestFiles(self, originals_dir, discovery_dir):
334                '''
335                Processes/renames the files (changed 08/01/07 to get id from inside file)
336                 - also replace any namespace declarations with a standard one which we know works in NDG
337                 NB, this copies files from the original dir to the discovery dir
338                 @param originals_dir: directory to convert files from
339                 @param discovery_dir: directory in which converted files will end up
340                 @return numfilesproc: counter of number of files processed
341                '''
342                numfilesproc = 0
343               
344                self.inputFileOrigFinal = {}
345               
346                logging.info(self.lineSeparator)
347                logging.info("Renaming files:")
348               
349                for filename in os.listdir(originals_dir):
350                       
351                        if not filename.endswith('.xml'):
352                                logging.warning('File %s is not xml format. Not processed'  %(filename))
353                                continue
354                       
355                        original_filename = originals_dir + filename
356                       
357                       
358                       
359                        #convert urls within original xml input file to NDG redirect URLS
360                       
361                        ''' This xquery method superceded by new python class in Utilities.py that uses
362                        elementtree to do the same job but with better handling of character sets...'''
363                        '''
364                        if self._datacentre_format == 'DIF':           
365                                xQueryType = 'transform_DIF_URL'
366                               
367                        elif self._datacentre_format == 'MDIP':
368                                xQueryType = 'transform_MDIP_URL'
369                               
370                        redirect_url = 'http://ndgRedirect.nerc.ac.uk/'
371                       
372                        #filename_rd = filename.replace(".xml","_redirect.xml")
373                        #original_filename_urlRedirect = originals_dir + filename_rd
374                       
375                        ndgRedirectTransform(xQueryType,redirect_url,original_filename)
376                        '''
377                       
378                        #call new class in Utilities.py --will replace original file...
379                        redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self.NDG_redirect_URL)
380                       
381               
382                                               
383                        #Use new class to get basic parameters from input xml doc to pass around (supplants getID method)
384                        basicParameters=DatasetBasicParameters(original_filename,self._datacentre_format)
385                                   
386                        try:
387                                #ident=self.getID(original_filename)
388                                ident=basicParameters.datasetID
389                                logging.info("Dataset ID is " + ident)
390                                       
391                        except Exception, detail:
392                                logging.error("Could not retrieve ID from file, %s" %filename)
393                                logging.error("Detail: %s" %detail)
394                                logging.info("Continue with next file")
395                                continue
396                       
397                        if self._NDG_dataProvider:
398                               
399                                #new_filename = discovery_dir + ident.replace(":", "__")+".xml"
400                                #new_filename_short = ident.replace(":", "__")+".xml"
401                               
402                                #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
403                                new_filename = discovery_dir + self._datacentre_namespace + "__" +self._datacentre_format + "__"+ ident.replace(":", "-")+".xml"
404                                new_filename_short = self._datacentre_namespace + "__" +self._datacentre_format + "__" + ident.replace(":", "-")+".xml"
405                               
406                        else:
407                               
408                                ident = ident.replace(":", "-")
409                                ident = ident.replace("/", "-")
410                               
411                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
412                                new_filename_short = self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
413                               
414                               
415                        logging.info("original file = " + original_filename)
416                        logging.info("newfile = " + new_filename)
417                               
418                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
419                        #this links a derived filename in processing dir with original filename
420                        #get basic filename from the path+filename passed to this function to use as key               
421                        self.inputFileOrigFinal[new_filename_short]=filename
422                       
423                        # now correct any namespace issues
424                        try:
425                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
426                        except Exception, detail:
427                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
428                                logging.error("Detail: %s" %detail)
429                                logging.info("Continue with next file")
430                                continue
431                        numfilesproc += 1
432               
433                logging.info("File renaming and converting completed")
434                logging.info(self.lineSeparator)
435               
436                #sys.exit()
437               
438                return numfilesproc
439
440               
441        def _getPostgresDBConnection(self):
442                '''
443                Get the default postgres DB connection - by reading in data from the db config file
444                '''
445               
446                logging.debug("Setting up connection to postgres DB")
447                self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
448                logging.info("Postgres DB connection now set up")
449
450
451
452        def     _backupAndCleanData(self):
453                '''
454                Back up ingested data for specified data centre, then clean any of
455                the ingest dirs
456                '''
457                logging.info("Backing up ingested data")
458                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
459                  strftime("%y%m%d_%H%M") + "_originals/"
460               
461                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
462                logging.info("Data backed up - now clearing ingest directories")
463                #Clear out the original harvest records area and discovery dir
464                #FileUtilities.cleanDir(self.originals_dir)
465                #FileUtilities.cleanDir(self.discovery_dir)
466                logging.info("Ingest directories cleared")
467
468
469        def     _setupDataCentreDirs(self):
470                '''
471                Set up directories appropriate for the current data centre
472                '''
473                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
474                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
475               
476                # the following dirs define where the specific documents should go
477                self.originals_dir = data_dir + "/oai/originals/"
478                self.discovery_dir = data_dir + "/discovery/"
479               
480                # Create/clear the 'in' and 'out' directories
481                FileUtilities.setUpDir(self.originals_dir)
482                FileUtilities.setUpDir(self.discovery_dir)
483               
484                logging.info("Ingest directories for data centre set up")
485
486
487        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed):
488                '''
489                Convert files from originals dir to discovery one, then
490                ingest, backup and clean
491                @param originals_dir: directory to ingest docs from
492                @param discovery_dir: directory to use to process ingested docs
493                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
494                '''
495               
496                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir)
497               
498                filenames = os.listdir(self.discovery_dir)
499               
500                #generate a list of files ALREADY in database so can compare with what has been ingested
501                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
502               
503               
504                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
505                filesPresentList=[]
506                if filePresentListArr:
507                        for fileArr in filePresentListArr:
508                                filesPresentList.append(fileArr[0])
509                               
510                                #TODO - is above relevant - duplicating functionlaity?
511       
512                #create list to to hold files ingest failed on.
513                self.updateFailList = []
514                self.deletedFailList = []
515               
516               
517                for filename in filenames:
518                        fullPath = self.discovery_dir + filename
519                       
520                        if os.path.isfile(fullPath):
521                                thisIngestedID = self.addFileToPostgresDB(fullPath)
522                               
523                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
524                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
525                                        if thisIngestedID in filesPresentList: 
526                                                filesPresentList.remove(thisIngestedID)                 
527                                               
528                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
529                #will need to be removed.
530               
531                #only do this if not in single file mode (or else wverything else gets deleted!)               
532                if ((self.indFileToIngest == "") & (feed)):
533                        for item in filesPresentList:
534                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
535                                DeleteRecord(item)
536                                self.deletedFailList.append(item)
537                                self._no_files_deleted += 1
538                       
539                       
540                self._backupAndCleanData()
541               
542                #at this stage put the reporting code in (will work for oai or atom feed)
543                #create a summary file for each data centre ingest
544                data_dir = self._base_dir + "data/"
545                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
546                recOpFile = open(recOpFileName,'w')
547               
548                logging.info("oai_document_ingest processing complete:")
549               
550                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
551                message = "Ingest report for data centre: " + datacentre + "\n"
552                message = message + "Ingest date: " + str(datetime.datetime.now()) + "\n"
553                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
554                message = message + "TOTAL_PROCESSED " + str(numfilesproc) + "\n"
555                message = message + "INGESTED_Created " + str(self._no_files_changed)  + "\n"
556                message = message + "INGESTED_Updated " + str(self._no_files_ingested)  + "\n"
557                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
558                message = message + "PROBLEM_FILES " + str(self._no_problem_files)  + "\n"
559               
560                '''recOpFile.write("Ingest report for data centre: " + datacentre + "\n")
561                recOpFile.write("Ingest date: " + str(datetime.datetime.now()) + "\n")
562                recOpFile.write("Original metadata directory: " + self._harvest_home + "\n\n")
563                recOpFile.write("TOTAL PROCESSED " + str(numfilesproc) + "\n")
564                recOpFile.write("INGESTED (Created) " + str(self._no_files_changed)  + "\n")
565                recOpFile.write("INGESTED (Updated) " + str(self._no_files_ingested)  + "\n")
566                recOpFile.write("DELETED " + str(self._no_files_deleted)  + "\n")
567                recOpFile.write("PROBLEM FILES " + str(self._no_problem_files)  + "\n")'''
568               
569               
570                for badFile in self.updateFailList:
571                        #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
572                        message = message +"PROBLEM_FILE " + badFile + "\n"
573                         
574                recOpFile.write(message)
575                               
576                return numfilesproc, message
577
578
579
580        def _setupXQueries(self):
581                '''
582                Set up the required XQueries
583                - NB, extract the xquery libraries locally for easy reference
584                '''
585                self._xq = ndgResources()
586                for libFile in self._xq.xqlib:
587                        # NB, we don't want the full path to the files - just the filename
588                        fileName = libFile.split('/')[-1]
589                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.