source: TI01-discovery/trunk/OAIInfoEditor/oai_info_editor/lib/harvester.py @ 5539

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/trunk/OAIInfoEditor/oai_info_editor/lib/harvester.py@5539
Revision 5539, 15.4 KB checked in by sdonegan, 10 years ago (diff)

adjusted to correct dodgy urls caused by mod_wsgi install in links

Line 
1'''
2 Class wrapping the jOAI Harvester API - to harvest specified provider repository data
3 
4 @author: C Byrom, Tessella Feb 2009
5'''
6import logging, commands, os, sys
7from oai_info_editor.model.repositoryinfo import RepositoryInfo
8from oai_info_editor.dal.providerinfodao import *
9from ndg.common.src.models.myconfig import myConfig
10from OAIBatch.oai_document_ingester import oai_document_ingester
11from ndg.common.src.lib.mailer import mailHandler
12import oai_info_editor.lib.constants as constants
13from threading import Thread
14
class IngestThread(Thread):
    '''
    Thread that ingests harvested docs asynchronously - i.e. outside of the
    calling thread.
    NB, the results of the ingest are reported by email when the harvester
    carries a user email address; otherwise no notification is sent
    '''

    def __init__ (self, harvester, providerName, harvestDir, format):
        '''
        Constructor for setting up thread for asynchronous ingest of docs
        @param harvester: Harvester instance to do the ingesting; its
        ingestDocuments method is called from run(), and its userEmail and
        mailServer attributes are used for the notification mail
        @param providerName: name of provider to ingest docs from
        @param harvestDir: directory to ingest files from
        @param format: format of docs to ingest
        '''
        logging.info("Setting up thread to ingest data for datacentre, '%s'" %providerName)
        Thread.__init__(self)
        self.harvester = harvester
        self.providerName = providerName
        self.harvestDir = harvestDir
        self.format = format
        logging.info("- finished setting up thread")


    def run(self):
        '''
        Ingest the docs then, if the harvester has a user email address, mail
        the ingest summary to that address
        '''
        logging.info("Running thread to ingest datacentre docs")
        isSuccess, ingestMessage = self.harvester.ingestDocuments(self.providerName,
                                                                  self.harvestDir,
                                                                  self.format)
        logging.info("- finished ingesting data")
        logging.debug("- ingest success = %s" %isSuccess)

        # the harvester object is not guaranteed to carry a userEmail
        # attribute - treat a missing one as 'no address supplied' rather
        # than letting the thread die with an AttributeError
        userEmail = getattr(self.harvester, 'userEmail', None)
        if userEmail:
            logging.info("Sending notification mail to '%s'" %userEmail)
            try:
                status, message = mailHandler([userEmail],
                                              constants.INGEST_RESULTS_TITLE %self.providerName,
                                              ingestMessage,
                                              server = self.harvester.mailServer)
                logging.info("- email sent")
                logging.debug("- mail handler returned: %s, %s" %(status, message))
            except Exception as e:
                # nobody is waiting on this thread, so the log is the only
                # place a failed notification can be recorded
                logging.error("- failed to send notification mail: %s" %e)
        logging.info("Ingest procedure complete")
52           
53
class Harvester(object):
    '''
    Wrapper around the jOAI Harvester java client: harvests provider
    repository docs to a local directory and, optionally, ingests them via
    the oai_document_ingester
    '''

    # this is the command used to run the java client
    # NB, the inputs required are, outdir, baseURL and format
    # SJD - wsgi buildout screws up these relative paths, - use absolute for testing
    #JAVA_COMMAND = 'java %s -cp lib/DLESETools.jar:lib/jdom-b7.jar:lib/xercesImpl.jar:lib/xml-apis.jar org.dlese.dpc.oai.harvester.Harvester %s %s %s'
    JAVA_COMMAND = 'java %s -cp /usr/local/ndg-oai-info-editor/lib/DLESETools.jar:/usr/local/ndg-oai-info-editor/lib/jdom-b7.jar:/usr/local/ndg-oai-info-editor/lib/xercesImpl.jar:/usr/local/ndg-oai-info-editor/lib/xml-apis.jar org.dlese.dpc.oai.harvester.Harvester %s %s %s'

    # text looked for in the harvest output to detect a rejected data format
    FORMAT_ERROR = 'cannotDisseminateFormat'

    def __init__(self, mailServer = None, outDir = None,
                  proxyHost = '130.246.135.176',
                  proxyPort = '8080', configFile = None):
        '''
        Constructor - initialise the Harvester class
        @keyword mailServer: server for sending notification mails from the asynch
        ingest.  Default = None - which is sufficient when no mails are sent
        @keyword outDir: directory to harvest files to - NB,
        this typically doesn't change for different harvests - which is why it
        is set in the constructor
        @keyword proxyHost: proxy host to use in comms - defaults to wwwcache.rl.ac.uk IP address
        @keyword proxyPort: port for the proxy host to use - defaults to '8080'
        @keyword configFile: ini file to define the outDir and the data for accessing the
        OAI info editor data.  NB, the settings in configFile override the outDir keyword
        '''
        logging.debug("Initialising Harvester object")
        self.mailServer = mailServer
        self.outDir = outDir
        self.proxyHost = proxyHost
        self.proxyPort = proxyPort
        # address for notification mails - set by runHarvestAndIngest; defined
        # here so it can be read safely before any harvest has been run
        self.userEmail = None
        self.cf = None
        if configFile:
            self.cf = myConfig(configFile)
            outDir = self.cf.get('DATA_STORE', 'harvestDir')
            if outDir:
                self.outDir = outDir

        self.dao = None # data access object for retrieving all providers info
        self.ingester = None # ingester script for adding the harvested data to the Discovery service

        logging.info("ProviderInfoDAO initialised")


    def harvestRepository(self, repositoryInfo, outDir = None):
        '''
        Harvest docs from the specified repository
        NB, if the harvest output reports 'cannotDisseminateFormat', the
        harvest is retried once with the case of repositoryInfo.dataFormat
        flipped - the dataFormat attribute of the input object is changed in
        place, whilst the harvest directory keeps the name derived from the
        original format
        @param repositoryInfo: RepositoryInfo object with data on the repository to
        be harvested
        @keyword outDir: directory to harvest files to - if not set, the outDir
        of the Harvester instance is used
        @raise SystemError: if run harvester on a Windows machine
        @raise ValueError: if repositoryInfo is not a RepositoryInfo object
        @return status, outMessage, harvestDir: Status = True, if successful, False otherwise
        outMessage = summary of harvest outcome, harvestDir = directory docs harvested to
        '''
        logging.info("Running data harvest")

        # check we're not running on windows - this doesn't work since the commands
        # library for running system commands is only unix systems compatible
        # NB, this would be better placed in the constructor - since this would stop
        # the app from running in the first place; put it here temporarily to allow
        # testing of the app on windows
        if sys.platform.lower().startswith('win'):
            raise SystemError("Harvest functionality does not work on Windows machines " + \
                              "- application should be running on a Unix system.")

        localDir = outDir or self.outDir

        if not isinstance(repositoryInfo, RepositoryInfo):
            raise ValueError("Input object, '%s' is not of type 'RepositoryInfo'" %repositoryInfo)
        logging.info("- for data at, '%s'" %repositoryInfo.url)

        # NB, the harvester done via the web interface automatically creates a
        # local dir for the repository info - using the url and format; to keep
        # things consistent, do this here
        localHarvestDir = self.__getLocalRepositoryDir(repositoryInfo, localDir)

        harvestCMD = self.__constructHarvestCommand(localHarvestDir, repositoryInfo)
        logging.info(" - using command, '%s'" %harvestCMD)

        status, message = commands.getstatusoutput(harvestCMD)
        logging.info("Harvest output: '%s'" %message)

        failed = self.__harvestFailed(status, message)
        if failed:
            logging.error("Problem occurred whilst running harvest: %s" %message)
            # NB, the harvest format is case dependent and this can vary across services!
            # - if there is an error suggesting this is the problem, retry with opposite
            # case for format
            if self.FORMAT_ERROR in message:
                oldFormat = repositoryInfo.dataFormat
                if oldFormat.islower():
                    repositoryInfo.dataFormat = oldFormat.upper()
                else:
                    repositoryInfo.dataFormat = oldFormat.lower()

                logging.info("- retrying harvest using format with new casing ('%s' vs '%s')" \
                             %(oldFormat, repositoryInfo.dataFormat))

                harvestCMD = self.__constructHarvestCommand(localHarvestDir, repositoryInfo)
                logging.debug(" - using command, '%s'" %harvestCMD)

                status, message = commands.getstatusoutput(harvestCMD)
                logging.debug("Harvest output: '%s'" %message)
                failed = self.__harvestFailed(status, message)
                if failed:
                    logging.error("Problem occurred whilst running harvest: %s" %message)

        data = message.split('\n')
        if failed:
            # retrieve pertinent part of error message to return to user
            # NB, usually the penultimate line has the clearest digest of the error on it
            # - for format errors, or single line output, return just the last line
            outMessage = data[-1]
            if len(data) > 1 and self.FORMAT_ERROR not in message:
                outMessage = data[-2]

            # NB, the message will be displayed in a javascript pop up - this doesn't like
            # apostrophes so remove any in the message string
            outMessage = outMessage.replace('\'', '')
            return False, outMessage, localHarvestDir

        logging.info("- harvest completed successfully")
        # NB, the last line has the summary of the harvest - so just return this
        return True, data[-1], localHarvestDir


    def __harvestFailed(self, status, message):
        '''
        Decide whether a run of the java harvest client failed
        @param status: exit status of the harvest command
        @param message: output produced by the harvest command
        @return: True if the exit status is non-zero or the output reports
        that the data format cannot be disseminated, False otherwise
        '''
        return bool(status) or self.FORMAT_ERROR in message


    def ingestDocuments(self, providerName, harvestDir, dataFormat):
        '''
        Ingest harvested documents into the discovery service
        NB, any exception raised whilst ingesting is caught, logged and
        reported via the returned status and message
        @param providerName: Name of provider whose documents should be ingested
        @param harvestDir: Directory to ingest docs from
        @param dataFormat: format of data to ingest
        @return status, outMessage: Status = True, if successful, False otherwise
        outMessage = summary of ingest outcome
        '''
        logging.debug("Running ingest of documents")
        # the ingester is created on first use and then reused
        if not self.ingester:
            self.ingester = oai_document_ingester()

        try:
            logging.info("Harvest dir= " + harvestDir + "  data format= " + dataFormat)
            isSuccess, result = self.ingester.processDataCentre(providerName,
                                                                harvestDir = harvestDir,
                                                                dataFormat = dataFormat)
        except Exception as e:
            isSuccess = False
            logging.error(e)
            result = "Unexpected error occurred during ingest: %s" %str(e)

        logging.debug(result)
        logging.debug("Document ingest complete")
        return isSuccess, result


    def runHarvestAndIngest(self, providerName, repositoryInfo, outDir = None,
                                                          ingestAsynch = False, userEmail = None):
        '''
        Harvest and ingest documents into the discovery service
        NB, the ingest is only attempted if the harvest succeeds; the userEmail
        is stored on the Harvester instance so the ingest thread can read it
        @param providerName: Name of provider whose documents should be ingested
        @param repositoryInfo: RepositoryInfo object with data on the repository to
        be harvested
        @keyword outDir: directory to harvest files to
        @keyword ingestAsynch: if True, run the ingest stage in a separate thread. Default = False
        @keyword userEmail: mail address to send notification mails to. Default = None
        @return status, outMessage: Status = True, if successful, False otherwise
        outMessage = summary of harvest outcome followed by the ingest outcome; NB,
        for an asynchronous ingest the status only reflects the harvest
        '''
        logging.debug("Running harvest then ingest")
        self.userEmail = userEmail
        isSuccess, outMessage, harvestDir = self.harvestRepository(repositoryInfo, outDir = outDir)
        if isSuccess:
            ingestMessage = ""
            logging.debug("- harvest successful, so running ingest")
            if ingestAsynch:
                logging.debug("- running ingest asynchronously")
                thread = IngestThread(self, providerName, harvestDir,
                                      repositoryInfo.dataFormat)
                thread.start()
                ingestMessage += "  Document ingest is now running"
                if not self.userEmail:
                    ingestMessage += " - NB, if you wish to see the results, please include an email addresss with the provider data."
                else:
                    ingestMessage += " - once complete, results will be emailed to %s" %userEmail
            else:
                logging.debug("- running ingest synchronously")
                isSuccess, ingestMessage = self.ingestDocuments(providerName,
                                                                harvestDir,
                                                                repositoryInfo.dataFormat)

            outMessage += ingestMessage
        else:
            logging.debug(" - harvest failed so avoid document ingest")

        return isSuccess, outMessage


    def __getLocalRepositoryDir(self, repositoryInfo, baseDir):
        '''
        Use the input repository info to create a local dir name for the harvested
        data to be stored in - NB, this matches what is done by the jOAI web interface
        - which oddly isn't replicated by the jOAI Harvester API
        The name is the host (and port) part of the repository url, with '.' and
        ':' replaced by '-', followed by '-<dataFormat>'
        @param repositoryInfo: RepositoryInfo for the repository being harvested
        @param baseDir: base directory for harvesting from
        @return localHarvestDir: string dir representing the directory to harvest
        the repository records to
        '''
        logging.debug("Determining local directory for harvest")
        localHarvestDir = repositoryInfo.url
        # strip the scheme - whatever it is - so that, e.g. https urls do not all
        # collapse to the same 'https-' directory
        if '://' in localHarvestDir:
            localHarvestDir = localHarvestDir.split('://', 1)[1]
        localHarvestDir = localHarvestDir.split('/')[0]
        localHarvestDir = localHarvestDir.replace('.', '-')
        localHarvestDir = localHarvestDir.replace(':', '-')
        localHarvestDir += '-%s' %repositoryInfo.dataFormat
        localHarvestDir = baseDir + os.sep + localHarvestDir
        logging.debug(" - harvesting to '%s'" %localHarvestDir)
        return localHarvestDir


    def __constructHarvestCommand(self, localHarvestDir, repositoryInfo):
        '''
        Construct the java command required to do the appropriate harvest
        NB, the command is run through a shell, so every value taken from the
        inputs is shell quoted; values with no special characters are unchanged
        @param localHarvestDir: the directory to harvest to
        @param repositoryInfo: the RepositoryInfo object representing the repository
        to be harvested
        @return harvestCMD: string command to run the java harvest client
        '''
        # local import so the module level imports stay as they are; shlex.quote
        # is the python 3 home of the function, pipes.quote the python 2 one
        try:
            from shlex import quote
        except ImportError:
            from pipes import quote

        def shellSafe(value):
            # stringify first since, e.g. splitBySet need not be a string
            return quote('%s' %value)

        proxyInfo = ''
        if self.proxyHost:
            proxyInfo = '-DproxySet=true -DproxyHost=%s -DproxyPort=%s' \
                %(shellSafe(self.proxyHost), shellSafe(self.proxyPort))

        harvestCMD = self.JAVA_COMMAND %(proxyInfo,
                                         shellSafe(localHarvestDir),
                                         shellSafe(repositoryInfo.url),
                                         shellSafe(repositoryInfo.dataFormat))
        if repositoryInfo.setSpec:
            harvestCMD += ' -set:%s' %shellSafe(repositoryInfo.setSpec)

        if repositoryInfo.splitBySet:
            harvestCMD += ' -splitBySet:%s' %shellSafe(repositoryInfo.splitBySet)
        return harvestCMD


    def harvestAll(self):
        '''
        Harvest, then synchronously ingest, all data specified in the oai info
        editor files; failures for individual repositories are logged
        @raise ValueError: if the Harvester was set up without a config file
        '''
        logging.info("Harvesting all available provider info")
        if not self.cf:
            raise ValueError("No config file available - cannot get info to harvest all provider info.")

        # the DAO is created on first use and then reused
        if not self.dao:
            self.dao = createDAOWithClient(client = FILE_CLIENT_TYPE,
                                           configFile = self.cf)

        pis = self.dao.getAllProviderInfo()
        for pi in pis:
            logging.info("Harvesting info for provider, '%s'" %pi.name)
            for ri in pi.repositoryInfos:
                logging.info(" - harvesting repository info, '%s'" %ri.name)
                isSuccess, outMessage = self.runHarvestAndIngest(pi.name, ri)
                if not isSuccess:
                    # there is no caller to hand the outcome to when run as a
                    # script, so make sure the failure is on record
                    logging.error(" - harvest/ingest failed for repository, '%s': %s" \
                                  %(ri.name, outMessage))

        logging.info("- harvesting complete")
313   
314       
# entry point for running as a script - e.g. via crontab
# usage: harvester <configFile> - where the config file is the ini file used
# by the oai info editor
if __name__=="__main__":

    loggingLevel = logging.DEBUG
    logging.basicConfig(level = loggingLevel,
                        format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')

    if len(sys.argv) < 2:
        raise ValueError("Usage: harvester <configFile>\n\n- NB, config file should be the ini file used by the oai info editor.")
    # the mail server is the first argument of the Harvester constructor;
    # harvestAll runs every ingest synchronously and without a user email, so
    # no notification mails are sent and None is passed for the server
    h = Harvester(None, configFile = sys.argv[1])
    h.harvestAll()
326   
Note: See TracBrowser for help on using the repository browser.