source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 5537

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@5537
Revision 5537, 17.5 KB checked in by sdonegan, 10 years ago (diff)

Updated to allow inclusion of end date ordering

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import DatasetBasicParameters,ndgRedirectTransform,redirectUrlChanger
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22
23class AbstractDocumentIngester(object):
24        '''
25        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
26        - including running the various transforms and parsings to get all doc types and spatiotemporal
27        data in the correct form in the DB
28        '''
29        lineSeparator = "-----------------------------"
30                       
31        # The directory to put things for a tape backup (should already exist)
32        BACKUP_DIR = '/disks/glue1/oaiBackup/'
33       
34        NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
35
36        def _setupCmdLineOptions(self):
37                '''
38                Determine the logging level to use and configure this appropriately
39                @return args: any input arguments - excluding options
40                '''
41               
42                # check for verbose option
43               
44                try:
45                        opts, args = getopt.getopt(sys.argv[1:], "vdi")
46                except getopt.GetoptError, err:
47                    # print help information and exit:
48                    print str(err) # will print something like "option -a not recognized"
49                    sys.exit(2)
50                   
51                if len(args) < 1:
52                        self.usage()
53                   
54                loggingLevel = logging.WARNING
55                for o, a in opts:
56                    if o == "-v":
57                        print " - Verbose mode ON"
58                        loggingLevel = logging.INFO
59                    elif o == "-d":
60                        print " - Debug mode ON"
61                        loggingLevel = logging.DEBUG
62                   
63               
64                # set up any keywords on the object
65                # NB, be careful to keep the instance variables the same name as the keywords!
66                for arg in args:
67                       
68                        bits = arg.split('=')
69                        if len(bits) == 2:
70                                if bits[0] == 'ingestFromDate':
71                                        self.setIngestFromDate(bits[1])
72                                elif bits[0] == 'interval':
73                                        self.setPollInterval(bits[1])
74                                elif bits[0] == 'individualFile':
75                                        print " - Running in single file ingestion mode!"
76                                        self.setIndFileToIngest(bits[1])
77                                else:
78                                        setattr(self, bits[0], bits[1])
79               
80                print self.lineSeparator
81                # NB, this is a slight fudge as cannot get the detailed logging to work
82                # without setting up a new logger here - which means we get two loggers
83                # outputing data. The initial call to logging needs to be tracked down
84                # and configured correctly, so this can be avoided...
85#               self.logger = logging.getLogger()
86#               self.logger.setLevel(loggingLevel)
87
88                # create console handler and set level to debug
89#               ch = logging.StreamHandler()
90#               ch.setLevel(loggingLevel)
91                # create formatter
92#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
93                # add formatter to ch
94#               ch.setFormatter(formatter)
95                # add ch to logger
96#               self.logger.addHandler(ch)
97                return args
98
99
100        def getID(self, filename):
101                '''
102                Gets the identifier out of an input metadata xml record.
103                @param filename - name of document file being processed
104                @return: ID - id to use to refer to the document
105                '''
106                logging.info("Retrieving identifier for metadata record " + filename)
107                xml=file(filename).read()
108                ID = idget(xml)
109                return ID
110       
111       
112        def addFileToPostgresDB(self, filename):
113                '''
114                Add a file to the postgres DB - extracting and storing all the required
115                data in the process
116                @param filename: full path of file to add to postgres DB
117                '''
118                logging.info("Adding file, " + filename + ", to postgres DB")
119               
120                numSlash = len(filename.split('/'))
121               
122                shortFilename = filename.split('/')[numSlash - 1]
123               
124                self.recordAttemptToIngestDiscoveryID = ""                             
125               
126                # first of all create a PostgresRecord - this object represents all the data required
127                # for a DB entry
128                dao = None
129               
130                try:
131                        #discoveryID = self.getID(filename)
132                       
133                        basicParameters=DatasetBasicParameters(filename,self._datacentre_format)
134                                               
135                        discoveryID = basicParameters.datasetID                 
136                        datasetName = basicParameters.datasetName
137                        datacentreName = basicParameters.datacentreName
138                        datasetLastEditUpdateDate = basicParameters.metadataCreationDate
139                        datasetStartDateNom = basicParameters.datasetStartDateNom
140                        datasetEndDateNom = basicParameters.datasetEndDateNom
141                       
142                        #record whats attempting to be ingested
143                        self.recordAttemptToIngestDiscoveryID = discoveryID
144                       
145                                                                                                           
146                        record = PostgresRecord(filename, self._NDG_dataProvider, \
147                                                            self._datacentre_groups, self._datacentre_namespace, \
148                                                            discoveryID,datasetName,datacentreName,datasetLastEditUpdateDate,datasetStartDateNom,datasetEndDateNom, \
149                                                            self._xq, self._datacentre_format)
150                       
151                       
152                        # Now create the data access object to interface to the DB
153                        dao = PostgresDAO(record, pgClient = self.pgc)
154                       
155                        # Finally, write the new record
156                        # 0 if failed, 1 if update, 2 if create
157                        returnCode = dao.createOrUpdateRecord() 
158                        if returnCode == 2:
159                                self._no_files_ingested += 1
160                        elif returnCode == 1:
161                                self._no_files_changed += 1
162                        #if 0 nothing incremented
163                       
164                except:
165                       
166                        #if error encountered, add to failure lisr
167                        logging.error("Could not update: " + filename)                 
168                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
169                        self.updateFailList.append(originalRecordFilename)
170                       
171                        logging.error("Exception thrown - detail: ")
172                        errors = sys.exc_info()
173                        logging.error(errors)
174                        self._error_messages += "%s\n" %str(errors[1])
175                       
176                        if dao:
177                                logging.info("Removing record and its associated info from DB")
178                                logging.info("- to allow clean ingestion on rerun")
179                                try:
180                                        dao.deleteOriginalRecord()
181                                except:
182                                        logging.error("Problem encountered when removing record: ")
183                                        logging.error(sys.exc_info())
184                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
185
186                        self._no_problem_files += 1
187                       
188                        logging.info("Continue processing other files")
189                       
190                return self.recordAttemptToIngestDiscoveryID
191                       
192       
193        def getConfigDetails(self, datacentre):
194                '''
195                Get the harvested records directory and groups for this datacentre from the
196                datacentre specific config file.  The harvested records directory depends on the
197                datacentres OAI base url, the set and format. These have to be know up-front.
198                The groups denote which 'portal groups' they belong to - for limiting searches to
199                say NERC-only datacentres records.
200                Groups are added to the intermediate MOLES when it is created.
201                @param datacentre: datacentre to use when looking up config file
202                '''
203                # initialise the variables to retrieve from the config file
204                self._harvest_home = ""
205                self._datacentre_groups = ""
206                self._datacentre_format = ""
207                self._datacentre_namespace = ""
208                self._NDG_dataProvider = False
209
210                datacentre_config_filename = 'datacentre_config/' + datacentre + "_config.properties"
211                logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename)
212               
213
214                file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
215               
216                for line in file.split('\n'):
217                        words = line.split()
218                        if len(words) == 0:
219                                continue
220                        elif words[0] == 'host_path':
221                                self._harvest_home = words[1].rstrip()
222                        elif words[0] == 'groups':
223                                self._datacentre_groups = words[1:]
224                        elif words[0] == 'format':
225                                self._datacentre_format = words[1]
226                        elif words[0] == 'namespace':
227                                self._datacentre_namespace = words[1]
228                        elif words[0] == 'NDG_dataProvider':
229                                self._NDG_dataProvider = True
230                       
231                if self._harvest_home == "":
232                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
233               
234                logging.info("harvested records are in " + self._harvest_home)
235               
236                if self._datacentre_groups == "":
237                    logging.info("No groups/keywords set for datacentre " + datacentre)
238                else:
239                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
240               
241                if self._datacentre_format == "":
242                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
243               
244                logging.info("format being harvested: " + self._datacentre_format)
245               
246                if self._datacentre_namespace == "":
247                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
248               
249                logging.info("datacentre namespace: " + self._datacentre_namespace)
250               
251                if self._NDG_dataProvider:
252                        logging.info("Datacentre classified as an NDG data provider")
253                else:
254                        logging.info("Datacentre is not classified as an NDG data provider")
255                logging.info(self.lineSeparator)
256
257
258        def _convertIngestFiles(self, originals_dir, discovery_dir):
259                '''
260                Processes/renames the files (changed 08/01/07 to get id from inside file)
261                 - also replace any namespace declarations with a standard one which we know works in NDG
262                 NB, this copies files from the original dir to the discovery dir
263                 @param originals_dir: directory to convert files from
264                 @param discovery_dir: directory in which converted files will end up
265                 @return numfilesproc: counter of number of files processed
266                '''
267                numfilesproc = 0
268               
269                self.inputFileOrigFinal = {}
270               
271                logging.info(self.lineSeparator)
272                logging.info("Renaming files:")
273               
274                for filename in os.listdir(originals_dir):
275                        if not filename.endswith('.xml'):
276                                logging.warning('File %s is not xml format. Not processed'  %(filename))
277                                continue
278                       
279                        original_filename = originals_dir + filename
280                       
281                       
282                        #convert urls within original xml input file to NDG redirect URLS
283                       
284                        ''' This xquery method superceded by new python class in Utilities.py that uses
285                        elementtree to do the same job but with better handling of character sets...'''
286                        '''
287                        if self._datacentre_format == 'DIF':           
288                                xQueryType = 'transform_DIF_URL'
289                               
290                        elif self._datacentre_format == 'MDIP':
291                                xQueryType = 'transform_MDIP_URL'
292                               
293                        redirect_url = 'http://ndgRedirect.nerc.ac.uk/'
294                       
295                        #filename_rd = filename.replace(".xml","_redirect.xml")
296                        #original_filename_urlRedirect = originals_dir + filename_rd
297                       
298                        ndgRedirectTransform(xQueryType,redirect_url,original_filename)
299                        '''
300                       
301                        #call new class in Utilities.py --will replace original file...
302                        redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self.NDG_redirect_URL)
303                       
304               
305                                               
306                        #Use new class to get basic parameters from input xml doc to pass around (supplants getID method)
307                        basicParameters=DatasetBasicParameters(original_filename,self._datacentre_format)
308                                   
309                        try:
310                                #ident=self.getID(original_filename)
311                                ident=basicParameters.datasetID
312                                logging.info("Dataset ID is " + ident)
313                                       
314                        except Exception, detail:
315                                logging.error("Could not retrieve ID from file, %s" %filename)
316                                logging.error("Detail: %s" %detail)
317                                logging.info("Continue with next file")
318                                continue
319                       
320                        if self._NDG_dataProvider:
321                                new_filename = discovery_dir + ident.replace(":", "__")+".xml"
322                                new_filename_short = ident.replace(":", "__")+".xml"
323                               
324                        else:
325                                ident = ident.replace(":", "-")
326                                ident = ident.replace("/", "-")
327                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
328                                new_filename_short = self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
329                               
330                               
331                        logging.info("original file = " + original_filename)
332                        logging.info("newfile = " + new_filename)
333                               
334                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
335                        #this links a derived filename in processing dir with original filename
336                        #get basic filename from the path+filename passed to this function to use as key               
337                        self.inputFileOrigFinal[new_filename_short]=filename
338                       
339                        # now correct any namespace issues
340                        try:
341                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
342                        except Exception, detail:
343                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
344                                logging.error("Detail: %s" %detail)
345                                logging.info("Continue with next file")
346                                continue
347                        numfilesproc += 1
348               
349                logging.info("File renaming and converting completed")
350                logging.info(self.lineSeparator)
351                return numfilesproc
352
353               
354        def _getPostgresDBConnection(self):
355                '''
356                Get the default postgres DB connection - by reading in data from the db config file
357                '''
358               
359                logging.debug("Setting up connection to postgres DB")
360                self.pgc = pgc(configFile = 'ingest.config')
361                logging.info("Postgres DB connection now set up")
362
363
364
365        def     _backupAndCleanData(self):
366                '''
367                Back up ingested data for specified data centre, then clean any of
368                the ingest dirs
369                '''
370                logging.info("Backing up ingested data")
371                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
372                  strftime("%y%m%d_%H%M") + "_originals/"
373               
374                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
375                logging.info("Data backed up - now clearing ingest directories")
376                #Clear out the original harvest records area and discovery dir
377                #FileUtilities.cleanDir(self.originals_dir)
378                FileUtilities.cleanDir(self.discovery_dir)
379                logging.info("Ingest directories cleared")
380
381
382        def     _setupDataCentreDirs(self):
383                '''
384                Set up directories appropriate for the current data centre
385                '''
386                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
387                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
388               
389                # the following dirs define where the specific documents should go
390                self.originals_dir = data_dir + "/oai/originals/"
391                self.discovery_dir = data_dir + "/discovery/"
392               
393                # Create/clear the 'in' and 'out' directories
394                FileUtilities.setUpDir(self.originals_dir)
395                FileUtilities.setUpDir(self.discovery_dir)
396               
397                logging.info("Ingest directories for data centre set up")
398
399
400        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre):
401                '''
402                Convert files from originals dir to discovery one, then
403                ingest, backup and clean
404                @param originals_dir: directory to ingest docs from
405                @param discovery_dir: directory to use to process ingested docs
406                '''
407               
408                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir)
409               
410                filenames = os.listdir(self.discovery_dir)
411               
412                #generate a list of files ALREADY in database so can compare with what has been ingested
413                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
414                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
415                filesPresentList=[]
416                if filePresentListArr:
417                        for fileArr in filePresentListArr:
418                                filesPresentList.append(fileArr[0])
419                               
420                                #TODO - is above relevant - duplicating functionlaity?
421       
422                #create list to to hold files ingest failed on.
423                self.updateFailList = []
424                self.deletedFailList = []
425               
426               
427                for filename in filenames:
428                        fullPath = self.discovery_dir + filename
429                       
430                        if os.path.isfile(fullPath):
431                                thisIngestedID = self.addFileToPostgresDB(fullPath)
432                               
433                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
434                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
435                                        if thisIngestedID in filesPresentList: 
436                                                filesPresentList.remove(thisIngestedID)                 
437                                               
438                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
439                #will need to be removed.
440               
441                #only do this if not in single file mode (or else wverything else gets deleted!)
442                if self.indFileToIngest == "":
443                        for item in filesPresentList:
444                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
445                                DeleteRecord(item)
446                                self.deletedFailList.append(item)
447                                self._no_files_deleted += 1
448                       
449                       
450                self._backupAndCleanData()
451               
452                #at this stage put the reporting code in (will work for oai or atom feed)
453                #create a summary file for each data centre ingest
454                data_dir = self._base_dir + "data/"
455                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
456                recOpFile = open(recOpFileName,'w')
457               
458                logging.info("oai_document_ingest processing complete:")
459               
460                recOpFile.write("Ingest report for data centre: " + datacentre + "\n")
461                recOpFile.write("Ingest date: " + str(datetime.datetime.now()) + "\n")
462                recOpFile.write("Original metadata directory: " + self._harvest_home + "\n\n")
463                recOpFile.write("TOTAL PROCESSED " + str(numfilesproc) + "\n")
464                recOpFile.write("INGESTED (Created) " + str(self._no_files_changed)  + "\n")
465                recOpFile.write("INGESTED (Updated) " + str(self._no_files_ingested)  + "\n")
466                recOpFile.write("DELETED " + str(self._no_files_deleted)  + "\n")
467                recOpFile.write("PROBLEM FILES " + str(self._no_problem_files)  + "\n")
468               
469                for badFile in self.updateFailList:
470                        recOpFile.write("PROBLEM_FILE " + badFile + "\n")
471               
472                return numfilesproc
473
474
475
476        def _setupXQueries(self):
477                '''
478                Set up the required XQueries
479                - NB, extract the xquery libraries locally for easy reference
480                '''
481                self._xq = ndgResources()
482                for libFile in self._xq.xqlib:
483                        # NB, we don't want the full path to the files - just the filename
484                        fileName = libFile.split('/')[-1]
485                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.