source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 5687

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@5687
Revision 5687, 18.6 KB checked in by sdonegan, 10 years ago (diff)

Updated to handle lower case dif format spec from info editor..

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import DatasetBasicParameters,ndgRedirectTransform,redirectUrlChanger
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22
23class AbstractDocumentIngester(object):
24        '''
25        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
26        - including running the various transforms and parsings to get all doc types and spatiotemporal
27        data in the correct form in the DB
28        '''
29        lineSeparator = "-----------------------------"
30                       
31        # The directory to put things for a tape backup (should already exist)
32        BACKUP_DIR = '/disks/glue1/oaiBackup/'
33       
34        NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
35
36        def _setupCmdLineOptions(self):
37                '''
38                Determine the logging level to use and configure this appropriately
39                @return args: any input arguments - excluding options
40                '''
41               
42                # check for verbose option
43               
44                try:
45                        opts, args = getopt.getopt(sys.argv[1:], "vd")
46                except getopt.GetoptError, err:
47                    # print help information and exit:
48                    print str(err) # will print something like "option -a not recognized"
49                    sys.exit(2)
50                   
51                if len(args) < 1:
52                        self.usage()
53                   
54                loggingLevel = logging.WARNING
55                for o, a in opts:
56                    if o == "-v":
57                        print " - Verbose mode ON"
58                        loggingLevel = logging.INFO
59                    elif o == "-d":
60                        print " - Debug mode ON"
61                        loggingLevel = logging.DEBUG
62                   
63               
64                # set up any keywords on the object
65                # NB, be careful to keep the instance variables the same name as the keywords!
66                for arg in args:
67                       
68                        bits = arg.split('=')
69                        if len(bits) == 2:
70                                if bits[0] == 'ingestFromDate':
71                                        self.setIngestFromDate(bits[1])
72                                elif bits[0] == 'interval':
73                                        self.setPollInterval(bits[1])
74                                elif bits[0] == 'individualFile':
75                                        print " - Running in single file ingestion mode!"
76                                        self.setIndFileToIngest(bits[1])
77                                else:
78                                        setattr(self, bits[0], bits[1])
79               
80                print self.lineSeparator
81               
82                # NB, this is a slight fudge as cannot get the detailed logging to work
83                # without setting up a new logger here - which means we get two loggers
84                # outputing data. The initial call to logging needs to be tracked down
85                # and configured correctly, so this can be avoided...
86#               self.logger = logging.getLogger()
87#               self.logger.setLevel(loggingLevel)
88
89                # create console handler and set level to debug
90#               ch = logging.StreamHandler()
91#               ch.setLevel(loggingLevel)
92                # create formatter
93#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
94                # add formatter to ch
95#               ch.setFormatter(formatter)
96                # add ch to logger
97#               self.logger.addHandler(ch)
98                return args
99
100
101        def getID(self, filename):
102                '''
103                Gets the identifier out of an input metadata xml record.
104                @param filename - name of document file being processed
105                @return: ID - id to use to refer to the document
106                '''
107                logging.info("Retrieving identifier for metadata record " + filename)
108                xml=file(filename).read()
109                ID = idget(xml)
110                return ID
111       
112       
113        def addFileToPostgresDB(self, filename):
114                '''
115                Add a file to the postgres DB - extracting and storing all the required
116                data in the process
117                @param filename: full path of file to add to postgres DB
118                '''
119                logging.info("Adding file, " + filename + ", to postgres DB")
120               
121                numSlash = len(filename.split('/'))
122               
123                shortFilename = filename.split('/')[numSlash - 1]
124               
125                self.recordAttemptToIngestDiscoveryID = ""                             
126               
127                # first of all create a PostgresRecord - this object represents all the data required
128                # for a DB entry
129                dao = None
130               
131                try:
132                        #discoveryID = self.getID(filename)
133                       
134                        basicParameters=DatasetBasicParameters(filename,self._datacentre_format)
135                                               
136                        discoveryID = basicParameters.datasetID                 
137                        datasetName = basicParameters.datasetName
138                        datacentreName = basicParameters.datacentreName
139                        datasetLastEditUpdateDate = basicParameters.metadataCreationDate
140                        datasetStartDateNom = basicParameters.datasetStartDateNom
141                        datasetEndDateNom = basicParameters.datasetEndDateNom
142                       
143                        #record whats attempting to be ingested
144                        self.recordAttemptToIngestDiscoveryID = discoveryID
145                       
146                                                                                                           
147                        record = PostgresRecord(filename, self._NDG_dataProvider, \
148                                                            self._datacentre_groups, self._datacentre_namespace, \
149                                                            discoveryID,datasetName,datacentreName,datasetLastEditUpdateDate,datasetStartDateNom,datasetEndDateNom, \
150                                                            self._xq, self._datacentre_format)
151                       
152                       
153                        # Now create the data access object to interface to the DB
154                        dao = PostgresDAO(record, pgClient = self.pgc)
155                       
156                        # Finally, write the new record
157                        # 0 if failed, 1 if update, 2 if create
158                        returnCode = dao.createOrUpdateRecord() 
159                        if returnCode == 2:
160                                self._no_files_ingested += 1
161                        elif returnCode == 1:
162                                self._no_files_changed += 1
163                        #if 0 nothing incremented
164                       
165                except:
166                       
167                        #if error encountered, add to failure lisr
168                        logging.error("Could not update: " + filename)                 
169                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
170                        self.updateFailList.append(originalRecordFilename)
171                       
172                        logging.error("Exception thrown - detail: ")
173                        errors = sys.exc_info()
174                        logging.error(errors)
175                        self._error_messages += "%s\n" %str(errors[1])
176                       
177                        if dao:
178                                logging.info("Removing record and its associated info from DB")
179                                logging.info("- to allow clean ingestion on rerun")
180                                try:
181                                        dao.deleteOriginalRecord()
182                                except:
183                                        logging.error("Problem encountered when removing record: ")
184                                        logging.error(sys.exc_info())
185                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
186
187                        self._no_problem_files += 1
188                       
189                        logging.info("Continue processing other files")
190                       
191                return self.recordAttemptToIngestDiscoveryID
192                       
193       
194        def getConfigDetails(self, datacentre):
195                '''
196                Get the harvested records directory and groups for this datacentre from the
197                datacentre specific config file.  The harvested records directory depends on the
198                datacentres OAI base url, the set and format. These have to be know up-front.
199                The groups denote which 'portal groups' they belong to - for limiting searches to
200                say NERC-only datacentres records.
201                Groups are added to the intermediate MOLES when it is created.
202                @param datacentre: datacentre to use when looking up config file
203                '''
204                # initialise the variables to retrieve from the config file
205                self._harvest_home = ""
206                self._datacentre_groups = ""
207                self._datacentre_format = ""
208                self._datacentre_namespace = ""
209                self._NDG_dataProvider = False
210
211                datacentre_config_filename = 'datacentre_config/' + datacentre + "_config.properties"
212                logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename)
213               
214
215                file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
216               
217                for line in file.split('\n'):
218                        words = line.split()
219                        if len(words) == 0:
220                                continue
221                        elif words[0] == 'host_path':
222                                self._harvest_home = words[1].rstrip()
223                        elif words[0] == 'groups':
224                                self._datacentre_groups = words[1:]
225                        elif words[0] == 'format':
226                                self._datacentre_format = words[1]
227                        elif words[0] == 'namespace':
228                                self._datacentre_namespace = words[1]
229                        elif words[0] == 'NDG_dataProvider':
230                                self._NDG_dataProvider = True
231                       
232                if self._harvest_home == "":
233                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
234               
235                logging.info("harvested records are in " + self._harvest_home)
236               
237                if self._datacentre_groups == "":
238                    logging.info("No groups/keywords set for datacentre " + datacentre)
239                else:
240                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
241               
242                if self._datacentre_format == "":
243                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
244               
245                logging.info("format being harvested: " + self._datacentre_format)
246               
247                if self._datacentre_namespace == "":
248                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
249               
250                logging.info("datacentre namespace: " + self._datacentre_namespace)
251               
252                if self._NDG_dataProvider:
253                        logging.info("Datacentre classified as an NDG data provider")
254                else:
255                        logging.info("Datacentre is not classified as an NDG data provider")
256                logging.info(self.lineSeparator)
257
258
259        def _convertIngestFiles(self, originals_dir, discovery_dir):
260                '''
261                Processes/renames the files (changed 08/01/07 to get id from inside file)
262                 - also replace any namespace declarations with a standard one which we know works in NDG
263                 NB, this copies files from the original dir to the discovery dir
264                 @param originals_dir: directory to convert files from
265                 @param discovery_dir: directory in which converted files will end up
266                 @return numfilesproc: counter of number of files processed
267                '''
268                numfilesproc = 0
269               
270                self.inputFileOrigFinal = {}
271               
272                logging.info(self.lineSeparator)
273                logging.info("Renaming files:")
274               
275                for filename in os.listdir(originals_dir):
276                        if not filename.endswith('.xml'):
277                                logging.warning('File %s is not xml format. Not processed'  %(filename))
278                                continue
279                       
280                        original_filename = originals_dir + filename
281                       
282                       
283                        #convert urls within original xml input file to NDG redirect URLS
284                       
285                        ''' This xquery method superceded by new python class in Utilities.py that uses
286                        elementtree to do the same job but with better handling of character sets...'''
287                        '''
288                        if self._datacentre_format == 'DIF':           
289                                xQueryType = 'transform_DIF_URL'
290                               
291                        elif self._datacentre_format == 'MDIP':
292                                xQueryType = 'transform_MDIP_URL'
293                               
294                        redirect_url = 'http://ndgRedirect.nerc.ac.uk/'
295                       
296                        #filename_rd = filename.replace(".xml","_redirect.xml")
297                        #original_filename_urlRedirect = originals_dir + filename_rd
298                       
299                        ndgRedirectTransform(xQueryType,redirect_url,original_filename)
300                        '''
301                       
302                        #call new class in Utilities.py --will replace original file...
303                        redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self.NDG_redirect_URL)
304                                       
305                        #Use new class to get basic parameters from input xml doc to pass around (supplants getID method)
306                        basicParameters=DatasetBasicParameters(original_filename,self._datacentre_format)
307                                   
308                        try:
309                                #ident=self.getID(original_filename)
310                                ident=basicParameters.datasetID
311                                logging.info("Dataset ID is " + ident)
312                                       
313                        except Exception, detail:
314                                logging.error("Could not retrieve ID from file, %s" %filename)
315                                logging.error("Detail: %s" %detail)
316                                logging.info("Continue with next file")
317                                continue
318                       
319                        #put in catch to change lower case dif to DIF - problems created by oai_info_editor!!
320                        if self._datacentre_format == 'dif':
321                                self._datacentre_format = 'DIF'
322                       
323                       
324                        if self._NDG_dataProvider:
325                                new_filename = discovery_dir + ident.replace(":", "__")+".xml"
326                                new_filename_short = ident.replace(":", "__")+".xml"
327                               
328                        else:
329                                ident = ident.replace(":", "-")
330                                ident = ident.replace("/", "-")
331                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
332                                new_filename_short = self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
333                               
334                       
335                               
336                        logging.info("original file = " + original_filename)
337                        logging.info("newfile = " + new_filename)
338                               
339                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
340                        #this links a derived filename in processing dir with original filename
341                        #get basic filename from the path+filename passed to this function to use as key               
342                        self.inputFileOrigFinal[new_filename_short]=filename
343                       
344                        # now correct any namespace issues
345                        try:
346                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
347                        except Exception, detail:
348                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
349                                logging.error("Detail: %s" %detail)
350                                logging.info("Continue with next file")
351                                continue
352                        numfilesproc += 1
353               
354                logging.info("File renaming and converting completed")
355                logging.info(self.lineSeparator)
356                return numfilesproc
357
358               
359        def _getPostgresDBConnection(self):
360                '''
361                Get the default postgres DB connection - by reading in data from the db config file
362                '''
363               
364                logging.debug("Setting up connection to postgres DB")
365                self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
366                logging.info("Postgres DB connection now set up")
367
368
369
370        def     _backupAndCleanData(self):
371                '''
372                Back up ingested data for specified data centre, then clean any of
373                the ingest dirs
374                '''
375                logging.info("Backing up ingested data")
376                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
377                  strftime("%y%m%d_%H%M") + "_originals/"
378               
379                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
380                logging.info("Data backed up - now clearing ingest directories")
381                #Clear out the original harvest records area and discovery dir
382                #FileUtilities.cleanDir(self.originals_dir)
383                #FileUtilities.cleanDir(self.discovery_dir)
384                logging.info("Ingest directories cleared")
385
386
387        def     _setupDataCentreDirs(self):
388                '''
389                Set up directories appropriate for the current data centre
390                '''
391                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
392                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
393               
394                # the following dirs define where the specific documents should go
395                self.originals_dir = data_dir + "/oai/originals/"
396                self.discovery_dir = data_dir + "/discovery/"
397               
398                # Create/clear the 'in' and 'out' directories
399                FileUtilities.setUpDir(self.originals_dir)
400                FileUtilities.setUpDir(self.discovery_dir)
401               
402                logging.info("Ingest directories for data centre set up")
403
404
405        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed):
406                '''
407                Convert files from originals dir to discovery one, then
408                ingest, backup and clean
409                @param originals_dir: directory to ingest docs from
410                @param discovery_dir: directory to use to process ingested docs
411                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
412                '''
413               
414                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir)
415               
416                filenames = os.listdir(self.discovery_dir)
417               
418                #generate a list of files ALREADY in database so can compare with what has been ingested
419                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
420               
421               
422                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
423                filesPresentList=[]
424                if filePresentListArr:
425                        for fileArr in filePresentListArr:
426                                filesPresentList.append(fileArr[0])
427                               
428                                #TODO - is above relevant - duplicating functionlaity?
429       
430                #create list to to hold files ingest failed on.
431                self.updateFailList = []
432                self.deletedFailList = []
433               
434               
435                for filename in filenames:
436                        fullPath = self.discovery_dir + filename
437                       
438                        if os.path.isfile(fullPath):
439                                thisIngestedID = self.addFileToPostgresDB(fullPath)
440                               
441                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
442                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
443                                        if thisIngestedID in filesPresentList: 
444                                                filesPresentList.remove(thisIngestedID)                 
445                                               
446                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
447                #will need to be removed.
448               
449                #only do this if not in single file mode (or else wverything else gets deleted!)               
450                if ((self.indFileToIngest == "") & (feed)):
451                        for item in filesPresentList:
452                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
453                                #DeleteRecord(item)
454                                self.deletedFailList.append(item)
455                                self._no_files_deleted += 1
456                       
457                       
458                self._backupAndCleanData()
459               
460                #at this stage put the reporting code in (will work for oai or atom feed)
461                #create a summary file for each data centre ingest
462                data_dir = self._base_dir + "data/"
463                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
464                recOpFile = open(recOpFileName,'w')
465               
466                logging.info("oai_document_ingest processing complete:")
467               
468                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
469                message = "Ingest report for data centre: " + datacentre + "\n"
470                message = message + "Ingest date: " + str(datetime.datetime.now()) + "\n"
471                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
472                message = message + "TOTAL PROCESSED " + str(numfilesproc) + "\n"
473                message = message + "INGESTED (Created) " + str(self._no_files_changed)  + "\n"
474                message = message + "INGESTED (Updated) " + str(self._no_files_ingested)  + "\n"
475                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
476                message = message + "PROBLEM FILES " + str(self._no_problem_files)  + "\n"
477               
478                '''recOpFile.write("Ingest report for data centre: " + datacentre + "\n")
479                recOpFile.write("Ingest date: " + str(datetime.datetime.now()) + "\n")
480                recOpFile.write("Original metadata directory: " + self._harvest_home + "\n\n")
481                recOpFile.write("TOTAL PROCESSED " + str(numfilesproc) + "\n")
482                recOpFile.write("INGESTED (Created) " + str(self._no_files_changed)  + "\n")
483                recOpFile.write("INGESTED (Updated) " + str(self._no_files_ingested)  + "\n")
484                recOpFile.write("DELETED " + str(self._no_files_deleted)  + "\n")
485                recOpFile.write("PROBLEM FILES " + str(self._no_problem_files)  + "\n")'''
486               
487               
488                for badFile in self.updateFailList:
489                        #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
490                        message = message +"PROBLEM_FILE " + badFile + "\n"
491                         
492                recOpFile.write(message)
493                               
494                return numfilesproc, message
495
496
497
498        def _setupXQueries(self):
499                '''
500                Set up the required XQueries
501                - NB, extract the xquery libraries locally for easy reference
502                '''
503                self._xq = ndgResources()
504                for libFile in self._xq.xqlib:
505                        # NB, we don't want the full path to the files - just the filename
506                        fileName = libFile.split('/')[-1]
507                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.