source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 5686

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@5686
Revision 5686, 18.5 KB checked in by sdonegan, 10 years ago (diff)

withdrawing explict DeleteRecord? activation - handled by postgresql functions anyway?

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import DatasetBasicParameters,ndgRedirectTransform,redirectUrlChanger
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22
23class AbstractDocumentIngester(object):
24        '''
25        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
26        - including running the various transforms and parsings to get all doc types and spatiotemporal
27        data in the correct form in the DB
28        '''
29        lineSeparator = "-----------------------------"
30                       
31        # The directory to put things for a tape backup (should already exist)
32        BACKUP_DIR = '/disks/glue1/oaiBackup/'
33       
34        NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
35
36        def _setupCmdLineOptions(self):
37                '''
38                Determine the logging level to use and configure this appropriately
39                @return args: any input arguments - excluding options
40                '''
41               
42                # check for verbose option
43               
44                try:
45                        opts, args = getopt.getopt(sys.argv[1:], "vd")
46                except getopt.GetoptError, err:
47                    # print help information and exit:
48                    print str(err) # will print something like "option -a not recognized"
49                    sys.exit(2)
50                   
51                if len(args) < 1:
52                        self.usage()
53                   
54                loggingLevel = logging.WARNING
55                for o, a in opts:
56                    if o == "-v":
57                        print " - Verbose mode ON"
58                        loggingLevel = logging.INFO
59                    elif o == "-d":
60                        print " - Debug mode ON"
61                        loggingLevel = logging.DEBUG
62                   
63               
64                # set up any keywords on the object
65                # NB, be careful to keep the instance variables the same name as the keywords!
66                for arg in args:
67                       
68                        bits = arg.split('=')
69                        if len(bits) == 2:
70                                if bits[0] == 'ingestFromDate':
71                                        self.setIngestFromDate(bits[1])
72                                elif bits[0] == 'interval':
73                                        self.setPollInterval(bits[1])
74                                elif bits[0] == 'individualFile':
75                                        print " - Running in single file ingestion mode!"
76                                        self.setIndFileToIngest(bits[1])
77                                else:
78                                        setattr(self, bits[0], bits[1])
79               
80                print self.lineSeparator
81               
82                # NB, this is a slight fudge as cannot get the detailed logging to work
83                # without setting up a new logger here - which means we get two loggers
84                # outputing data. The initial call to logging needs to be tracked down
85                # and configured correctly, so this can be avoided...
86#               self.logger = logging.getLogger()
87#               self.logger.setLevel(loggingLevel)
88
89                # create console handler and set level to debug
90#               ch = logging.StreamHandler()
91#               ch.setLevel(loggingLevel)
92                # create formatter
93#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
94                # add formatter to ch
95#               ch.setFormatter(formatter)
96                # add ch to logger
97#               self.logger.addHandler(ch)
98                return args
99
100
101        def getID(self, filename):
102                '''
103                Gets the identifier out of an input metadata xml record.
104                @param filename - name of document file being processed
105                @return: ID - id to use to refer to the document
106                '''
107                logging.info("Retrieving identifier for metadata record " + filename)
108                xml=file(filename).read()
109                ID = idget(xml)
110                return ID
111       
112       
113        def addFileToPostgresDB(self, filename):
114                '''
115                Add a file to the postgres DB - extracting and storing all the required
116                data in the process
117                @param filename: full path of file to add to postgres DB
118                '''
119                logging.info("Adding file, " + filename + ", to postgres DB")
120               
121                numSlash = len(filename.split('/'))
122               
123                shortFilename = filename.split('/')[numSlash - 1]
124               
125                self.recordAttemptToIngestDiscoveryID = ""                             
126               
127                # first of all create a PostgresRecord - this object represents all the data required
128                # for a DB entry
129                dao = None
130               
131                try:
132                        #discoveryID = self.getID(filename)
133                       
134                        basicParameters=DatasetBasicParameters(filename,self._datacentre_format)
135                                               
136                        discoveryID = basicParameters.datasetID                 
137                        datasetName = basicParameters.datasetName
138                        datacentreName = basicParameters.datacentreName
139                        datasetLastEditUpdateDate = basicParameters.metadataCreationDate
140                        datasetStartDateNom = basicParameters.datasetStartDateNom
141                        datasetEndDateNom = basicParameters.datasetEndDateNom
142                       
143                        #record whats attempting to be ingested
144                        self.recordAttemptToIngestDiscoveryID = discoveryID
145                       
146                                                                                                           
147                        record = PostgresRecord(filename, self._NDG_dataProvider, \
148                                                            self._datacentre_groups, self._datacentre_namespace, \
149                                                            discoveryID,datasetName,datacentreName,datasetLastEditUpdateDate,datasetStartDateNom,datasetEndDateNom, \
150                                                            self._xq, self._datacentre_format)
151                       
152                       
153                        # Now create the data access object to interface to the DB
154                        dao = PostgresDAO(record, pgClient = self.pgc)
155                       
156                        # Finally, write the new record
157                        # 0 if failed, 1 if update, 2 if create
158                        returnCode = dao.createOrUpdateRecord() 
159                        if returnCode == 2:
160                                self._no_files_ingested += 1
161                        elif returnCode == 1:
162                                self._no_files_changed += 1
163                        #if 0 nothing incremented
164                       
165                except:
166                       
167                        #if error encountered, add to failure lisr
168                        logging.error("Could not update: " + filename)                 
169                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
170                        self.updateFailList.append(originalRecordFilename)
171                       
172                        logging.error("Exception thrown - detail: ")
173                        errors = sys.exc_info()
174                        logging.error(errors)
175                        self._error_messages += "%s\n" %str(errors[1])
176                       
177                        if dao:
178                                logging.info("Removing record and its associated info from DB")
179                                logging.info("- to allow clean ingestion on rerun")
180                                try:
181                                        dao.deleteOriginalRecord()
182                                except:
183                                        logging.error("Problem encountered when removing record: ")
184                                        logging.error(sys.exc_info())
185                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
186
187                        self._no_problem_files += 1
188                       
189                        logging.info("Continue processing other files")
190                       
191                return self.recordAttemptToIngestDiscoveryID
192                       
193       
194        def getConfigDetails(self, datacentre):
195                '''
196                Get the harvested records directory and groups for this datacentre from the
197                datacentre specific config file.  The harvested records directory depends on the
198                datacentres OAI base url, the set and format. These have to be know up-front.
199                The groups denote which 'portal groups' they belong to - for limiting searches to
200                say NERC-only datacentres records.
201                Groups are added to the intermediate MOLES when it is created.
202                @param datacentre: datacentre to use when looking up config file
203                '''
204                # initialise the variables to retrieve from the config file
205                self._harvest_home = ""
206                self._datacentre_groups = ""
207                self._datacentre_format = ""
208                self._datacentre_namespace = ""
209                self._NDG_dataProvider = False
210
211                datacentre_config_filename = 'datacentre_config/' + datacentre + "_config.properties"
212                logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename)
213               
214
215                file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
216               
217                for line in file.split('\n'):
218                        words = line.split()
219                        if len(words) == 0:
220                                continue
221                        elif words[0] == 'host_path':
222                                self._harvest_home = words[1].rstrip()
223                        elif words[0] == 'groups':
224                                self._datacentre_groups = words[1:]
225                        elif words[0] == 'format':
226                                self._datacentre_format = words[1]
227                        elif words[0] == 'namespace':
228                                self._datacentre_namespace = words[1]
229                        elif words[0] == 'NDG_dataProvider':
230                                self._NDG_dataProvider = True
231                       
232                if self._harvest_home == "":
233                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
234               
235                logging.info("harvested records are in " + self._harvest_home)
236               
237                if self._datacentre_groups == "":
238                    logging.info("No groups/keywords set for datacentre " + datacentre)
239                else:
240                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
241               
242                if self._datacentre_format == "":
243                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
244               
245                logging.info("format being harvested: " + self._datacentre_format)
246               
247                if self._datacentre_namespace == "":
248                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
249               
250                logging.info("datacentre namespace: " + self._datacentre_namespace)
251               
252                if self._NDG_dataProvider:
253                        logging.info("Datacentre classified as an NDG data provider")
254                else:
255                        logging.info("Datacentre is not classified as an NDG data provider")
256                logging.info(self.lineSeparator)
257
258
259        def _convertIngestFiles(self, originals_dir, discovery_dir):
260                '''
261                Processes/renames the files (changed 08/01/07 to get id from inside file)
262                 - also replace any namespace declarations with a standard one which we know works in NDG
263                 NB, this copies files from the original dir to the discovery dir
264                 @param originals_dir: directory to convert files from
265                 @param discovery_dir: directory in which converted files will end up
266                 @return numfilesproc: counter of number of files processed
267                '''
268                numfilesproc = 0
269               
270                self.inputFileOrigFinal = {}
271               
272                logging.info(self.lineSeparator)
273                logging.info("Renaming files:")
274               
275                for filename in os.listdir(originals_dir):
276                        if not filename.endswith('.xml'):
277                                logging.warning('File %s is not xml format. Not processed'  %(filename))
278                                continue
279                       
280                        original_filename = originals_dir + filename
281                       
282                       
283                        #convert urls within original xml input file to NDG redirect URLS
284                       
285                        ''' This xquery method superceded by new python class in Utilities.py that uses
286                        elementtree to do the same job but with better handling of character sets...'''
287                        '''
288                        if self._datacentre_format == 'DIF':           
289                                xQueryType = 'transform_DIF_URL'
290                               
291                        elif self._datacentre_format == 'MDIP':
292                                xQueryType = 'transform_MDIP_URL'
293                               
294                        redirect_url = 'http://ndgRedirect.nerc.ac.uk/'
295                       
296                        #filename_rd = filename.replace(".xml","_redirect.xml")
297                        #original_filename_urlRedirect = originals_dir + filename_rd
298                       
299                        ndgRedirectTransform(xQueryType,redirect_url,original_filename)
300                        '''
301                       
302                        #call new class in Utilities.py --will replace original file...
303                        redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self.NDG_redirect_URL)
304                       
305               
306                                               
307                        #Use new class to get basic parameters from input xml doc to pass around (supplants getID method)
308                        basicParameters=DatasetBasicParameters(original_filename,self._datacentre_format)
309                                   
310                        try:
311                                #ident=self.getID(original_filename)
312                                ident=basicParameters.datasetID
313                                logging.info("Dataset ID is " + ident)
314                                       
315                        except Exception, detail:
316                                logging.error("Could not retrieve ID from file, %s" %filename)
317                                logging.error("Detail: %s" %detail)
318                                logging.info("Continue with next file")
319                                continue
320                       
321                        if self._NDG_dataProvider:
322                                new_filename = discovery_dir + ident.replace(":", "__")+".xml"
323                                new_filename_short = ident.replace(":", "__")+".xml"
324                               
325                        else:
326                                ident = ident.replace(":", "-")
327                                ident = ident.replace("/", "-")
328                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
329                                new_filename_short = self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
330                               
331                               
332                        logging.info("original file = " + original_filename)
333                        logging.info("newfile = " + new_filename)
334                               
335                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
336                        #this links a derived filename in processing dir with original filename
337                        #get basic filename from the path+filename passed to this function to use as key               
338                        self.inputFileOrigFinal[new_filename_short]=filename
339                       
340                        # now correct any namespace issues
341                        try:
342                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
343                        except Exception, detail:
344                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
345                                logging.error("Detail: %s" %detail)
346                                logging.info("Continue with next file")
347                                continue
348                        numfilesproc += 1
349               
350                logging.info("File renaming and converting completed")
351                logging.info(self.lineSeparator)
352                return numfilesproc
353
354               
355        def _getPostgresDBConnection(self):
356                '''
357                Get the default postgres DB connection - by reading in data from the db config file
358                '''
359               
360                logging.debug("Setting up connection to postgres DB")
361                self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
362                logging.info("Postgres DB connection now set up")
363
364
365
366        def     _backupAndCleanData(self):
367                '''
368                Back up ingested data for specified data centre, then clean any of
369                the ingest dirs
370                '''
371                logging.info("Backing up ingested data")
372                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
373                  strftime("%y%m%d_%H%M") + "_originals/"
374               
375                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
376                logging.info("Data backed up - now clearing ingest directories")
377                #Clear out the original harvest records area and discovery dir
378                #FileUtilities.cleanDir(self.originals_dir)
379                #FileUtilities.cleanDir(self.discovery_dir)
380                logging.info("Ingest directories cleared")
381
382
383        def     _setupDataCentreDirs(self):
384                '''
385                Set up directories appropriate for the current data centre
386                '''
387                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
388                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
389               
390                # the following dirs define where the specific documents should go
391                self.originals_dir = data_dir + "/oai/originals/"
392                self.discovery_dir = data_dir + "/discovery/"
393               
394                # Create/clear the 'in' and 'out' directories
395                FileUtilities.setUpDir(self.originals_dir)
396                FileUtilities.setUpDir(self.discovery_dir)
397               
398                logging.info("Ingest directories for data centre set up")
399
400
401        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed):
402                '''
403                Convert files from originals dir to discovery one, then
404                ingest, backup and clean
405                @param originals_dir: directory to ingest docs from
406                @param discovery_dir: directory to use to process ingested docs
407                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
408                '''
409               
410                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir)
411               
412                filenames = os.listdir(self.discovery_dir)
413               
414                #generate a list of files ALREADY in database so can compare with what has been ingested
415                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
416               
417               
418                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
419                filesPresentList=[]
420                if filePresentListArr:
421                        for fileArr in filePresentListArr:
422                                filesPresentList.append(fileArr[0])
423                               
424                                #TODO - is above relevant - duplicating functionlaity?
425       
426                #create list to to hold files ingest failed on.
427                self.updateFailList = []
428                self.deletedFailList = []
429               
430               
431                for filename in filenames:
432                        fullPath = self.discovery_dir + filename
433                       
434                        if os.path.isfile(fullPath):
435                                thisIngestedID = self.addFileToPostgresDB(fullPath)
436                               
437                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
438                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
439                                        if thisIngestedID in filesPresentList: 
440                                                filesPresentList.remove(thisIngestedID)                 
441                                               
442                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
443                #will need to be removed.
444               
445                #only do this if not in single file mode (or else wverything else gets deleted!)               
446                if ((self.indFileToIngest == "") & (feed)):
447                        for item in filesPresentList:
448                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
449                                #DeleteRecord(item)
450                                self.deletedFailList.append(item)
451                                self._no_files_deleted += 1
452                       
453                       
454                self._backupAndCleanData()
455               
456                #at this stage put the reporting code in (will work for oai or atom feed)
457                #create a summary file for each data centre ingest
458                data_dir = self._base_dir + "data/"
459                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
460                recOpFile = open(recOpFileName,'w')
461               
462                logging.info("oai_document_ingest processing complete:")
463               
464                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
465                message = "Ingest report for data centre: " + datacentre + "\n"
466                message = message + "Ingest date: " + str(datetime.datetime.now()) + "\n"
467                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
468                message = message + "TOTAL PROCESSED " + str(numfilesproc) + "\n"
469                message = message + "INGESTED (Created) " + str(self._no_files_changed)  + "\n"
470                message = message + "INGESTED (Updated) " + str(self._no_files_ingested)  + "\n"
471                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
472                message = message + "PROBLEM FILES " + str(self._no_problem_files)  + "\n"
473               
474                '''recOpFile.write("Ingest report for data centre: " + datacentre + "\n")
475                recOpFile.write("Ingest date: " + str(datetime.datetime.now()) + "\n")
476                recOpFile.write("Original metadata directory: " + self._harvest_home + "\n\n")
477                recOpFile.write("TOTAL PROCESSED " + str(numfilesproc) + "\n")
478                recOpFile.write("INGESTED (Created) " + str(self._no_files_changed)  + "\n")
479                recOpFile.write("INGESTED (Updated) " + str(self._no_files_ingested)  + "\n")
480                recOpFile.write("DELETED " + str(self._no_files_deleted)  + "\n")
481                recOpFile.write("PROBLEM FILES " + str(self._no_problem_files)  + "\n")'''
482               
483               
484                for badFile in self.updateFailList:
485                        #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
486                        message = message +"PROBLEM_FILE " + badFile + "\n"
487                         
488                recOpFile.write(message)
489                               
490                return numfilesproc, message
491
492
493
494        def _setupXQueries(self):
495                '''
496                Set up the required XQueries
497                - NB, extract the xquery libraries locally for easy reference
498                '''
499                self._xq = ndgResources()
500                for libFile in self._xq.xqlib:
501                        # NB, we don't want the full path to the files - just the filename
502                        fileName = libFile.split('/')[-1]
503                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.