source: TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 6367

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@6367
Revision 6367, 20.3 KB checked in by sdonegan, 10 years ago (diff)

completion of synchronising...

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from Utilities import idget, IngestTracking
15from Utilities import DatasetBasicParameters_MEDIN_v01,DatasetBasicParameters_Original
16from Utilities import ndgRedirectTransform,redirectUrlChanger
17from DeleteRecord import DeleteRecord
18import datetime,time
19from ndg.common.src.models.ndgObject import ndgObject
20from ndg.common.src.lib.ndgresources import ndgResources
21from AbstractFormatIngester_original import AbstractFormatIngesterOriginal
22from ndg.common.src.models.ndgObject import ndgObject
23
24SAXON_JAR_FILE = '/disks/glue1/sdonegan/NDG3_workspace/buildouts/oai_document_ingester_MEDIN/ingestAutomation-upgrade/OAIBatch/lib/saxon9.jar'
25       
26
27class AbstractDocumentIngester(object):
28        '''
29        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
30        - including running the various transforms and parsings to get all doc types and spatiotemporal
31        data in the correct form in the DB
32        '''
33        lineSeparator = "------------------------------"
34                       
35        # The directory to put things for a tape backup (should already exist)
36        BACKUP_DIR = '/disks/glue1/oaiBackup/'
37               
38        #keep ndg3beta service for testing MEDIN upgrades
39        NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
40       
41       
42        #NDG_redirect_URL = 'http://triton.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
43       
44       
45
46        def _setupCmdLineOptions(self):
47                '''
48                Determine the logging level to use and configure this appropriately
49                @return args: any input arguments - excluding options
50                '''
51               
52                # check for verbose option
53               
54                try:
55                        opts, args = getopt.getopt(sys.argv[1:], "vd")
56                except getopt.GetoptError, err:
57                    # print help information and exit:
58                    print str(err) # will print something like "option -a not recognized"
59                    sys.exit(2)
60                   
61                if len(args) < 1:
62                        self.usage()
63                   
64                loggingLevel = logging.WARNING
65                for o, a in opts:
66                    if o == "-v":
67                        print " - Verbose mode ON"
68                        loggingLevel = logging.INFO
69                    elif o == "-d":
70                        print " - Debug mode ON"
71                        loggingLevel = logging.DEBUG
72                   
73               
74                # set up any keywords on the object
75                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
76                for arg in args:
77                       
78                        keywordArgs=['ingestFromDate','interval','individualFile','interval','ingestFromDate','eXistDBHostname','eXistPortNo','dataCentrePoll']
79                       
80                       
81                       
82                        bits = arg.split('=')
83                        if len(bits) == 2:
84                                if bits[0] == keywordArgs[0]:
85                                        self.setIngestFromDate(bits[1])
86                                elif bits[0] == keywordArgs[1]:
87                                        self.setPollInterval(bits[1])
88                                elif bits[0] == keywordArgs[2]:
89                                        print " - Running in single file ingestion mode!"
90                                        self.setIndFileToIngest(bits[1])
91                                elif bits[0] not in keywordArgs:
92                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
93                                        sys.exit(2)
94                                else:
95                                        setattr(self, bits[0], bits[1])
96                       
97                print self.lineSeparator
98               
99                # NB, this is a slight fudge as cannot get the detailed logging to work
100                # without setting up a new logger here - which means we get two loggers
101                # outputing data. The initial call to logging needs to be tracked down
102                # and configured correctly, so this can be avoided...
103#               self.logger = logging.getLogger()
104#               self.logger.setLevel(loggingLevel)
105
106                # create console handler and set level to debug
107#               ch = logging.StreamHandler()
108#               ch.setLevel(loggingLevel)
109                # create formatter
110#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
111                # add formatter to ch
112#               ch.setFormatter(formatter)
113                # add ch to logger
114#               self.logger.addHandler(ch)
115                return args
116
117
118        def getID(self, filename):
119                '''
120                Gets the identifier out of an input metadata xml record.
121                @param filename - name of document file being processed
122                @return: ID - id to use to refer to the document
123                '''
124                logging.info("Retrieving identifier for metadata record " + filename)
125                xml=file(filename).read()
126                ID = idget(xml)
127                return ID
128       
129       
130       
131                       
132       
133        def getConfigDetails(self, datacentre):
134                '''
135                Get the harvested records directory and groups for this datacentre from the
136                datacentre specific config file.  The harvested records directory depends on the
137                datacentres OAI base url, the set and format. These have to be know up-front.
138                The groups denote which 'portal groups' they belong to - for limiting searches to
139                say NERC-only datacentres records.
140                Groups are added to the intermediate MOLES when it is created.
141                @param datacentre: datacentre to use when looking up config file
142                '''
143                # initialise the variables to retrieve from the config file
144                self._harvest_home = ""
145                self._datacentre_groups = ""
146                self._datacentre_format = ""
147                self._datacentre_namespace = ""
148                self._NDG_dataProvider = False
149
150                datacentre_config_filename = 'datacentre_config/' + datacentre + "_config.properties"
151                logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename)
152               
153
154                #file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
155                file = open(datacentre_config_filename,"r")
156               
157                for line in file.readlines():
158                        words = line.split()
159                        if len(words) == 0:
160                                continue
161                        elif words[0] == 'host_path':
162                                self._harvest_home = words[1].rstrip()
163                        elif words[0] == 'groups':
164                                self._datacentre_groups = words[1:]
165                        elif words[0] == 'format':
166                                self._datacentre_format = words[1]
167                        elif words[0] == 'namespace':
168                                self._datacentre_namespace = words[1]
169                        elif words[0] == 'NDG_dataProvider':
170                                self._NDG_dataProvider = True
171               
172                if self._harvest_home == "":
173                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
174               
175                logging.info("harvested records are in " + self._harvest_home)
176               
177                if self._datacentre_groups == "":
178                    logging.info("No groups/keywords set for datacentre " + datacentre)
179                else:
180                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
181               
182                if self._datacentre_format == "":
183                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
184               
185                logging.info("format being harvested: " + self._datacentre_format)
186               
187                if self._datacentre_namespace == "":
188                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
189               
190                logging.info("datacentre namespace: " + self._datacentre_namespace)
191               
192                if self._NDG_dataProvider:
193                        logging.info("Datacentre classified as an NDG data provider")
194                else:
195                        logging.info("Datacentre is not classified as an NDG data provider")
196                logging.info(self.lineSeparator)
197
198
199        def convertDIFtoISO(self,metadataFileLoc,repositoryName,metadataID,metadataFilename):
200                '''
201                   New to MEDIN ingest Upgrade:
202                   This converts the still supported DIF to ISO format - all extractions will come from ISO,
203                   whether original ISO format or conversions to ISO.
204                   
205                   Uses the new "dif2stubISO" xquery
206                '''
207                xQueryType = 'dif2stubISO'
208               
209                self._repository_local_id = repositoryName #'neodc.nerc.ac.uk'
210                self.discovery_id = metadataID
211                self._local_id = metadataFilename
212                self._repository = repositoryName #'neodc.nerc.ac.uk'
213                               
214                self.xqueryLib = ndgResources()                                                 
215                xquery = self.xqueryLib.createXQuery(xQueryType,metadataFileLoc, self._repository_local_id, self._local_id)
216     
217                # sort out the input ID stuff
218                xquery=xquery.replace('Input_Entry_ID', self.discovery_id)
219                xquery=xquery.replace('repository_localid', self._repository)
220
221        # strip out the eXist reference to the libraries; these files should be available in the
222                # running dir - as set up by oai_ingest.py
223                xquery=xquery.replace('xmldb:exist:///db/xqueryLib/Vocabs/', '')
224                xquery=xquery.replace('xmldb:exist:///db/xqueryLib/Utilities/', '')
225
226        # write the query to file, to make it easier to input
227        # NB, running directly at the command line leads to problems with the interpretation of $ characters
228                xqFile = "currentQuery" + xQueryType + ".xq" 
229               
230                FileUtilities.createFile(xqFile, xquery)
231                               
232                # ensure the jar file is available - NB, this may be running from a different
233        # location - e.g. the OAIInfoEditor.lib.harvester - and this won't have the
234        # saxon file directly on its filesystem
235               
236                #jarFile = pkg_resources.resource_filename('OAIBatch', SAXON_JAR_FILE)
237                jarFile = SAXON_JAR_FILE
238        # Now do the transform
239                os.putenv ('PATH', ':/usr/java/jdk1.5.0_03/bin:/usr/java/jdk1.5.0_03:/usr/java/jdk1.5.0_03/lib/tools.jar:/usr/local/WSClients/OAIBatch:/usr/local/exist-client/bin:/bin:/usr/bin:.')
240                xqCommand = "java -cp %s net.sf.saxon.Query %s !omit-xml-declaration=yes" %(jarFile, xqFile)
241                               
242                logging.debug("Running saxon command: " + xqCommand)
243                pipe = os.popen(xqCommand + " 2>&1")
244                output = pipe.read()
245                status = pipe.close()
246
247                if status is not None:
248                        raise SystemError, 'Failed at running the XQuery'
249               
250               
251                # now remove the temp xquery file
252                '''status = os.unlink(xqFile)
253                if status is not None:
254                        raise OSError, 'Failed to remove the temporary xquery file, ' + xqFile'''
255
256                logging.info("Transform completed successfully")
257
258                return output
259                               
260
261
262        def _convertIngestFiles(self, originals_dir, discovery_dir, stubIso_dir, format):
263                '''
264                Processes/renames the files (changed 08/01/07 to get id from inside file)
265                 - also replace any namespace declarations with a standard one which we know works in NDG
266                 NB, this copies files from the original dir to the discovery dir
267                 @param originals_dir: directory to convert files from
268                 @param discovery_dir: directory in which converted files will end up
269                 @return numfilesproc: counter of number of files processed
270                '''
271                numfilesproc = 0                       
272               
273                self.inputFileOrigFinal = {}
274               
275                logging.info(self.lineSeparator)
276                logging.info("Renaming files:")
277               
278                for filename in os.listdir(originals_dir):
279                       
280                        if not filename.endswith('.xml'):
281                                logging.warning('File %s is not xml format. Not processed'  %(filename))
282                                continue
283                       
284                        original_filename = originals_dir + filename
285                       
286                                               
287                        #convert urls within original xml input file to NDG redirect URLS                                               
288                        #call new class in Utilities.py --will replace original file...
289                        redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self.NDG_redirect_URL)
290                       
291                        if format == 'DIF':
292                               
293                                #NOTE MDIP now deprecated!!  easier to leave all those crappy dif or mdip code but will mainly be replaced anyway
294                               
295                                logging.info("Converting DIF to stub ISO...")
296                                                               
297                                #gets the metadata id from the xml
298                                metadataID=self.getID(original_filename)
299                               
300                                repositoryName = self._datacentre_namespace
301                               
302                                #get the name of the file to be used in the xquery
303                                metadataFilename = filename.replace('.xml', '')
304                               
305                                #where is the file to be ingested located?
306                                metadataFileLoc = originals_dir
307                               
308                                #generate a new stubISO filename
309                                isoFormat= "stubISO"
310                               
311                                if self._NDG_dataProvider:
312                                                               
313                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
314                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml"
315                                        new_filename_short = self._datacentre_namespace + "__" + isoFormat + "__" + metadataFilename.replace(":", "-")+".xml"
316                               
317                                else:
318                               
319                                        ident = ident.replace(":", "-")
320                                        ident = ident.replace("/", "-")
321                               
322                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
323                                        new_filename_short = self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
324                               
325                               
326                                self.isoXML = self.convertDIFtoISO(metadataFileLoc,repositoryName,metadataID,metadataFilename)
327                               
328                                #now create this stub ISO file on system so can access it
329                                self.stubIsoFile = stubIso_dir + new_filename_short
330                                FileUtilities.createFile(self.stubIsoFile, self.isoXML)
331                        logging.info("Stub ISO file created - at %s" %stubIso_dir)
332                       
333                        #elif format == 'MEDIN_v0.1':
334               
335                                #basicParameters=DatasetBasicParameters_MEDIN_v01(original_filename,self._datacentre_format)
336                             
337                               
338                        logging.info("original file = " + original_filename)
339                        logging.info("newfile = " + new_filename)
340                               
341                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
342                        #this links a derived filename in processing dir with original filename
343                        #get basic filename from the path+filename passed to this function to use as key               
344                        self.inputFileOrigFinal[new_filename_short]=filename
345                       
346                        # now correct any namespace issues
347                        try:
348                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
349                        except Exception, detail:
350                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
351                                logging.error("Detail: %s" %detail)
352                                logging.info("Continue with next file")
353                                continue
354                        numfilesproc += 1
355               
356                logging.info("File renaming and converting completed")
357                logging.info(self.lineSeparator)
358                       
359                return numfilesproc
360
361               
362        def _getPostgresDBConnection(self):
363                '''
364                Get the default postgres DB connection - by reading in data from the db config file
365                '''
366               
367                logging.debug("Setting up connection to postgres DB")
368                self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
369                logging.info("Postgres DB connection now set up")
370
371
372
373        def     _backupAndCleanData(self):
374                '''
375                Back up ingested data for specified data centre, then clean any of
376                the ingest dirs
377                '''
378                logging.info("Backing up ingested data")
379                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
380                  strftime("%y%m%d_%H%M") + "_originals/"
381               
382                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
383                logging.info("Data backed up - now clearing ingest directories")
384                #Clear out the original harvest records area and discovery dir
385                #FileUtilities.cleanDir(self.originals_dir)
386                #FileUtilities.cleanDir(self.discovery_dir)
387                logging.info("Ingest directories cleared")
388
389
390        def     _setupDataCentreDirs(self):
391                '''
392                Set up directories appropriate for the current data centre
393                '''
394                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
395                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
396               
397                # the following dirs define where the specific documents should go
398                self.originals_dir = data_dir + "/oai/originals/"
399                self.discovery_dir = data_dir + "/discovery/"
400                self.stubISO_dir = data_dir + "/stub_iso/"
401               
402                # Create/clear the 'in' and 'out' directories
403                FileUtilities.setUpDir(self.originals_dir)
404                FileUtilities.setUpDir(self.discovery_dir)
405                FileUtilities.setUpDir(self.stubISO_dir)
406               
407                logging.info("Ingest directories for data centre set up")
408
409
410        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed, format):
411                '''
412                Convert files from originals dir to discovery one, then
413                ingest, backup and clean
414                @param originals_dir: directory to ingest docs from
415                @param discovery_dir: directory to use to process ingested docs
416                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
417                '''
418                               
419                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir, self.stubISO_dir, format)
420                               
421                filenames = os.listdir(self.discovery_dir)
422               
423                #generate a list of files ALREADY in database so can compare with what has been ingested
424                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
425               
426               
427                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
428                filesPresentList=[]
429                if filePresentListArr:
430                        for fileArr in filePresentListArr:
431                                filesPresentList.append(fileArr[0])
432                               
433                                #TODO - is above relevant - duplicating functionlaity?
434       
435                #create list to to hold files ingest failed on.
436                self.thisIngestMonitor = IngestTracking()
437               
438                #self.updateFailList = []
439                #self.deletedFailList = []
440                #no_problem_files = 0
441               
442                for filename in filenames:
443                        fullPath = self.discovery_dir + filename
444                       
445                        if os.path.isfile(fullPath):
446
447                                #ingest file intelligently according to input formatfilename,datacentre_format,datacentre_groups,datacentre_groups                             
448                                if format == 'DIF' or format == 'MDIP':
449                                             
450                                        #thisIngestedID = ingestOriginal.addFileToPostgresDB_original(fullPath)
451                                       
452                                       
453                                        thisIngest = AbstractFormatIngesterOriginal()
454                                        thisIngestedID = thisIngest.ingest(fullPath,format,self._datacentre_groups,self._datacentre_namespace,self._xq,self.pgc,self.inputFileOrigFinal,self.thisIngestMonitor, self._NDG_dataProvider,self.stubIsoFile)
455                                        sys.exit()
456                                       
457                                                                                                       
458                                elif format == 'MEDIN_v0.1':
459                                        thisIngestedID, errorMsg = AbstractFormatIngester.addFileToPostgresDB_Medin_v01(fullPath)
460                               
461                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
462                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
463                                        if thisIngestedID in filesPresentList: 
464                                                filesPresentList.remove(thisIngestedID)                 
465                                               
466                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
467                #will need to be removed.
468               
469                #only do this if not in single file mode (or else wverything else gets deleted!)               
470                if ((self.indFileToIngest == "") & (feed)):
471                        for item in filesPresentList:
472                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
473                                DeleteRecord(item)
474                               
475                                #self.deletedFailList.append(item)
476                                #self._no_files_deleted += 1
477                                self.thisIngestMonitor.appendDeletedList(item)
478                                self.thisIngestMonitor.incrementDeletedFile()
479                       
480                       
481                self._backupAndCleanData()
482               
483                #at this stage put the reporting code in (will work for oai or atom feed)
484                #create a summary file for each data centre ingest
485                data_dir = self._base_dir + "data/"
486                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
487                recOpFile = open(recOpFileName,'w')
488               
489                logging.info("oai_document_ingest processing complete:")
490               
491                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
492                message = "Ingest report for data centre: " + datacentre + "\n"
493                message = message + "Ingest date: " + str(datetime.datetime.now()) + "\n"
494                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
495                message = message + "TOTAL PROCESSED " + str(numfilesproc) + "\n"
496                message = message + "INGESTED (Created) " + str(self.thisIngestMonitor.getIngestedFileNum())  + "\n"
497                message = message + "INGESTED (Updated) " + str(self.thisIngestMonitor.getChangedFileNum())  + "\n"
498                message = message + "DELETED " + str(self.thisIngestMonitor.getDeletedFileNum())  + "\n"
499                message = message + "PROBLEM FILES " + str(self.thisIngestMonitor.getProblemFileNum())  + "\n"
500               
501                '''recOpFile.write("Ingest report for data centre: " + datacentre + "\n")
502                recOpFile.write("Ingest date: " + str(datetime.datetime.now()) + "\n")
503                recOpFile.write("Original metadata directory: " + self._harvest_home + "\n\n")
504                recOpFile.write("TOTAL PROCESSED " + str(numfilesproc) + "\n")
505                recOpFile.write("INGESTED (Created) " + str(self._no_files_changed)  + "\n")
506                recOpFile.write("INGESTED (Updated) " + str(self._no_files_ingested)  + "\n")
507                recOpFile.write("DELETED " + str(self._no_files_deleted)  + "\n")
508                recOpFile.write("PROBLEM FILES " + str(self._no_problem_files)  + "\n")'''
509               
510               
511                #for badFile in self.updateFailList:
512                for badFile in self.thisIngestMonitor.getFailList():
513                        #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
514                        message = message +"PROBLEM_FILE " + badFile + "\n"
515                         
516                recOpFile.write(message)
517                               
518                return numfilesproc, message
519
520
521
522        def _setupXQueries(self):
523                '''
524                Set up the required XQueries
525                - NB, extract the xquery libraries locally for easy reference
526                '''
527                self._xq = ndgResources()
528                for libFile in self._xq.xqlib:
529                        # NB, we don't want the full path to the files - just the filename
530                        fileName = libFile.split('/')[-1]
531                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.