source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 5976

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@5976
Revision 5976, 19.2 KB checked in by sdonegan, 10 years ago (diff)

updated keyword provision when calling method

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import DatasetBasicParameters,ndgRedirectTransform,redirectUrlChanger
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22
23class AbstractDocumentIngester(object):
24        '''
25        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
26        - including running the various transforms and parsings to get all doc types and spatiotemporal
27        data in the correct form in the DB
28        '''
29        lineSeparator = "------------------------------"
30                       
31        # The directory to put things for a tape backup (should already exist)
32        BACKUP_DIR = '/disks/glue1/oaiBackup/'
33       
34        NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
35
36        def _setupCmdLineOptions(self):
37                '''
38                Determine the logging level to use and configure this appropriately
39                @return args: any input arguments - excluding options
40                '''
41               
42                # check for verbose option
43               
44                try:
45                        opts, args = getopt.getopt(sys.argv[1:], "vd")
46                except getopt.GetoptError, err:
47                    # print help information and exit:
48                    print str(err) # will print something like "option -a not recognized"
49                    sys.exit(2)
50                   
51                if len(args) < 1:
52                        self.usage()
53                   
54                loggingLevel = logging.WARNING
55                for o, a in opts:
56                    if o == "-v":
57                        print " - Verbose mode ON"
58                        loggingLevel = logging.INFO
59                    elif o == "-d":
60                        print " - Debug mode ON"
61                        loggingLevel = logging.DEBUG
62                   
63               
64                # set up any keywords on the object
65                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
66                for arg in args:
67                       
68                        keywordArgs=['ingestFromDate','interval','individualFile','interval','ingestFromDate','eXistDBHostname','eXistPortNo','dataCentrePoll']
69                       
70                       
71                       
72                        bits = arg.split('=')
73                        if len(bits) == 2:
74                                if bits[0] == keywordArgs[0]:
75                                        self.setIngestFromDate(bits[1])
76                                elif bits[0] == keywordArgs[1]:
77                                        self.setPollInterval(bits[1])
78                                elif bits[0] == keywordArgs[2]:
79                                        print " - Running in single file ingestion mode!"
80                                        self.setIndFileToIngest(bits[1])
81                                elif bits[0] not in keywordArgs:
82                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
83                                        sys.exit(2)
84                                else:
85                                        setattr(self, bits[0], bits[1])
86                       
87                print self.lineSeparator
88               
89                # NB, this is a slight fudge as cannot get the detailed logging to work
90                # without setting up a new logger here - which means we get two loggers
91                # outputing data. The initial call to logging needs to be tracked down
92                # and configured correctly, so this can be avoided...
93#               self.logger = logging.getLogger()
94#               self.logger.setLevel(loggingLevel)
95
96                # create console handler and set level to debug
97#               ch = logging.StreamHandler()
98#               ch.setLevel(loggingLevel)
99                # create formatter
100#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
101                # add formatter to ch
102#               ch.setFormatter(formatter)
103                # add ch to logger
104#               self.logger.addHandler(ch)
105                return args
106
107
108        def getID(self, filename):
109                '''
110                Gets the identifier out of an input metadata xml record.
111                @param filename - name of document file being processed
112                @return: ID - id to use to refer to the document
113                '''
114                logging.info("Retrieving identifier for metadata record " + filename)
115                xml=file(filename).read()
116                ID = idget(xml)
117                return ID
118       
119       
120        def addFileToPostgresDB(self, filename):
121                '''
122                Add a file to the postgres DB - extracting and storing all the required
123                data in the process
124                @param filename: full path of file to add to postgres DB
125                '''
126                logging.info("Adding file, " + filename + ", to postgres DB")
127               
128                numSlash = len(filename.split('/'))
129               
130                shortFilename = filename.split('/')[numSlash - 1]
131               
132                self.recordAttemptToIngestDiscoveryID = ""                             
133               
134                # first of all create a PostgresRecord - this object represents all the data required
135                # for a DB entry
136                dao = None
137               
138                try:
139                        #discoveryID = self.getID(filename)
140                       
141                        basicParameters=DatasetBasicParameters(filename,self._datacentre_format)
142                                               
143                        discoveryID = basicParameters.datasetID                 
144                        datasetName = basicParameters.datasetName
145                        datacentreName = basicParameters.datacentreName
146                        datasetLastEditUpdateDate = basicParameters.metadataCreationDate
147                        datasetStartDateNom = basicParameters.datasetStartDateNom
148                        datasetEndDateNom = basicParameters.datasetEndDateNom
149                       
150                        #record whats attempting to be ingested
151                        self.recordAttemptToIngestDiscoveryID = discoveryID
152                       
153                                                                                                           
154                        record = PostgresRecord(filename, self._NDG_dataProvider, \
155                                                            self._datacentre_groups, self._datacentre_namespace, \
156                                                            discoveryID,datasetName,datacentreName,datasetLastEditUpdateDate,datasetStartDateNom,datasetEndDateNom, \
157                                                            self._xq, self._datacentre_format)
158                        #import pdb
159                        #pdb.set_trace()
160                       
161                        # Now create the data access object to interface to the DB
162                        dao = PostgresDAO(record, pgClient = self.pgc)
163                       
164                        # Finally, write the new record
165                        # 0 if failed, 1 if update, 2 if create
166                        returnCode = dao.createOrUpdateRecord() 
167                        if returnCode == 2:
168                                self._no_files_ingested += 1
169                        elif returnCode == 1:
170                                self._no_files_changed += 1
171                        #if 0 nothing incremented
172                       
173                except:
174                       
175                        #if error encountered, add to failure lisr
176                        logging.error("Could not update: " + filename)                 
177                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
178                        self.updateFailList.append(originalRecordFilename)
179                       
180                        logging.error("Exception thrown - detail: ")
181                        errors = sys.exc_info()
182                        logging.error(errors)
183                        self._error_messages += "%s\n" %str(errors[1])
184                       
185                        if dao:
186                                logging.info("Removing record and its associated info from DB")
187                                logging.info("- to allow clean ingestion on rerun")
188                                try:
189                                        dao.deleteOriginalRecord()
190                                except:
191                                        logging.error("Problem encountered when removing record: ")
192                                        logging.error(sys.exc_info())
193                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
194
195                        self._no_problem_files += 1
196                       
197                        logging.info("Continue processing other files")
198                       
199                return self.recordAttemptToIngestDiscoveryID
200                       
201       
202        def getConfigDetails(self, datacentre):
203                '''
204                Get the harvested records directory and groups for this datacentre from the
205                datacentre specific config file.  The harvested records directory depends on the
206                datacentres OAI base url, the set and format. These have to be know up-front.
207                The groups denote which 'portal groups' they belong to - for limiting searches to
208                say NERC-only datacentres records.
209                Groups are added to the intermediate MOLES when it is created.
210                @param datacentre: datacentre to use when looking up config file
211                '''
212                # initialise the variables to retrieve from the config file
213                self._harvest_home = ""
214                self._datacentre_groups = ""
215                self._datacentre_format = ""
216                self._datacentre_namespace = ""
217                self._NDG_dataProvider = False
218
219                datacentre_config_filename = 'datacentre_config/' + datacentre + "_config.properties"
220                logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename)
221               
222
223                file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
224               
225                for line in file.split('\n'):
226                        words = line.split()
227                        if len(words) == 0:
228                                continue
229                        elif words[0] == 'host_path':
230                                self._harvest_home = words[1].rstrip()
231                        elif words[0] == 'groups':
232                                self._datacentre_groups = words[1:]
233                        elif words[0] == 'format':
234                                self._datacentre_format = words[1]
235                        elif words[0] == 'namespace':
236                                self._datacentre_namespace = words[1]
237                        elif words[0] == 'NDG_dataProvider':
238                                self._NDG_dataProvider = True
239                       
240                if self._harvest_home == "":
241                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
242               
243                logging.info("harvested records are in " + self._harvest_home)
244               
245                if self._datacentre_groups == "":
246                    logging.info("No groups/keywords set for datacentre " + datacentre)
247                else:
248                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
249               
250                if self._datacentre_format == "":
251                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
252               
253                logging.info("format being harvested: " + self._datacentre_format)
254               
255                if self._datacentre_namespace == "":
256                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
257               
258                logging.info("datacentre namespace: " + self._datacentre_namespace)
259               
260                if self._NDG_dataProvider:
261                        logging.info("Datacentre classified as an NDG data provider")
262                else:
263                        logging.info("Datacentre is not classified as an NDG data provider")
264                logging.info(self.lineSeparator)
265
266
267        def _convertIngestFiles(self, originals_dir, discovery_dir):
268                '''
269                Processes/renames the files (changed 08/01/07 to get id from inside file)
270                 - also replace any namespace declarations with a standard one which we know works in NDG
271                 NB, this copies files from the original dir to the discovery dir
272                 @param originals_dir: directory to convert files from
273                 @param discovery_dir: directory in which converted files will end up
274                 @return numfilesproc: counter of number of files processed
275                '''
276                numfilesproc = 0
277               
278                self.inputFileOrigFinal = {}
279               
280                logging.info(self.lineSeparator)
281                logging.info("Renaming files:")
282               
283                for filename in os.listdir(originals_dir):
284                       
285                        if not filename.endswith('.xml'):
286                                logging.warning('File %s is not xml format. Not processed'  %(filename))
287                                continue
288                       
289                        original_filename = originals_dir + filename
290                       
291                       
292                       
293                        #convert urls within original xml input file to NDG redirect URLS
294                       
295                        ''' This xquery method superceded by new python class in Utilities.py that uses
296                        elementtree to do the same job but with better handling of character sets...'''
297                        '''
298                        if self._datacentre_format == 'DIF':           
299                                xQueryType = 'transform_DIF_URL'
300                               
301                        elif self._datacentre_format == 'MDIP':
302                                xQueryType = 'transform_MDIP_URL'
303                               
304                        redirect_url = 'http://ndgRedirect.nerc.ac.uk/'
305                       
306                        #filename_rd = filename.replace(".xml","_redirect.xml")
307                        #original_filename_urlRedirect = originals_dir + filename_rd
308                       
309                        ndgRedirectTransform(xQueryType,redirect_url,original_filename)
310                        '''
311                       
312                        #call new class in Utilities.py --will replace original file...
313                        redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self.NDG_redirect_URL)
314                       
315               
316                                               
317                        #Use new class to get basic parameters from input xml doc to pass around (supplants getID method)
318                        basicParameters=DatasetBasicParameters(original_filename,self._datacentre_format)
319                                   
320                        try:
321                                #ident=self.getID(original_filename)
322                                ident=basicParameters.datasetID
323                                logging.info("Dataset ID is " + ident)
324                                       
325                        except Exception, detail:
326                                logging.error("Could not retrieve ID from file, %s" %filename)
327                                logging.error("Detail: %s" %detail)
328                                logging.info("Continue with next file")
329                                continue
330                       
331                        if self._NDG_dataProvider:
332                               
333                                #new_filename = discovery_dir + ident.replace(":", "__")+".xml"
334                                #new_filename_short = ident.replace(":", "__")+".xml"
335                               
336                                #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
337                                new_filename = discovery_dir + self._datacentre_namespace + "__" +self._datacentre_format + "__"+ ident.replace(":", "-")+".xml"
338                                new_filename_short = self._datacentre_namespace + "__" +self._datacentre_format + "__" + ident.replace(":", "-")+".xml"
339                               
340                        else:
341                               
342                                ident = ident.replace(":", "-")
343                                ident = ident.replace("/", "-")
344                               
345                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
346                                new_filename_short = self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
347                               
348                               
349                        logging.info("original file = " + original_filename)
350                        logging.info("newfile = " + new_filename)
351                               
352                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
353                        #this links a derived filename in processing dir with original filename
354                        #get basic filename from the path+filename passed to this function to use as key               
355                        self.inputFileOrigFinal[new_filename_short]=filename
356                       
357                        # now correct any namespace issues
358                        try:
359                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
360                        except Exception, detail:
361                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
362                                logging.error("Detail: %s" %detail)
363                                logging.info("Continue with next file")
364                                continue
365                        numfilesproc += 1
366               
367                logging.info("File renaming and converting completed")
368                logging.info(self.lineSeparator)
369               
370                #sys.exit()
371               
372                return numfilesproc
373
374               
375        def _getPostgresDBConnection(self):
376                '''
377                Get the default postgres DB connection - by reading in data from the db config file
378                '''
379               
380                logging.debug("Setting up connection to postgres DB")
381                self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
382                logging.info("Postgres DB connection now set up")
383
384
385
386        def     _backupAndCleanData(self):
387                '''
388                Back up ingested data for specified data centre, then clean any of
389                the ingest dirs
390                '''
391                logging.info("Backing up ingested data")
392                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
393                  strftime("%y%m%d_%H%M") + "_originals/"
394               
395                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
396                logging.info("Data backed up - now clearing ingest directories")
397                #Clear out the original harvest records area and discovery dir
398                #FileUtilities.cleanDir(self.originals_dir)
399                #FileUtilities.cleanDir(self.discovery_dir)
400                logging.info("Ingest directories cleared")
401
402
403        def     _setupDataCentreDirs(self):
404                '''
405                Set up directories appropriate for the current data centre
406                '''
407                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
408                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
409               
410                # the following dirs define where the specific documents should go
411                self.originals_dir = data_dir + "/oai/originals/"
412                self.discovery_dir = data_dir + "/discovery/"
413               
414                # Create/clear the 'in' and 'out' directories
415                FileUtilities.setUpDir(self.originals_dir)
416                FileUtilities.setUpDir(self.discovery_dir)
417               
418                logging.info("Ingest directories for data centre set up")
419
420
421        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed):
422                '''
423                Convert files from originals dir to discovery one, then
424                ingest, backup and clean
425                @param originals_dir: directory to ingest docs from
426                @param discovery_dir: directory to use to process ingested docs
427                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
428                '''
429               
430                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir)
431               
432                filenames = os.listdir(self.discovery_dir)
433               
434                #generate a list of files ALREADY in database so can compare with what has been ingested
435                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
436               
437               
438                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
439                filesPresentList=[]
440                if filePresentListArr:
441                        for fileArr in filePresentListArr:
442                                filesPresentList.append(fileArr[0])
443                               
444                                #TODO - is above relevant - duplicating functionlaity?
445       
446                #create list to to hold files ingest failed on.
447                self.updateFailList = []
448                self.deletedFailList = []
449               
450               
451                for filename in filenames:
452                        fullPath = self.discovery_dir + filename
453                       
454                        if os.path.isfile(fullPath):
455                                thisIngestedID = self.addFileToPostgresDB(fullPath)
456                               
457                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
458                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
459                                        if thisIngestedID in filesPresentList: 
460                                                filesPresentList.remove(thisIngestedID)                 
461                                               
462                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
463                #will need to be removed.
464               
465                #only do this if not in single file mode (or else wverything else gets deleted!)               
466                if ((self.indFileToIngest == "") & (feed)):
467                        for item in filesPresentList:
468                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
469                                DeleteRecord(item)
470                                self.deletedFailList.append(item)
471                                self._no_files_deleted += 1
472                       
473                       
474                self._backupAndCleanData()
475               
476                #at this stage put the reporting code in (will work for oai or atom feed)
477                #create a summary file for each data centre ingest
478                data_dir = self._base_dir + "data/"
479                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
480                recOpFile = open(recOpFileName,'w')
481               
482                logging.info("oai_document_ingest processing complete:")
483               
484                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
485                message = "Ingest report for data centre: " + datacentre + "\n"
486                message = message + "Ingest date: " + str(datetime.datetime.now()) + "\n"
487                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
488                message = message + "TOTAL PROCESSED " + str(numfilesproc) + "\n"
489                message = message + "INGESTED (Created) " + str(self._no_files_changed)  + "\n"
490                message = message + "INGESTED (Updated) " + str(self._no_files_ingested)  + "\n"
491                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
492                message = message + "PROBLEM FILES " + str(self._no_problem_files)  + "\n"
493               
494                '''recOpFile.write("Ingest report for data centre: " + datacentre + "\n")
495                recOpFile.write("Ingest date: " + str(datetime.datetime.now()) + "\n")
496                recOpFile.write("Original metadata directory: " + self._harvest_home + "\n\n")
497                recOpFile.write("TOTAL PROCESSED " + str(numfilesproc) + "\n")
498                recOpFile.write("INGESTED (Created) " + str(self._no_files_changed)  + "\n")
499                recOpFile.write("INGESTED (Updated) " + str(self._no_files_ingested)  + "\n")
500                recOpFile.write("DELETED " + str(self._no_files_deleted)  + "\n")
501                recOpFile.write("PROBLEM FILES " + str(self._no_problem_files)  + "\n")'''
502               
503               
504                for badFile in self.updateFailList:
505                        #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
506                        message = message +"PROBLEM_FILE " + badFile + "\n"
507                         
508                recOpFile.write(message)
509                               
510                return numfilesproc, message
511
512
513
514        def _setupXQueries(self):
515                '''
516                Set up the required XQueries
517                - NB, extract the xquery libraries locally for easy reference
518                '''
519                self._xq = ndgResources()
520                for libFile in self._xq.xqlib:
521                        # NB, we don't want the full path to the files - just the filename
522                        fileName = libFile.split('/')[-1]
523                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.