source: TI01-discovery/trunk/OAIInfoEditor/oai_info_editor/lib/harvester.py @ 5253

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/trunk/OAIInfoEditor/oai_info_editor/lib/harvester.py@5253
Revision 5253, 15.0 KB checked in by cbyrom, 11 years ago (diff)

Add code to allow ingest of docs when the harvest button is clicked - NB,
the harvest completes first, then the ingest is run asynchronously
since it can take some time. If the user has provided an email
address for the provider, the results are then mailed to them.

Line 
1'''
2 Class wrapping the jOAI Harvester API - to harvest specified provider repository data
3 
4 @author: C Byrom, Tessella Feb 2009
5'''
6import logging, commands, os, sys
7from oai_info_editor.model.repositoryinfo import RepositoryInfo
8from oai_info_editor.dal.providerinfodao import *
9from ndg.common.src.models.myconfig import myConfig
10from OAIBatch.oai_document_ingester import oai_document_ingester
11from ndg.common.src.lib.mailer import mailHandler
12import oai_info_editor.lib.constants as constants
13from threading import Thread
14
class IngestThread(Thread):
    '''
    Class to allow ingesting of docs asynchronously - i.e. in a new thread
    NB, the results of this are reported by email - but only if the harvester
    has a user email address set; otherwise the outcome is only logged
    '''

    def __init__ (self, harvester, providerName, harvestDir, format):
        '''
        Constructor for setting up thread for asynchronous ingest of docs
        @param harvester: Harvester instance to do the ingesting - must provide
        an ingestDocuments(providerName, harvestDir, format) method; its
        userEmail and mailServer attributes are used for the notification mail
        @param providerName: name of provider to ingest docs from
        @param harvestDir: directory to ingest files from
        @param format: format of docs to ingest
        '''
        logging.info("Setting up thread to ingest data for datacentre, '%s'" %providerName)
        Thread.__init__(self)
        self.harvester = harvester
        self.providerName = providerName
        self.harvestDir = harvestDir
        self.format = format
        logging.info("- finished setting up thread")


    def run(self):
        '''
        Run the ingest, then mail the ingest message to the harvester's user
        email address, if there is one
        '''
        logging.info("Running thread to ingest datacentre docs")
        isSuccess, ingestMessage = self.harvester.ingestDocuments(self.providerName,
                                                                  self.harvestDir,
                                                                  self.format)
        logging.info("- finished ingesting data")

        # NB, the email address is looked up defensively - an unset attribute
        # would otherwise kill the thread with an AttributeError after the
        # ingest has already completed
        userEmail = getattr(self.harvester, 'userEmail', None)
        if userEmail:
            logging.info("Sending notification mail to '%s'" %userEmail)
            status, message = mailHandler([userEmail],
                                          constants.INGEST_RESULTS_TITLE %self.providerName,
                                          ingestMessage,
                                          server = self.harvester.mailServer)
            # NOTE(review): meaning of the returned status is defined by
            # mailHandler - logged here so that mail problems are at least visible
            logging.debug("- mail handler returned: '%s', '%s'" %(status, message))
            logging.info("- email sent")
        logging.info("Ingest procedure complete")
52           
53
class Harvester(object):
    '''
    Wrapper around the jOAI java Harvester client - harvests provider
    repositories to a local directory and, optionally, ingests the harvested
    documents via the oai_document_ingester
    '''

    # this is the command used to run the java client
    # NB, the inputs required are, java options (proxy settings), outdir, baseURL and format
    JAVA_COMMAND = 'java %s -cp lib/DLESETools.jar:lib/jdom-b7.jar:lib/xercesImpl.jar:lib/xml-apis.jar org.dlese.dpc.oai.harvester.Harvester %s %s %s'

    # text looked for in the harvester output to detect that the repository
    # rejected the requested metadata format
    FORMAT_ERROR = 'cannotDisseminateFormat'

    def __init__(self, mailServer, outDir = None,
                  proxyHost = '130.246.135.176',
                  proxyPort = '8080', configFile = None):
        '''
        Constructor - initialise the Harvester class
        @param mailServer: server for sending notification mails from the asynch ingest
        @keyword outDir: directory to harvest files to - NB,
        this typically doesn't change for different harvests - which is why it
        is set in the constructor
        @keyword proxyHost: proxy host to use in comms - defaults to wwwcache.rl.ac.uk IP address
        @keyword proxyPort: port for the proxy host to use - defaults to '8080'
        @keyword configFile: ini file to define the outDir and the data for accessing the
        OAI info editor data.  NB, the settings in configFile override the outDir keyword
        '''
        logging.debug("Initialising Harvester object")
        self.mailServer = mailServer
        self.outDir = outDir
        self.proxyHost = proxyHost
        self.proxyPort = proxyPort
        # address for ingest notification mails - set by runHarvestAndIngest;
        # initialised here so it can always be read safely (e.g. by IngestThread)
        self.userEmail = None
        self.cf = None
        if configFile:
            self.cf = myConfig(configFile)
            outDir = self.cf.get('DATA_STORE', 'harvestDir')
            if outDir:
                self.outDir = outDir

        # NB, both of these are created lazily, on first use
        self.dao = None # data access object for retrieving all providers info
        self.ingester = None # ingester script for adding the harvested data to the Discovery service

        logging.info("ProviderInfoDAO initialised")


    def harvestRepository(self, repositoryInfo, outDir = None):
        '''
        Harvest docs from the specified repository
        @param repositoryInfo: RepositoryInfo object with data on the repository to
        be harvested.  NB, if the repository rejects the data format and a retry
        with the opposite casing succeeds, repositoryInfo.dataFormat is left set
        to the casing that worked
        @keyword outDir: directory to harvest files to - overrides the directory
        set in the constructor
        @raise SystemError: if run harvester on a Windows machine
        @raise ValueError: if repositoryInfo is not a RepositoryInfo object, or if
        no harvest directory is available
        @return status, outMessage, harvestDir: Status = True, if successful, False otherwise
        outMessage = summary of harvest outcome, harvestDir = directory docs harvested to
        '''
        logging.info("Running data harvest")

        # check we're not running on windows - this doesn't work since the commands
        # library for running system commands is only unix systems compatible
        # NB, this would be better placed in the constructor - since this would stop
        # the app from running in the first place; put it here temporarily to allow
        # testing of the app on windows
        if sys.platform.lower().startswith('win'):
            raise SystemError("Harvest functionality does not work on Windows machines " + \
                              "- application should be running on a Unix system.")

        localDir = self.outDir
        if outDir:
            localDir = outDir

        if not isinstance(repositoryInfo, RepositoryInfo):
            raise ValueError("Input object, '%s' is not of type 'RepositoryInfo'" %repositoryInfo)
        logging.info("- for data at, '%s'" %repositoryInfo.url)

        # fail clearly here rather than with a TypeError when building the path
        if not localDir:
            raise ValueError("No harvest directory available - specify outDir or a config file with a harvestDir setting.")

        # NB, the harvester done via the web interface automatically creates a
        # local dir for the repository info - using the url and format; to keep
        # things consistent, do this here
        localHarvestDir = self.__getLocalRepositoryDir(repositoryInfo, localDir)

        isFailure, message = self.__runHarvest(localHarvestDir, repositoryInfo)

        # NB, the harvest format is case dependent and this can vary across services!
        # - if there is an error suggesting this is the problem, retry with opposite
        # case for format
        if isFailure and message.find(self.FORMAT_ERROR) > -1:
            oldFormat = repositoryInfo.dataFormat
            if oldFormat.islower():
                repositoryInfo.dataFormat = oldFormat.upper()
            else:
                repositoryInfo.dataFormat = oldFormat.lower()

            logging.info("- retrying harvest using format with new casing ('%s' vs '%s')" \
                         %(oldFormat, repositoryInfo.dataFormat))

            isFailure, message = self.__runHarvest(localHarvestDir, repositoryInfo)
            if isFailure:
                # the new casing didn't help - don't leave the caller's object altered
                repositoryInfo.dataFormat = oldFormat

        data = message.split('\n')
        if isFailure:
            # retrieve pertinent part of error message to return to user
            # NB, usually the penultimate line has the clearest digest of the error on it
            # - just in case there are exceptions to this, return just the last line
            outMessage = data[-1]
            if len(data) > 1 and message.find(self.FORMAT_ERROR) == -1:
                outMessage = data[-2]

            # NB, the message will be displayed in a javascript pop up - this doesn't like
            # apostrophes so remove any in the message string
            outMessage = outMessage.replace('\'', '')
            return False, outMessage, localHarvestDir

        logging.info("- harvest completed successfully")
        # NB, the last line has the summary of the harvest - so just return this
        return True, data[-1], localHarvestDir


    def __runHarvest(self, localHarvestDir, repositoryInfo):
        '''
        Construct and run the harvest command for the repository, logging the
        command, its output and any failure
        @param localHarvestDir: the directory to harvest to
        @param repositoryInfo: the RepositoryInfo object representing the repository
        to be harvested
        @return isFailure, message: isFailure = True if the command returned a non
        zero status or its output contains the format error text, message = the
        full command output
        '''
        harvestCMD = self.__constructHarvestCommand(localHarvestDir, repositoryInfo)
        logging.debug(" - using command, '%s'" %harvestCMD)

        status, message = commands.getstatusoutput(harvestCMD)
        logging.debug("Harvest output: '%s'" %message)

        isFailure = bool(status) or message.find(self.FORMAT_ERROR) > -1
        if isFailure:
            logging.error("Problem occurred whilst running harvest: %s" %message)
        return isFailure, message


    def ingestDocuments(self, providerName, harvestDir, dataFormat):
        '''
        Ingest harvested documents into the discovery service
        @param providerName: Name of provider whose documents should be ingested
        @param harvestDir: Directory to ingest docs from
        @param dataFormat: format of data to ingest
        @return status, outMessage: Status = True, if successful, False otherwise
        outMessage = summary of ingest outcome.  NB, exceptions raised by the
        ingester are caught and reported via a False status and the outMessage
        '''
        logging.debug("Running ingest of documents")
        if not self.ingester:
            self.ingester = oai_document_ingester()

        try:
            isSuccess, result = self.ingester.processDataCentre(providerName,
                                                                harvestDir = harvestDir,
                                                                dataFormat = dataFormat)
        except Exception:
            # NB, exception retrieved via sys.exc_info so that the syntax is valid
            # for all python versions
            e = sys.exc_info()[1]
            isSuccess = False
            logging.error(e)
            result = "Unexpected error occurred during ingest: %s" %str(e)

        logging.debug(result)
        logging.debug("Document ingest complete")
        return isSuccess, result


    def runHarvestAndIngest(self, providerName, repositoryInfo, outDir = None,
                            ingestAsynch = False, userEmail = None):
        '''
        Harvest and ingest documents into the discovery service
        @param providerName: Name of provider whose documents should be ingested
        @param repositoryInfo: RepositoryInfo object with data on the repository to
        be harvested
        @keyword outDir: directory to harvest files to
        @keyword ingestAsynch: if True, run the ingest stage in a separate thread. Default = False
        @keyword userEmail: mail address to send notification mails to. Default = None
        @return status, outMessage: Status = True, if successful, False otherwise
        outMessage = summary of harvest outcome followed by the ingest outcome - NB,
        for an asynchronous ingest the status only reflects the harvest stage
        '''
        logging.debug("Running harvest then ingest")
        # NB, stored on the instance since the ingest thread reads it from there
        # once the ingest has finished
        self.userEmail = userEmail
        isSuccess, outMessage, harvestDir = self.harvestRepository(repositoryInfo, outDir = outDir)
        if not isSuccess:
            logging.debug(" - harvest failed so avoid document ingest")
            return isSuccess, outMessage

        logging.debug("- harvest successful, so running ingest")
        if ingestAsynch:
            logging.debug("- running ingest asynchronously")
            thread = IngestThread(self, providerName, harvestDir,
                                  repositoryInfo.dataFormat)
            thread.start()
            ingestMessage = "  Document ingest is now running"
            if not self.userEmail:
                ingestMessage += " - NB, if you wish to see the results, please include an email address with the provider data."
            else:
                ingestMessage += " - once complete, results will be emailed to %s" %userEmail
        else:
            logging.debug("- running ingest synchronously")
            isSuccess, ingestMessage = self.ingestDocuments(providerName,
                                                            harvestDir,
                                                            repositoryInfo.dataFormat)

        outMessage += ingestMessage
        return isSuccess, outMessage


    def __getLocalRepositoryDir(self, repositoryInfo, baseDir):
        '''
        Use the input repository info to create a local dir for the harvested data
        to be stored in - NB, this matches what is done by the jOAI web interface
        - which oddly isn't replicated by the jOAI Harvester API
        The directory name is the host (and port) part of the repository url, with
        '.' and ':' replaced by '-', followed by '-<dataFormat>'
        @param repositoryInfo: RepositoryInfo for the repository being harvested
        @param baseDir: base directory for harvesting from
        @return localHarvestDir: string dir representing the directory to harvest
        the repository records to
        '''
        logging.debug("Determining local directory for harvest")
        localHarvestDir = repositoryInfo.url

        # strip off any leading scheme ('http://', 'https://', ...) - NB, only do
        # this when it really is at the start of the url, i.e. before the first '/'
        # - without this, all non 'http://' urls end up sharing the same directory
        schemeEnd = localHarvestDir.find('://')
        if schemeEnd > -1 and '/' not in localHarvestDir[:schemeEnd]:
            localHarvestDir = localHarvestDir[schemeEnd + 3:]

        localHarvestDir = localHarvestDir.split('/')[0]
        localHarvestDir = localHarvestDir.replace('.', '-')
        localHarvestDir = localHarvestDir.replace(':', '-')
        localHarvestDir += '-%s' %repositoryInfo.dataFormat
        localHarvestDir = baseDir + os.sep + localHarvestDir
        logging.debug(" - harvesting to '%s'" %localHarvestDir)
        return localHarvestDir


    def __constructHarvestCommand(self, localHarvestDir, repositoryInfo):
        '''
        Construct the java command required to do the appropriate harvest
        @param localHarvestDir: the directory to harvest to
        @param repositoryInfo: the RepositoryInfo object representing the repository
        to be harvested
        @return harvestCMD: command string to run the harvest
        '''
        proxyInfo = ''
        if self.proxyHost:
            proxyInfo = '-DproxySet=true -DproxyHost=%s -DproxyPort=%s' \
                %(self.proxyHost, self.proxyPort)

        # NOTE(review): the values below are placed into the command string
        # unquoted and the string is run via the shell - confirm that the
        # repository url/format/set values can never contain shell metacharacters
        harvestCMD = self.JAVA_COMMAND %(proxyInfo, localHarvestDir, repositoryInfo.url,
                                         repositoryInfo.dataFormat)
        if repositoryInfo.setSpec:
            harvestCMD += ' -set:%s' %repositoryInfo.setSpec

        if repositoryInfo.splitBySet:
            harvestCMD += ' -splitBySet:%s' %repositoryInfo.splitBySet
        return harvestCMD


    def harvestAll(self):
        '''
        Harvest - and synchronously ingest - all data specified in the oai info
        editor files.  NB, failures for individual repositories are logged and
        the remaining repositories are still processed
        @raise ValueError: if the harvester was created without a config file
        '''
        logging.info("Harvesting all available provider info")
        if not self.cf:
            raise ValueError("No config file available - cannot get info to harvest all provider info.")

        if not self.dao:
            self.dao = createDAOWithClient(client = FILE_CLIENT_TYPE,
                                           configFile = self.cf)

        pis = self.dao.getAllProviderInfo()
        for pi in pis:
            logging.info("Harvesting info for provider, '%s'" %pi.name)
            for ri in pi.repositoryInfos:
                logging.info(" - harvesting repository info, '%s'" %ri.name)
                isSuccess, outMessage = self.runHarvestAndIngest(pi.name, ri)
                if not isSuccess:
                    logging.error(" - harvest/ingest failed for repository, '%s': %s" \
                                  %(ri.name, outMessage))

        logging.info("- harvesting complete")
310   
311       
312# entry point for running as a script - e.g. via crontab
if __name__=="__main__":

    loggingLevel = logging.DEBUG
    logging.basicConfig(level = loggingLevel,
                        format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')

    if len(sys.argv) < 2:
        raise ValueError("Usage: harvester <configFile>\n\n- NB, config file should be the ini file used by the oai info editor.")

    # NB, mailServer is a required argument of the Harvester constructor; None
    # is passed since harvestAll only runs the synchronous ingest, with no user
    # email address - so no notification mails are sent from here
    h = Harvester(None, configFile = sys.argv[1])
    h.harvestAll()
323   
Note: See TracBrowser for help on using the repository browser.