source: TI01-discovery/trunk/OAIInfoEditor/oai_info_editor/lib/harvester.py @ 6426

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/trunk/OAIInfoEditor/oai_info_editor/lib/harvester.py@6426
Revision 6426, 18.4 KB checked in by sdonegan, 10 years ago (diff)

Updated to handle OAI ingest configuration file

Line 
1'''
2 Class wrapping the jOAI Harvester API - to harvest specified provider repository data
3 
4 @author: C Byrom, Tessella Feb 2009
5'''
6import logging, commands, os, sys,string
7from oai_info_editor.model.repositoryinfo import RepositoryInfo
8from oai_info_editor.dal.providerinfodao import *
9from ndg.common.src.models.myconfig import myConfig
10from OAIBatch.oai_document_ingester import oai_document_ingester
11from ndg.common.src.lib.mailer import mailHandler
12import oai_info_editor.lib.constants as constants
13from threading import Thread
14
class IngestThread(Thread):
    '''
    Class to allow ingesting of docs asynchronously - i.e. in a new thread
    NB, the results of this are reported by email (if the harvester had a
    notification address set when the thread was created)
    '''

    def __init__ (self, harvester, providerName, harvestDir, format, configurationFile):
        '''
        Constructor for setting up thread for asynchronous ingest of docs
        @param harvester: Harvester instance to do the ingesting
        @param providerName: name of provider to ingest docs from
        @param harvestDir: directory to ingest files from
        @param format: format of docs to ingest
        @param configurationFile: processing config file passed on to
        harvester.ingestDocuments
        '''
        logging.info("Setting up thread to ingest data for datacentre, '%s'" %providerName)

        Thread.__init__(self)
        self.harvester = harvester
        self.providerName = providerName
        self.harvestDir = harvestDir
        self.format = format
        self.processingConf = configurationFile
        # Snapshot the notification address now: the harvester stores it per
        # runHarvestAndIngest call, so a later call on the same harvester could
        # otherwise overwrite it before this thread gets to send its mail
        self.userEmail = getattr(harvester, 'userEmail', None)
        logging.info("- finished setting up thread")


    def run(self):
        '''
        Ingest the docs via the harvester, then - if a notification address was
        available when the thread was created - mail the ingest summary to it
        '''
        logging.info("Running thread to ingest datacentre docs")

        isSuccess, ingestMessage = self.harvester.ingestDocuments(self.providerName,
                                                                  self.harvestDir,
                                                                  self.format, self.processingConf)
        logging.info("- finished ingesting data")
        if self.userEmail:
            logging.info("Sending notification mail to '%s'" %self.userEmail)
            status, message = mailHandler([self.userEmail],
                                          constants.INGEST_RESULTS_TITLE %self.providerName,
                                          ingestMessage,
                                          server = self.harvester.mailServer)
            logging.info("- email sent")
            # NOTE(review): the meaning of mailHandler's (status, message) return
            # isn't visible here, so it is logged rather than acted on
            logging.debug("- mail handler returned: %s, %s" %(status, message))
        logging.info("Ingest procedure complete")
55           
56
class Harvester(object):
    '''
    Class wrapping the jOAI Harvester command line client: harvests the docs of a
    provider repository to a local directory and, optionally, ingests them into
    the discovery service using oai_document_ingester
    '''

    # this is the command used to run the java client
    # NB, the inputs required are, proxy settings, outdir, baseURL and format
    # SJD - wsgi buildout screws up relative jar paths (e.g. lib/DLESETools.jar),
    # so absolute paths are used here
    JAVA_COMMAND = 'java %s -cp /usr/local/ndg-oai-info-editor/lib/DLESETools.jar:/usr/local/ndg-oai-info-editor/lib/jdom-b7.jar:/usr/local/ndg-oai-info-editor/lib/xercesImpl.jar:/usr/local/ndg-oai-info-editor/lib/xml-apis.jar org.dlese.dpc.oai.harvester.Harvester %s %s %s'

    # text in the jOAI client output indicating the repository rejected the
    # requested metadata format
    CANNOT_DISSEMINATE_FORMAT = 'cannotDisseminateFormat'

    # keys read from the oai_document_ingester processing config file
    PROCESSING_CONFIG_KEYS = ('code_directory', 'base_directory',
                              'reporting_directory', 'passwords_file',
                              'datcentre_configs', 'NDG_redirect_URL',
                              'ingestConfig')

    def __init__(self, mailServer = None, outDir = None,
                 proxyHost = '130.246.135.176',
                 proxyPort = '8080', configFile = None):
        '''
        Constructor - initialise the Harvester class
        @keyword mailServer: server for sending notification mails from the asynch
        ingest - only needed if runHarvestAndIngest is called with ingestAsynch
        and a userEmail
        @keyword outDir: directory to harvest files to - NB, this typically doesn't
        change for different harvests - which is why it is set in the constructor.
        Defaults to '.' if neither this nor the configFile supply a directory
        @keyword proxyHost: proxy host to use in comms - defaults to wwwcache.rl.ac.uk IP address
        @keyword proxyPort: port for the proxy host to use - defaults to '8080'
        @keyword configFile: oai_document_ingester processing config file (see
        getProcessingConfig); it is also passed on to the ingester.  NB, its
        'base_directory' setting overrides the outDir keyword
        '''
        logging.debug("Initialising Harvester object")
        self.mailServer = mailServer
        self.proxyHost = proxyHost
        self.proxyPort = proxyPort
        # oai info editor config, required by harvestAll - NB, not currently populated
        self.cf = None
        # notification address for asynchronous ingest results - set per call
        # by runHarvestAndIngest
        self.userEmail = None

        self.oaiProcessingConfigFile = configFile
        self.processingDict = {}

        # settings in the config file take precedence over the outDir keyword
        self.outDir = outDir
        if configFile:
            self.processingDict = self.getProcessingConfig(configFile)
            if 'base_directory' in self.processingDict:
                self.outDir = self.processingDict['base_directory']

        if self.outDir:
            logging.info("Will harvest data to: %s" %self.outDir)
        else:
            logging.warning("Problem extracting location to harvest data to..")
            self.outDir = '.' #temp fudge

        self.dao = None # data access object for retrieving all providers info
        self.ingester = None # ingester script for adding the harvested data to the Discovery service

        logging.info("Harvester initialised")


    def getProcessingConfig(self, configFilePath):
        '''
        Read the relevant settings out of an oai_document_ingester.config style
        file, which contains lines such as:

            #base directory in which metadata is extracted to and converted to
            base_directory /home/badc/discovery_docs/ingestDocs/

        Each setting is a whitespace separated '<key> <value>' line; only the keys
        in PROCESSING_CONFIG_KEYS are kept (code_directory, base_directory,
        reporting_directory, passwords_file, datcentre_configs, NDG_redirect_URL,
        ingestConfig).  Blank lines, comments, unknown keys and recognised keys
        without a value are ignored; if a key repeats, the last value wins.
        @param configFilePath: path to the processing config file
        @raise SystemExit: if configFilePath is not an existing file
        @return processingConfig: dict of the settings found, keyed by setting name
        '''
        # Check this file exists; if not, assume an invalid datacentre has been specified
        if not os.path.isfile(configFilePath):
            sys.exit("ERROR: Could not find the processing config file: %s" %configFilePath)

        processingConfig = {}
        configFile = open(configFilePath, "r")
        try:
            for line in configFile:
                words = line.split()
                if not words or words[0] not in self.PROCESSING_CONFIG_KEYS:
                    continue
                if len(words) < 2:
                    logging.warning("No value given for '%s' in processing config file, '%s'"
                                    %(words[0], configFilePath))
                    continue
                processingConfig[words[0]] = words[1]
        finally:
            configFile.close()

        return processingConfig


    def harvestRepository(self, repositoryInfo, outDir = None):
        '''
        Harvest docs from the specified repository
        @param repositoryInfo: RepositoryInfo object with data on the repository to
        be harvested.  NB, if the repository rejects the metadata format, the
        harvest is retried with the opposite casing and repositoryInfo.dataFormat
        is left set to that new casing
        @keyword outDir: directory to harvest files to - defaults to self.outDir
        @raise SystemError: if run harvester on a Windows machine
        @raise ValueError: if repositoryInfo is not a RepositoryInfo object
        @return status, outMessage, harvestDir: Status = True, if successful, False otherwise
        outMessage = summary of harvest outcome, harvestDir = directory docs harvested to
        '''
        logging.info("Running data harvest")

        # check we're not running on windows - this doesn't work since the commands
        # library for running system commands is only unix systems compatible
        # NB, this would be better placed in the constructor - since this would stop
        # the app from running in the first place; put it here temporarily to allow
        # testing of the app on windows
        if sys.platform.lower().startswith('win'):
            raise SystemError("Harvest functionality does not work on Windows machines " + \
                              "- application should be running on a Unix system.")

        localDir = self.outDir
        if outDir:
            localDir = outDir

        if not isinstance(repositoryInfo, RepositoryInfo):
            raise ValueError("Input object, '%s' is not of type 'RepositoryInfo'" %repositoryInfo)
        logging.info("- for data at, '%s'" %repositoryInfo.url)

        # NB, the harvester done via the web interface automatically creates a
        # local dir for the repository info - using the url and format; to keep
        # things consistent, do this here
        localHarvestDir = self.__getLocalRepositoryDir(repositoryInfo, localDir)

        harvestCMD = self.__constructHarvestCommand(localHarvestDir, repositoryInfo)
        logging.info(" - using command, '%s'" %harvestCMD)
        status, message = commands.getstatusoutput(harvestCMD)
        logging.info("Harvest output: '%s'" %message)

        if self.__harvestFailed(status, message):
            logging.error("Problem occurred whilst running harvest: %s" %message)
            # NB, the harvest format is case dependent and this can vary across services!
            # - if there is an error suggesting this is the problem, retry with opposite
            # case for format
            if self.CANNOT_DISSEMINATE_FORMAT in message:
                oldFormat = repositoryInfo.dataFormat
                if oldFormat.islower():
                    repositoryInfo.dataFormat = oldFormat.upper()
                else:
                    repositoryInfo.dataFormat = oldFormat.lower()

                logging.info("- retrying harvest using format with new casing ('%s' vs '%s')" \
                             %(oldFormat, repositoryInfo.dataFormat))

                harvestCMD = self.__constructHarvestCommand(localHarvestDir, repositoryInfo)
                logging.debug(" - using command, '%s'" %harvestCMD)
                status, message = commands.getstatusoutput(harvestCMD)
                logging.debug("Harvest output: '%s'" %message)
                if self.__harvestFailed(status, message):
                    logging.error("Problem occurred whilst running harvest: %s" %message)

        data = message.split('\n')
        if self.__harvestFailed(status, message):
            # retrieve pertinent part of error message to return to user
            # NB, usually the penultimate line has the clearest digest of the error on it
            # - just incase there are exceptions to this, return just the last line
            outMessage = data[-1]
            if len(data) > 1 and self.CANNOT_DISSEMINATE_FORMAT not in message:
                outMessage = data[-2]

            # NB, the message will be displayed in a javascript pop up - this doesn't like
            # apostrophes so remove any in the message string
            outMessage = outMessage.replace('\'', '')
            return False, outMessage, localHarvestDir

        logging.info("- harvest completed successfully")
        # NB, the last line has the summary of the harvest - so just return this
        return True, data[-1], localHarvestDir


    def ingestDocuments(self, providerName, harvestDir, dataFormat, processingConfig = None):
        '''
        Ingest harvested documents into the discovery service
        @param providerName: Name of provider whose documents should be ingested
        @param harvestDir: Directory to ingest docs from
        @param dataFormat: format of data to ingest
        @keyword processingConfig: processing config file passed to the ingester as
        configFileName - defaults to the configFile given to the constructor
        @return status, outMessage: Status = True, if successful, False otherwise
        outMessage = summary of ingest outcome
        '''
        logging.debug("Running ingest of documents")

        if processingConfig is None:
            processingConfig = self.oaiProcessingConfigFile

        if not self.ingester:
            self.ingester = oai_document_ingester()

        logging.info("Harvest dir= %s  data format= %s" %(harvestDir, dataFormat))
        try:
            isSuccess, result = self.ingester.processDataCentre(providerName,
                                                                harvestDir = harvestDir,
                                                                dataFormat = dataFormat,
                                                                configFileName = processingConfig)
        except Exception:
            # NB, get the exception via sys.exc_info to keep the syntax valid on
            # both older and newer Python versions
            e = sys.exc_info()[1]
            isSuccess = False
            logging.exception(e)
            result = "Unexpected error occurred during ingest: %s" %str(e)

        logging.debug(result)
        logging.debug("Document ingest complete")
        return isSuccess, result


    def runHarvestAndIngest(self, providerName, repositoryInfo, outDir = None,
                            ingestAsynch = False, userEmail = None):
        '''
        Harvest and ingest documents into the discovery service
        @param providerName: Name of provider whose documents should be ingested
        @param repositoryInfo: RepositoryInfo object with data on the repository to
        be harvested
        @keyword outDir: directory to harvest files to
        @keyword ingestAsynch: if True, run the ingest stage in a separate thread. Default = False
        @keyword userEmail: mail address to send notification mails to. Default = None
        @return status, outMessage: Status = True, if successful, False otherwise
        outMessage = summary of harvest and ingest outcome
        '''
        logging.debug("Running harvest then ingest")
        self.userEmail = userEmail

        isSuccess, outMessage, harvestDir = self.harvestRepository(repositoryInfo, outDir = outDir)

        if not isSuccess:
            logging.debug(" - harvest failed so avoid document ingest")
            return isSuccess, outMessage

        logging.debug("- harvest successful, so running ingest")
        if ingestAsynch:
            logging.debug("- running ingest asynchronously")
            thread = IngestThread(self, providerName, harvestDir,
                                  repositoryInfo.dataFormat, self.oaiProcessingConfigFile)
            thread.start()
            ingestMessage = "  Document ingest is now running"
            if not self.userEmail:
                ingestMessage += " - NB, if you wish to see the results, please include an email addresss with the provider data."
            else:
                ingestMessage += " - once complete, results will be emailed to %s" %userEmail
        else:
            logging.debug("- running ingest synchronously")
            isSuccess, ingestMessage = self.ingestDocuments(providerName,
                                                            harvestDir,
                                                            repositoryInfo.dataFormat,
                                                            self.oaiProcessingConfigFile)

        return isSuccess, outMessage + ingestMessage


    def __getLocalRepositoryDir(self, repositoryInfo, baseDir):
        '''
        Use the input repository info to create a local dir for the harvested data
        to be stored in - NB, this matches what is done by the jOAI web interface
        - which oddly isn't replicated by the jOAI Harvester API
        The dir name is the repository host[:port] (URL scheme and path dropped),
        with '.' and ':' replaced by '-', followed by '-<dataFormat>'
        @param repositoryInfo: RepositoryInfo for the repository being harvested
        @param baseDir: base directory for harvesting from
        @return localHarvestDir: string dir representing the directory to harvest
        the repository records to
        '''
        logging.debug("Determining local directory for harvest")
        location = repositoryInfo.url
        # drop any scheme (http://, https://, ...) so it doesn't become the dir name
        if '://' in location:
            location = location.split('://', 1)[1]
        hostPart = location.split('/')[0]
        dirName = hostPart.replace('.', '-').replace(':', '-')
        dirName += '-%s' %repositoryInfo.dataFormat
        localHarvestDir = os.path.join(baseDir, dirName)
        logging.debug(" - harvesting to '%s'" %localHarvestDir)
        return localHarvestDir


    def __constructHarvestCommand(self, localHarvestDir, repositoryInfo):
        '''
        Construct the java command required to do the appropriate harvest
        @param localHarvestDir: the directory to harvest to
        @param repositoryInfo: the RepositoryInfo object representing the repository
        to be harvested
        @return harvestCMD: the shell command line to run
        '''
        quote = self.__shellQuote

        proxyInfo = ''
        if self.proxyHost:
            proxyInfo = '-DproxySet=true -DproxyHost=%s -DproxyPort=%s' \
                %(self.proxyHost, self.proxyPort)

        # The command is run through a shell and the repository details come from
        # stored provider info (presumably editable via the OAI info editor), so
        # quote each argument - otherwise characters such as &, ;, ? or spaces in
        # them would be interpreted by the shell
        harvestCMD = self.JAVA_COMMAND %(proxyInfo, quote(localHarvestDir),
                                         quote(repositoryInfo.url),
                                         quote(repositoryInfo.dataFormat))
        if repositoryInfo.setSpec:
            harvestCMD += ' -set:%s' %quote(repositoryInfo.setSpec)

        if repositoryInfo.splitBySet:
            harvestCMD += ' -splitBySet:%s' %quote(repositoryInfo.splitBySet)
        return harvestCMD


    @staticmethod
    def __shellQuote(value):
        '''
        Return the string form of value quoted for use as a single argument in a
        POSIX shell command line; values made only of shell-safe characters
        (letters, digits and e.g. '/', ':', '.', '-', '_') come back unchanged
        '''
        try:
            from shlex import quote # Python 3.3+
        except ImportError:
            from pipes import quote # Python 2
        return quote('%s' %value)


    def __harvestFailed(self, status, message):
        '''
        @return True if the harvest command exit status is non-zero or its output
        shows the repository rejected the requested metadata format
        '''
        return bool(status) or self.CANNOT_DISSEMINATE_FORMAT in message


    def harvestAll(self):
        '''
        Harvest - and synchronously ingest - every repository of every provider
        specified in the oai info editor files
        @raise ValueError: if no oai info editor config (self.cf) is available
        NOTE(review): nothing in this class currently populates self.cf, so this
        always raises ValueError - TODO restore loading of the info editor config
        '''
        logging.info("Harvesting all available provider info")
        if not self.cf:
            raise ValueError("No config file available - cannot get info to harvest all provider info.")

        if not self.dao:
            self.dao = createDAOWithClient(client = FILE_CLIENT_TYPE,
                                           configFile = self.cf)

        pis = self.dao.getAllProviderInfo()
        for pi in pis:
            logging.info("Harvesting info for provider, '%s'" %pi.name)
            for ri in pi.repositoryInfos:
                logging.info(" - harvesting repository info, '%s'" %ri.name)
                self.runHarvestAndIngest(pi.name, ri)

        logging.info("- harvesting complete")
403   
404       
# entry point for running as a script - e.g. via crontab
if __name__=="__main__":

    loggingLevel = logging.DEBUG
    logging.basicConfig(level = loggingLevel,
                        format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')

    if len(sys.argv) < 2:
        raise ValueError("Usage: harvester <configFile>\n\n- NB, config file should be the ini file used by the oai info editor.")

    # Harvester's first argument is the mail server for asynchronous ingest
    # notifications; harvestAll ingests synchronously so none is needed here
    # NOTE(review): Harvester.__init__ as written never populates self.cf, which
    # harvestAll requires - TODO confirm how the info editor ini should be loaded
    h = Harvester(None, configFile = sys.argv[1])
    h.harvestAll()
416   
Note: See TracBrowser for help on using the repository browser.