source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 6130

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@6130
Revision 6130, 19.5 KB checked in by sdonegan, 10 years ago (diff)

Updated handling of directories - put in absolute production paths in badc user buildout

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import DatasetBasicParameters,ndgRedirectTransform,redirectUrlChanger
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22
23class AbstractDocumentIngester(object):
24        '''
25        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
26        - including running the various transforms and parsings to get all doc types and spatiotemporal
27        data in the correct form in the DB
28        '''
29        lineSeparator = "------------------------------"
30                       
31        # The directory to put things for a tape backup (should already exist)
32        BACKUP_DIR = '/home/badc/discovery_docs/ingestDocs/backup/'
33       
34        #NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
35       
36        #Use PRODUCTION NDG redirect service on triton now (20/11/09)
37        NDG_redirect_URL = 'http://triton.badc.rl.ac.uk/NDGredirection/ndgURLredirect/redirect?url='
38
39        def _setupCmdLineOptions(self):
40                '''
41                Determine the logging level to use and configure this appropriately
42                @return args: any input arguments - excluding options
43                '''
44               
45                # check for verbose option
46               
47                try:
48                        opts, args = getopt.getopt(sys.argv[1:], "vd")
49                except getopt.GetoptError, err:
50                    # print help information and exit:
51                    print str(err) # will print something like "option -a not recognized"
52                    sys.exit(2)
53                   
54                if len(args) < 1:
55                        self.usage()
56                   
57                loggingLevel = logging.WARNING
58                for o, a in opts:
59                    if o == "-v":
60                        print " - Verbose mode ON"
61                        loggingLevel = logging.INFO
62                    elif o == "-d":
63                        print " - Debug mode ON"
64                        loggingLevel = logging.DEBUG
65                   
66               
67                # set up any keywords on the object
68                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
69                for arg in args:
70                       
71                        keywordArgs=['ingestFromDate','interval','individualFile','interval','ingestFromDate','eXistDBHostname','eXistPortNo','dataCentrePoll']
72                       
73                       
74                       
75                        bits = arg.split('=')
76                        if len(bits) == 2:
77                                if bits[0] == keywordArgs[0]:
78                                        self.setIngestFromDate(bits[1])
79                                elif bits[0] == keywordArgs[1]:
80                                        self.setPollInterval(bits[1])
81                                elif bits[0] == keywordArgs[2]:
82                                        print " - Running in single file ingestion mode!"
83                                        self.setIndFileToIngest(bits[1])
84                                elif bits[0] not in keywordArgs:
85                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
86                                        sys.exit(2)
87                                else:
88                                        setattr(self, bits[0], bits[1])
89                       
90                print self.lineSeparator
91               
92                # NB, this is a slight fudge as cannot get the detailed logging to work
93                # without setting up a new logger here - which means we get two loggers
94                # outputing data. The initial call to logging needs to be tracked down
95                # and configured correctly, so this can be avoided...
96#               self.logger = logging.getLogger()
97#               self.logger.setLevel(loggingLevel)
98
99                # create console handler and set level to debug
100#               ch = logging.StreamHandler()
101#               ch.setLevel(loggingLevel)
102                # create formatter
103#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
104                # add formatter to ch
105#               ch.setFormatter(formatter)
106                # add ch to logger
107#               self.logger.addHandler(ch)
108                return args
109
110
111        def getID(self, filename):
112                '''
113                Gets the identifier out of an input metadata xml record.
114                @param filename - name of document file being processed
115                @return: ID - id to use to refer to the document
116                '''
117                logging.info("Retrieving identifier for metadata record " + filename)
118                xml=file(filename).read()
119                ID = idget(xml)
120                return ID
121       
122       
123        def addFileToPostgresDB(self, filename):
124                '''
125                Add a file to the postgres DB - extracting and storing all the required
126                data in the process
127                @param filename: full path of file to add to postgres DB
128                '''
129                logging.info("Adding file, " + filename + ", to postgres DB")
130               
131                numSlash = len(filename.split('/'))
132               
133                shortFilename = filename.split('/')[numSlash - 1]
134               
135                self.recordAttemptToIngestDiscoveryID = ""                             
136               
137                # first of all create a PostgresRecord - this object represents all the data required
138                # for a DB entry
139                dao = None
140               
141                try:
142                        #discoveryID = self.getID(filename)
143                       
144                        basicParameters=DatasetBasicParameters(filename,self._datacentre_format)
145                                               
146                        discoveryID = basicParameters.datasetID                 
147                        datasetName = basicParameters.datasetName
148                        datacentreName = basicParameters.datacentreName
149                        datasetLastEditUpdateDate = basicParameters.metadataCreationDate
150                        datasetStartDateNom = basicParameters.datasetStartDateNom
151                        datasetEndDateNom = basicParameters.datasetEndDateNom
152                       
153                        #record whats attempting to be ingested
154                        self.recordAttemptToIngestDiscoveryID = discoveryID
155                       
156                                                                                                           
157                        record = PostgresRecord(filename, self._NDG_dataProvider, \
158                                                            self._datacentre_groups, self._datacentre_namespace, \
159                                                            discoveryID,datasetName,datacentreName,datasetLastEditUpdateDate,datasetStartDateNom,datasetEndDateNom, \
160                                                            self._xq, self._datacentre_format)
161                        #import pdb
162                        #pdb.set_trace()
163                       
164                        # Now create the data access object to interface to the DB
165                        dao = PostgresDAO(record, pgClient = self.pgc)
166                       
167                        # Finally, write the new record
168                        # 0 if failed, 1 if update, 2 if create
169                        returnCode = dao.createOrUpdateRecord() 
170                        if returnCode == 2:
171                                self._no_files_ingested += 1
172                        elif returnCode == 1:
173                                self._no_files_changed += 1
174                        #if 0 nothing incremented
175                       
176                except:
177                       
178                        #if error encountered, add to failure lisr
179                        logging.error("Could not update: " + filename)                 
180                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
181                        self.updateFailList.append(originalRecordFilename)
182                       
183                        logging.error("Exception thrown - detail: ")
184                        errors = sys.exc_info()
185                        logging.error(errors)
186                        self._error_messages += "%s\n" %str(errors[1])
187                       
188                        if dao:
189                                logging.info("Removing record and its associated info from DB")
190                                logging.info("- to allow clean ingestion on rerun")
191                                try:
192                                        dao.deleteOriginalRecord()
193                                except:
194                                        logging.error("Problem encountered when removing record: ")
195                                        logging.error(sys.exc_info())
196                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
197
198                        self._no_problem_files += 1
199                       
200                        logging.info("Continue processing other files")
201                       
202                return self.recordAttemptToIngestDiscoveryID
203                       
204       
205        def getConfigDetails(self, datacentre):
206                '''
207                Get the harvested records directory and groups for this datacentre from the
208                datacentre specific config file.  The harvested records directory depends on the
209                datacentres OAI base url, the set and format. These have to be know up-front.
210                The groups denote which 'portal groups' they belong to - for limiting searches to
211                say NERC-only datacentres records.
212                Groups are added to the intermediate MOLES when it is created.
213                @param datacentre: datacentre to use when looking up config file
214                '''
215                # initialise the variables to retrieve from the config file
216                self._harvest_home = ""
217                self._datacentre_groups = ""
218                self._datacentre_format = ""
219                self._datacentre_namespace = ""
220                self._NDG_dataProvider = False
221
222                #note changed to production buildout path
223                datacentre_config_filename = '/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/' + datacentre + "_config.properties"
224                logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename)
225               
226
227                file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
228               
229                for line in file.split('\n'):
230                        words = line.split()
231                        if len(words) == 0:
232                                continue
233                        elif words[0] == 'host_path':
234                                self._harvest_home = words[1].rstrip()
235                        elif words[0] == 'groups':
236                                self._datacentre_groups = words[1:]
237                        elif words[0] == 'format':
238                                self._datacentre_format = words[1]
239                        elif words[0] == 'namespace':
240                                self._datacentre_namespace = words[1]
241                        elif words[0] == 'NDG_dataProvider':
242                                self._NDG_dataProvider = True
243                       
244                if self._harvest_home == "":
245                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
246               
247                logging.info("harvested records are in " + self._harvest_home)
248               
249                if self._datacentre_groups == "":
250                    logging.info("No groups/keywords set for datacentre " + datacentre)
251                else:
252                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
253               
254                if self._datacentre_format == "":
255                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
256               
257                logging.info("format being harvested: " + self._datacentre_format)
258               
259                if self._datacentre_namespace == "":
260                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
261               
262                logging.info("datacentre namespace: " + self._datacentre_namespace)
263               
264                if self._NDG_dataProvider:
265                        logging.info("Datacentre classified as an NDG data provider")
266                else:
267                        logging.info("Datacentre is not classified as an NDG data provider")
268                logging.info(self.lineSeparator)
269
270
271        def _convertIngestFiles(self, originals_dir, discovery_dir):
272                '''
273                Processes/renames the files (changed 08/01/07 to get id from inside file)
274                 - also replace any namespace declarations with a standard one which we know works in NDG
275                 NB, this copies files from the original dir to the discovery dir
276                 @param originals_dir: directory to convert files from
277                 @param discovery_dir: directory in which converted files will end up
278                 @return numfilesproc: counter of number of files processed
279                '''
280                numfilesproc = 0
281               
282                self.inputFileOrigFinal = {}
283               
284                logging.info(self.lineSeparator)
285                logging.info("Renaming files:")
286               
287                for filename in os.listdir(originals_dir):
288                       
289                        if not filename.endswith('.xml'):
290                                logging.warning('File %s is not xml format. Not processed'  %(filename))
291                                continue
292                       
293                        original_filename = originals_dir + filename
294                       
295                       
296                       
297                        #convert urls within original xml input file to NDG redirect URLS
298                       
299                        ''' This xquery method superceded by new python class in Utilities.py that uses
300                        elementtree to do the same job but with better handling of character sets...'''
301                        '''
302                        if self._datacentre_format == 'DIF':           
303                                xQueryType = 'transform_DIF_URL'
304                               
305                        elif self._datacentre_format == 'MDIP':
306                                xQueryType = 'transform_MDIP_URL'
307                               
308                        redirect_url = 'http://ndgRedirect.nerc.ac.uk/'
309                       
310                        #filename_rd = filename.replace(".xml","_redirect.xml")
311                        #original_filename_urlRedirect = originals_dir + filename_rd
312                       
313                        ndgRedirectTransform(xQueryType,redirect_url,original_filename)
314                        '''
315                       
316                        #call new class in Utilities.py --will replace original file...
317                        redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self.NDG_redirect_URL)
318                       
319               
320                                               
321                        #Use new class to get basic parameters from input xml doc to pass around (supplants getID method)
322                        basicParameters=DatasetBasicParameters(original_filename,self._datacentre_format)
323                                   
324                        try:
325                                #ident=self.getID(original_filename)
326                                ident=basicParameters.datasetID
327                                logging.info("Dataset ID is " + ident)
328                                       
329                        except Exception, detail:
330                                logging.error("Could not retrieve ID from file, %s" %filename)
331                                logging.error("Detail: %s" %detail)
332                                logging.info("Continue with next file")
333                                continue
334                       
335                        if self._NDG_dataProvider:
336                               
337                                #new_filename = discovery_dir + ident.replace(":", "__")+".xml"
338                                #new_filename_short = ident.replace(":", "__")+".xml"
339                               
340                                #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
341                                new_filename = discovery_dir + self._datacentre_namespace + "__" +self._datacentre_format + "__"+ ident.replace(":", "-")+".xml"
342                                new_filename_short = self._datacentre_namespace + "__" +self._datacentre_format + "__" + ident.replace(":", "-")+".xml"
343                               
344                        else:
345                               
346                                ident = ident.replace(":", "-")
347                                ident = ident.replace("/", "-")
348                               
349                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
350                                new_filename_short = self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
351                               
352                               
353                        logging.info("original file = " + original_filename)
354                        logging.info("newfile = " + new_filename)
355                               
356                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
357                        #this links a derived filename in processing dir with original filename
358                        #get basic filename from the path+filename passed to this function to use as key               
359                        self.inputFileOrigFinal[new_filename_short]=filename
360                       
361                        # now correct any namespace issues
362                        try:
363                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
364                        except Exception, detail:
365                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
366                                logging.error("Detail: %s" %detail)
367                                logging.info("Continue with next file")
368                                continue
369                        numfilesproc += 1
370               
371                logging.info("File renaming and converting completed")
372                logging.info(self.lineSeparator)
373               
374                #sys.exit()
375               
376                return numfilesproc
377
378               
379        def _getPostgresDBConnection(self):
380                '''
381                Get the default postgres DB connection - by reading in data from the db config file
382                '''
383               
384                logging.debug("Setting up connection to postgres DB")
385                self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
386                logging.info("Postgres DB connection now set up")
387
388
389
390        def     _backupAndCleanData(self):
391                '''
392                Back up ingested data for specified data centre, then clean any of
393                the ingest dirs
394                '''
395                logging.info("Backing up ingested data")
396                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
397                  strftime("%y%m%d_%H%M") + "_originals/"
398               
399                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
400                logging.info("Data backed up - now clearing ingest directories")
401                #Clear out the original harvest records area and discovery dir
402                #FileUtilities.cleanDir(self.originals_dir)
403                #FileUtilities.cleanDir(self.discovery_dir)
404                logging.info("Ingest directories cleared")
405
406
407        def     _setupDataCentreDirs(self):
408                '''
409                Set up directories appropriate for the current data centre
410                '''
411                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
412                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
413               
414                # the following dirs define where the specific documents should go
415                self.originals_dir = data_dir + "/oai/originals/"
416                self.discovery_dir = data_dir + "/discovery/"
417               
418                # Create/clear the 'in' and 'out' directories
419                FileUtilities.setUpDir(self.originals_dir)
420                FileUtilities.setUpDir(self.discovery_dir)
421               
422                logging.info("Ingest directories for data centre set up")
423
424
425        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed):
426                '''
427                Convert files from originals dir to discovery one, then
428                ingest, backup and clean
429                @param originals_dir: directory to ingest docs from
430                @param discovery_dir: directory to use to process ingested docs
431                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
432                '''
433               
434                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir)
435               
436                filenames = os.listdir(self.discovery_dir)
437               
438                #generate a list of files ALREADY in database so can compare with what has been ingested
439                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
440               
441               
442                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
443                filesPresentList=[]
444                if filePresentListArr:
445                        for fileArr in filePresentListArr:
446                                filesPresentList.append(fileArr[0])
447                               
448                                #TODO - is above relevant - duplicating functionlaity?
449       
450                #create list to to hold files ingest failed on.
451                self.updateFailList = []
452                self.deletedFailList = []
453               
454               
455                for filename in filenames:
456                        fullPath = self.discovery_dir + filename
457                       
458                        if os.path.isfile(fullPath):
459                                thisIngestedID = self.addFileToPostgresDB(fullPath)
460                               
461                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
462                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
463                                        if thisIngestedID in filesPresentList: 
464                                                filesPresentList.remove(thisIngestedID)                 
465                                               
466                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
467                #will need to be removed.
468               
469                #only do this if not in single file mode (or else wverything else gets deleted!)               
470                if ((self.indFileToIngest == "") & (feed)):
471                        for item in filesPresentList:
472                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
473                                DeleteRecord(item)
474                                self.deletedFailList.append(item)
475                                self._no_files_deleted += 1
476                       
477                       
478                self._backupAndCleanData()
479               
480                #at this stage put the reporting code in (will work for oai or atom feed)
481                #create a summary file for each data centre ingest
482                data_dir = self._base_dir + "data/"
483                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
484                recOpFile = open(recOpFileName,'w')
485               
486                logging.info("oai_document_ingest processing complete:")
487               
488                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
489                message = "Ingest report for data centre: " + datacentre + "\n"
490                message = message + "Ingest date: " + str(datetime.datetime.now()) + "\n"
491                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
492                message = message + "TOTAL_PROCESSED " + str(numfilesproc) + "\n"
493                message = message + "INGESTED_Created " + str(self._no_files_changed)  + "\n"
494                message = message + "INGESTED_Updated " + str(self._no_files_ingested)  + "\n"
495                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
496                message = message + "PROBLEM_FILES " + str(self._no_problem_files)  + "\n"
497               
498                '''recOpFile.write("Ingest report for data centre: " + datacentre + "\n")
499                recOpFile.write("Ingest date: " + str(datetime.datetime.now()) + "\n")
500                recOpFile.write("Original metadata directory: " + self._harvest_home + "\n\n")
501                recOpFile.write("TOTAL PROCESSED " + str(numfilesproc) + "\n")
502                recOpFile.write("INGESTED (Created) " + str(self._no_files_changed)  + "\n")
503                recOpFile.write("INGESTED (Updated) " + str(self._no_files_ingested)  + "\n")
504                recOpFile.write("DELETED " + str(self._no_files_deleted)  + "\n")
505                recOpFile.write("PROBLEM FILES " + str(self._no_problem_files)  + "\n")'''
506               
507               
508                for badFile in self.updateFailList:
509                        #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
510                        message = message +"PROBLEM_FILE " + badFile + "\n"
511                         
512                recOpFile.write(message)
513                               
514                return numfilesproc, message
515
516
517
518        def _setupXQueries(self):
519                '''
520                Set up the required XQueries
521                - NB, extract the xquery libraries locally for easy reference
522                '''
523                self._xq = ndgResources()
524                for libFile in self._xq.xqlib:
525                        # NB, we don't want the full path to the files - just the filename
526                        fileName = libFile.split('/')[-1]
527                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.