source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 5846

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@5846
Revision 5846, 19.1 KB checked in by sdonegan, 10 years ago (diff)

Fixed bug so handles special chars in dataset title and handling of invalid keyword args

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import DatasetBasicParameters,ndgRedirectTransform,redirectUrlChanger
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22
23class AbstractDocumentIngester(object):
24        '''
25        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
26        - including running the various transforms and parsings to get all doc types and spatiotemporal
27        data in the correct form in the DB
28        '''
29        lineSeparator = "------------------------------"
30                       
31        # The directory to put things for a tape backup (should already exist)
32        BACKUP_DIR = '/disks/glue1/oaiBackup/'
33       
34        NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
35
36        def _setupCmdLineOptions(self):
37                '''
38                Determine the logging level to use and configure this appropriately
39                @return args: any input arguments - excluding options
40                '''
41               
42                # check for verbose option
43               
44                try:
45                        opts, args = getopt.getopt(sys.argv[1:], "vd")
46                except getopt.GetoptError, err:
47                    # print help information and exit:
48                    print str(err) # will print something like "option -a not recognized"
49                    sys.exit(2)
50                   
51                if len(args) < 1:
52                        self.usage()
53                   
54                loggingLevel = logging.WARNING
55                for o, a in opts:
56                    if o == "-v":
57                        print " - Verbose mode ON"
58                        loggingLevel = logging.INFO
59                    elif o == "-d":
60                        print " - Debug mode ON"
61                        loggingLevel = logging.DEBUG
62                   
63               
64                # set up any keywords on the object
65                # NB, be careful to keep the instance variables the same name as the keywords!
66                for arg in args:
67                       
68                        keywordArgs=['ingestFromDate','interval','individualFile']
69                       
70                        bits = arg.split('=')
71                        if len(bits) == 2:
72                                if bits[0] == keywordArgs[0]:
73                                        self.setIngestFromDate(bits[1])
74                                elif bits[0] == keywordArgs[1]:
75                                        self.setPollInterval(bits[1])
76                                elif bits[0] == keywordArgs[2]:
77                                        print " - Running in single file ingestion mode!"
78                                        self.setIndFileToIngest(bits[1])
79                                elif bits[0] not in keywordArgs:
80                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
81                                        sys.exit(2)
82                                else:
83                                        setattr(self, bits[0], bits[1])
84                       
85                print self.lineSeparator
86               
87                # NB, this is a slight fudge as cannot get the detailed logging to work
88                # without setting up a new logger here - which means we get two loggers
89                # outputing data. The initial call to logging needs to be tracked down
90                # and configured correctly, so this can be avoided...
91#               self.logger = logging.getLogger()
92#               self.logger.setLevel(loggingLevel)
93
94                # create console handler and set level to debug
95#               ch = logging.StreamHandler()
96#               ch.setLevel(loggingLevel)
97                # create formatter
98#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
99                # add formatter to ch
100#               ch.setFormatter(formatter)
101                # add ch to logger
102#               self.logger.addHandler(ch)
103                return args
104
105
106        def getID(self, filename):
107                '''
108                Gets the identifier out of an input metadata xml record.
109                @param filename - name of document file being processed
110                @return: ID - id to use to refer to the document
111                '''
112                logging.info("Retrieving identifier for metadata record " + filename)
113                xml=file(filename).read()
114                ID = idget(xml)
115                return ID
116       
117       
118        def addFileToPostgresDB(self, filename):
119                '''
120                Add a file to the postgres DB - extracting and storing all the required
121                data in the process
122                @param filename: full path of file to add to postgres DB
123                '''
124                logging.info("Adding file, " + filename + ", to postgres DB")
125               
126                numSlash = len(filename.split('/'))
127               
128                shortFilename = filename.split('/')[numSlash - 1]
129               
130                self.recordAttemptToIngestDiscoveryID = ""                             
131               
132                # first of all create a PostgresRecord - this object represents all the data required
133                # for a DB entry
134                dao = None
135               
136                try:
137                        #discoveryID = self.getID(filename)
138                       
139                        basicParameters=DatasetBasicParameters(filename,self._datacentre_format)
140                                               
141                        discoveryID = basicParameters.datasetID                 
142                        datasetName = basicParameters.datasetName
143                        datacentreName = basicParameters.datacentreName
144                        datasetLastEditUpdateDate = basicParameters.metadataCreationDate
145                        datasetStartDateNom = basicParameters.datasetStartDateNom
146                        datasetEndDateNom = basicParameters.datasetEndDateNom
147                       
148                        #record whats attempting to be ingested
149                        self.recordAttemptToIngestDiscoveryID = discoveryID
150                       
151                                                                                                           
152                        record = PostgresRecord(filename, self._NDG_dataProvider, \
153                                                            self._datacentre_groups, self._datacentre_namespace, \
154                                                            discoveryID,datasetName,datacentreName,datasetLastEditUpdateDate,datasetStartDateNom,datasetEndDateNom, \
155                                                            self._xq, self._datacentre_format)
156                        #import pdb
157                        #pdb.set_trace()
158                       
159                        # Now create the data access object to interface to the DB
160                        dao = PostgresDAO(record, pgClient = self.pgc)
161                       
162                        # Finally, write the new record
163                        # 0 if failed, 1 if update, 2 if create
164                        returnCode = dao.createOrUpdateRecord() 
165                        if returnCode == 2:
166                                self._no_files_ingested += 1
167                        elif returnCode == 1:
168                                self._no_files_changed += 1
169                        #if 0 nothing incremented
170                       
171                except:
172                       
173                        #if error encountered, add to failure lisr
174                        logging.error("Could not update: " + filename)                 
175                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
176                        self.updateFailList.append(originalRecordFilename)
177                       
178                        logging.error("Exception thrown - detail: ")
179                        errors = sys.exc_info()
180                        logging.error(errors)
181                        self._error_messages += "%s\n" %str(errors[1])
182                       
183                        if dao:
184                                logging.info("Removing record and its associated info from DB")
185                                logging.info("- to allow clean ingestion on rerun")
186                                try:
187                                        dao.deleteOriginalRecord()
188                                except:
189                                        logging.error("Problem encountered when removing record: ")
190                                        logging.error(sys.exc_info())
191                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
192
193                        self._no_problem_files += 1
194                       
195                        logging.info("Continue processing other files")
196                       
197                return self.recordAttemptToIngestDiscoveryID
198                       
199       
200        def getConfigDetails(self, datacentre):
201                '''
202                Get the harvested records directory and groups for this datacentre from the
203                datacentre specific config file.  The harvested records directory depends on the
204                datacentres OAI base url, the set and format. These have to be know up-front.
205                The groups denote which 'portal groups' they belong to - for limiting searches to
206                say NERC-only datacentres records.
207                Groups are added to the intermediate MOLES when it is created.
208                @param datacentre: datacentre to use when looking up config file
209                '''
210                # initialise the variables to retrieve from the config file
211                self._harvest_home = ""
212                self._datacentre_groups = ""
213                self._datacentre_format = ""
214                self._datacentre_namespace = ""
215                self._NDG_dataProvider = False
216
217                datacentre_config_filename = 'datacentre_config/' + datacentre + "_config.properties"
218                logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename)
219               
220
221                file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
222               
223                for line in file.split('\n'):
224                        words = line.split()
225                        if len(words) == 0:
226                                continue
227                        elif words[0] == 'host_path':
228                                self._harvest_home = words[1].rstrip()
229                        elif words[0] == 'groups':
230                                self._datacentre_groups = words[1:]
231                        elif words[0] == 'format':
232                                self._datacentre_format = words[1]
233                        elif words[0] == 'namespace':
234                                self._datacentre_namespace = words[1]
235                        elif words[0] == 'NDG_dataProvider':
236                                self._NDG_dataProvider = True
237                       
238                if self._harvest_home == "":
239                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
240               
241                logging.info("harvested records are in " + self._harvest_home)
242               
243                if self._datacentre_groups == "":
244                    logging.info("No groups/keywords set for datacentre " + datacentre)
245                else:
246                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
247               
248                if self._datacentre_format == "":
249                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
250               
251                logging.info("format being harvested: " + self._datacentre_format)
252               
253                if self._datacentre_namespace == "":
254                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
255               
256                logging.info("datacentre namespace: " + self._datacentre_namespace)
257               
258                if self._NDG_dataProvider:
259                        logging.info("Datacentre classified as an NDG data provider")
260                else:
261                        logging.info("Datacentre is not classified as an NDG data provider")
262                logging.info(self.lineSeparator)
263
264
265        def _convertIngestFiles(self, originals_dir, discovery_dir):
266                '''
267                Processes/renames the files (changed 08/01/07 to get id from inside file)
268                 - also replace any namespace declarations with a standard one which we know works in NDG
269                 NB, this copies files from the original dir to the discovery dir
270                 @param originals_dir: directory to convert files from
271                 @param discovery_dir: directory in which converted files will end up
272                 @return numfilesproc: counter of number of files processed
273                '''
274                numfilesproc = 0
275               
276                self.inputFileOrigFinal = {}
277               
278                logging.info(self.lineSeparator)
279                logging.info("Renaming files:")
280               
281                for filename in os.listdir(originals_dir):
282                       
283                        if not filename.endswith('.xml'):
284                                logging.warning('File %s is not xml format. Not processed'  %(filename))
285                                continue
286                       
287                        original_filename = originals_dir + filename
288                       
289                       
290                       
291                        #convert urls within original xml input file to NDG redirect URLS
292                       
293                        ''' This xquery method superceded by new python class in Utilities.py that uses
294                        elementtree to do the same job but with better handling of character sets...'''
295                        '''
296                        if self._datacentre_format == 'DIF':           
297                                xQueryType = 'transform_DIF_URL'
298                               
299                        elif self._datacentre_format == 'MDIP':
300                                xQueryType = 'transform_MDIP_URL'
301                               
302                        redirect_url = 'http://ndgRedirect.nerc.ac.uk/'
303                       
304                        #filename_rd = filename.replace(".xml","_redirect.xml")
305                        #original_filename_urlRedirect = originals_dir + filename_rd
306                       
307                        ndgRedirectTransform(xQueryType,redirect_url,original_filename)
308                        '''
309                       
310                        #call new class in Utilities.py --will replace original file...
311                        redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self.NDG_redirect_URL)
312                       
313               
314                                               
315                        #Use new class to get basic parameters from input xml doc to pass around (supplants getID method)
316                        basicParameters=DatasetBasicParameters(original_filename,self._datacentre_format)
317                                   
318                        try:
319                                #ident=self.getID(original_filename)
320                                ident=basicParameters.datasetID
321                                logging.info("Dataset ID is " + ident)
322                                       
323                        except Exception, detail:
324                                logging.error("Could not retrieve ID from file, %s" %filename)
325                                logging.error("Detail: %s" %detail)
326                                logging.info("Continue with next file")
327                                continue
328                       
329                        if self._NDG_dataProvider:
330                               
331                                #new_filename = discovery_dir + ident.replace(":", "__")+".xml"
332                                #new_filename_short = ident.replace(":", "__")+".xml"
333                               
334                                #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
335                                new_filename = discovery_dir + self._datacentre_namespace + "__" +self._datacentre_format + "__"+ ident.replace(":", "-")+".xml"
336                                new_filename_short = self._datacentre_namespace + "__" +self._datacentre_format + "__" + ident.replace(":", "-")+".xml"
337                               
338                        else:
339                               
340                                ident = ident.replace(":", "-")
341                                ident = ident.replace("/", "-")
342                               
343                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
344                                new_filename_short = self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
345                               
346                               
347                        logging.info("original file = " + original_filename)
348                        logging.info("newfile = " + new_filename)
349                               
350                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
351                        #this links a derived filename in processing dir with original filename
352                        #get basic filename from the path+filename passed to this function to use as key               
353                        self.inputFileOrigFinal[new_filename_short]=filename
354                       
355                        # now correct any namespace issues
356                        try:
357                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
358                        except Exception, detail:
359                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
360                                logging.error("Detail: %s" %detail)
361                                logging.info("Continue with next file")
362                                continue
363                        numfilesproc += 1
364               
365                logging.info("File renaming and converting completed")
366                logging.info(self.lineSeparator)
367               
368                #sys.exit()
369               
370                return numfilesproc
371
372               
373        def _getPostgresDBConnection(self):
374                '''
375                Get the default postgres DB connection - by reading in data from the db config file
376                '''
377               
378                logging.debug("Setting up connection to postgres DB")
379                self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
380                logging.info("Postgres DB connection now set up")
381
382
383
384        def     _backupAndCleanData(self):
385                '''
386                Back up ingested data for specified data centre, then clean any of
387                the ingest dirs
388                '''
389                logging.info("Backing up ingested data")
390                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
391                  strftime("%y%m%d_%H%M") + "_originals/"
392               
393                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
394                logging.info("Data backed up - now clearing ingest directories")
395                #Clear out the original harvest records area and discovery dir
396                #FileUtilities.cleanDir(self.originals_dir)
397                #FileUtilities.cleanDir(self.discovery_dir)
398                logging.info("Ingest directories cleared")
399
400
401        def     _setupDataCentreDirs(self):
402                '''
403                Set up directories appropriate for the current data centre
404                '''
405                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
406                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
407               
408                # the following dirs define where the specific documents should go
409                self.originals_dir = data_dir + "/oai/originals/"
410                self.discovery_dir = data_dir + "/discovery/"
411               
412                # Create/clear the 'in' and 'out' directories
413                FileUtilities.setUpDir(self.originals_dir)
414                FileUtilities.setUpDir(self.discovery_dir)
415               
416                logging.info("Ingest directories for data centre set up")
417
418
419        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed):
420                '''
421                Convert files from originals dir to discovery one, then
422                ingest, backup and clean
423                @param originals_dir: directory to ingest docs from
424                @param discovery_dir: directory to use to process ingested docs
425                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
426                '''
427               
428                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir)
429               
430                filenames = os.listdir(self.discovery_dir)
431               
432                #generate a list of files ALREADY in database so can compare with what has been ingested
433                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
434               
435               
436                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
437                filesPresentList=[]
438                if filePresentListArr:
439                        for fileArr in filePresentListArr:
440                                filesPresentList.append(fileArr[0])
441                               
442                                #TODO - is above relevant - duplicating functionlaity?
443       
444                #create list to to hold files ingest failed on.
445                self.updateFailList = []
446                self.deletedFailList = []
447               
448               
449                for filename in filenames:
450                        fullPath = self.discovery_dir + filename
451                       
452                        if os.path.isfile(fullPath):
453                                thisIngestedID = self.addFileToPostgresDB(fullPath)
454                               
455                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
456                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
457                                        if thisIngestedID in filesPresentList: 
458                                                filesPresentList.remove(thisIngestedID)                 
459                                               
460                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
461                #will need to be removed.
462               
463                #only do this if not in single file mode (or else wverything else gets deleted!)               
464                if ((self.indFileToIngest == "") & (feed)):
465                        for item in filesPresentList:
466                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
467                                DeleteRecord(item)
468                                self.deletedFailList.append(item)
469                                self._no_files_deleted += 1
470                       
471                       
472                self._backupAndCleanData()
473               
474                #at this stage put the reporting code in (will work for oai or atom feed)
475                #create a summary file for each data centre ingest
476                data_dir = self._base_dir + "data/"
477                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
478                recOpFile = open(recOpFileName,'w')
479               
480                logging.info("oai_document_ingest processing complete:")
481               
482                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
483                message = "Ingest report for data centre: " + datacentre + "\n"
484                message = message + "Ingest date: " + str(datetime.datetime.now()) + "\n"
485                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
486                message = message + "TOTAL PROCESSED " + str(numfilesproc) + "\n"
487                message = message + "INGESTED (Created) " + str(self._no_files_changed)  + "\n"
488                message = message + "INGESTED (Updated) " + str(self._no_files_ingested)  + "\n"
489                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
490                message = message + "PROBLEM FILES " + str(self._no_problem_files)  + "\n"
491               
492                '''recOpFile.write("Ingest report for data centre: " + datacentre + "\n")
493                recOpFile.write("Ingest date: " + str(datetime.datetime.now()) + "\n")
494                recOpFile.write("Original metadata directory: " + self._harvest_home + "\n\n")
495                recOpFile.write("TOTAL PROCESSED " + str(numfilesproc) + "\n")
496                recOpFile.write("INGESTED (Created) " + str(self._no_files_changed)  + "\n")
497                recOpFile.write("INGESTED (Updated) " + str(self._no_files_ingested)  + "\n")
498                recOpFile.write("DELETED " + str(self._no_files_deleted)  + "\n")
499                recOpFile.write("PROBLEM FILES " + str(self._no_problem_files)  + "\n")'''
500               
501               
502                for badFile in self.updateFailList:
503                        #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
504                        message = message +"PROBLEM_FILE " + badFile + "\n"
505                         
506                recOpFile.write(message)
507                               
508                return numfilesproc, message
509
510
511
512        def _setupXQueries(self):
513                '''
514                Set up the required XQueries
515                - NB, extract the xquery libraries locally for easy reference
516                '''
517                self._xq = ndgResources()
518                for libFile in self._xq.xqlib:
519                        # NB, we don't want the full path to the files - just the filename
520                        fileName = libFile.split('/')[-1]
521                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.