source: cows_wps/trunk/cows_wps/controllers/wps.py @ 5615

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/cows_wps/controllers/wps.py@5615
Revision 5615, 22.4 KB checked in by spascoe, 11 years ago (diff)

COWS WPS package copied from
 http://proj.badc.rl.ac.uk/svn/dcip/cows-wps/trunk.

This is a stripped down version of the DDP WPS. Some features are
removed and others have been deactivated until we reimplement them in a
more generic way.

Line 
1#!TODO: CEDA license agreement.  See cows/add_license.py for script to do this.
2
3import logging
4import sys
5import imp
6import traceback
7import urllib
8import os
9import time
10from datetime import datetime
11import random
12from pylons import response
13
14#import from cows
15from cows.model import PossibleValues, WGS84BoundingBox, BoundingBox, Contents
16from cows.pylons import ows_controller
17from cows.exceptions import *
18from cows import helpers
19from cows.builder import loadConfigFile
20
21# Import local modules
22from cows_wps.controllers import *
23from cows_wps.utils.create_process_config import createProcessConfig
24from routes import url_for
25from cows_wps.process_handler import validate_arguments
26from cows_wps.process_handler.context.process_context import ProcessContext
27from cows_wps.process_handler.context.process_status import STATUS
28from cows_wps.model.managers import requestManager, cacheManager, caching_enabled, workerPool
29from cows_wps.model.process_dir import processDirManager
30from cows_wps.renderer import xml_renderer
31from cows_wps.process_handler.fileset import FLAG
32from cows_wps.sge.sge_py_int import SgePyInterface
33
34import cows_wps.utils.common as common_utils
35
36#import config files
37from cows_wps.utils.parse_wps_config import wps_config_dict
38from cows_wps.utils.parse_capabilities_config import caps_config_dict
39from cows_wps.utils.parse_outputs_rules import outputs_rules_config_dict
40
41log = logging.getLogger(__name__)
42#-----------------------------------------------------------------------------
43
44
45# Define global variables
46
47class Operation(object):
48    def __init__(self, name, href):
49        self.name = name
50        self.href = href
51
52   
53def addOperation(ops, name):
54    href=helpers.operation(url_for(qualified=True, action="index")+'?',[]).get.href
55    ops.append(Operation(name,href))
56
57
58ops =[]
59log.info("Building config file")
60addOperation(ops, "GetCapabilities")
61addOperation(ops,"DescribeProcess")
62addOperation(ops, "Execute") 
63
64#-----------------------------------------------------------------------------
65
66
67
68class WpsController(ows_controller.OWSController):
69    #-------------------------------------------------------------------------
70    # Attributes required by OWSController
71
72    service = 'WPS'
73    owsOperations = (ows_controller.OWSController.owsOperations +
74                     ['DescribeProcess', 'Execute'])
75    validVersions = ['0.4.0']
76   
77
78    #-------------------------------------------------------------------------
79
80    # Cababilities information is built-up in I{ops} above, therefore no need
81    # for _loadCapabilities()
82    #
83    #def _loadCapabilities(self):
84    #
85   
86    def _renderCapabilities(self, version, format):
87        t= templateLoader.load("wps_getcapabilitiesresponse_template.xml")
88   
89        return t.generate(caps_config_dict=caps_config_dict, ops=ops, c=c).render()
90
91    def DescribeProcess(self):
92        ids = caps_config_dict['Identifiers']
93        t = templateLoader.load('wps_describeprocess_response.xml')
94           
95        # Get identifier
96        identifier = self.getOwsParam('identifier')
97
98        # Handle the case where all descriptions are requested
99        if identifier == 'all':
100           
101            return t.generate(ids=ids).render()
102        # Handle specific process
103        if identifier in ids:
104            response.content_type = 'text/xml'
105            return t.generate(caps_config_dict=caps_config_dict, ids=[identifier]).render()
106        else:
107            raise InvalidParameterValue('Identifier not found', 'identifier')
108
109
110    def _parseDataInputs(self, dataInputs):
111        # If dataInputs is blank return empty dictionary
112        if not dataInputs:
113            return {}
114
115        arg_dict = {}
116        args = dataInputs.split(",")
117        if len(args) %2 != 0:
118            raise ValueError("Must provide even number of arguments in query string for datainput argument (all comma-separated).")
119
120        if len(args) == 1 and args[0] == "": args = []
121
122        while len(args) > 0:
123            arg,value = args[:2]
124            args = args[2:]
125
126            value = urllib.unquote(value)
127
128            if value.find("|") > -1:
129                value = value.split("|")
130
131            arg_dict[arg] = value
132
133        return arg_dict
134
135       
136    def Execute(self):
137        log.info("entering controllers.wps.Execute")
138
139        st = time.time()
140       
141        identifier = self.getOwsParam('Identifier')
142       
143        if identifier not in caps_config_dict['Identifiers']:
144            raise InvalidParameterValue('Identifier not found', 'identifier')
145       
146        process_conf = caps_config_dict[identifier]
147        process_type = process_conf["wps_interface"]["process_type"]
148
149        # Since we generate the execute response document on demand we can ignore the store option.
150        # It might be necessary to reflect it's value in ExecuteResponse for WPS compliance
151        store = self._getBooleanOwsParam('StoreExecuteResponse')
152        if store == True:
153            if not caps_config_dict[identifier]['wps_interface']['store']:
154                raise InvalidParameterValue('Storing output is not supported for this process')
155           
156        #!TODO: Clarify this.  We can always support status because the ExecuteResponse document is created on-demand
157        status = self._getBooleanOwsParam('Status')
158        if status == True:
159            if not caps_config_dict[identifier]['wps_interface']['status']:
160                raise InvalidParameterValue('Status not supported for this process')
161            elif not store:
162                raise InvalidParameterValue('Both status and store must be true')
163
164        #!NOTE: Lineage effects what is put in the ExecuteResponse document.  It should end up in the
165        #    config section of the ProcessContext.
166        self.lineage = self._getBooleanOwsParam('lineage', default='true')
167       
168        #!TODO: Check these custom arguments are in the capabilities document.
169        costonly = self._getBooleanOwsParam('Costonly')
170        inform = self._getBooleanOwsParam('Inform')           
171       
172        #######################################################################
173        # Parse and validate arguments.  First the DataInputs are extracted from
174        # the OWS params then they are validated and deserialised into python
175        # objects.
176        #
177        #   input_dict: plain string values of DataInputs
178        #   arg_dict:   validated against process.ini and deserialised
179        dataInputs = self.getOwsParam('DataInputs')
180        log.debug("dataInputs = %s" % (dataInputs,))
181        input_dict = self._parseDataInputs(dataInputs)
182        va = validate_arguments.ValidateArguments(identifier, input_dict)
183               
184        # Exceptions here will be caught by COWS
185        #!TODO: include more info about the process in the exception by catching and
186        #       rethrowing the exception.
187        try:
188            arg_dict = va.validate()
189        except Exception, err:
190            exc_info = sys.exc_info()
191            log.warning("exception thrown from validator:")
192            log.warning("traceback.format_exc() = %s" % (traceback.format_exc(),))
193           
194            raise InvalidParameterValue('Invalid parameters',err)
195       
196       
197        #######################################################################
198       
199        #######################################################################
200        #!NOTE: RawDataOutput and ResponseDocument handling has been removed from
201        #    the code.  They aren't needed for DCIP and more thought is needed.
202        #######################################################################
203
204        requestId = common_utils.getUniqueId()
205           
206        # Username is a DCIP-specific parameter
207        if arg_dict.has_key('Username'):
208            username = arg_dict['Username']
209        else:
210            raise ValueError('Username not specified')
211       
212       
213        #!TODO: Reimplement user management
214        (loggedIn, email) = (True, "nobody@nowhere.com")
215       
216        if loggedIn == False:
217            raise OwsError("User %s is not logged into the system." % (username))
218
219        log.debug("identifier=%s, username=%s requestId=%s costonly=%s lineage=%s inform=%s" % \
220                  (identifier, username, requestId, costonly, self.lineage, inform))
221
222        # Update the cache with any completed requests before consulting the cache
223        requestManager.updateRunningRequests()
224       
225        # check if the process results already exist in the cache
226        if caching_enabled:
227            cachedJobId = cacheManager.getCachedJobId(identifier, arg_dict)
228        else:
229            cachedJobId = None
230
231        log.debug("cachedJobId = %s" % (cachedJobId,))
232
233        if cachedJobId:
234            log.debug("request is cached, cacheJobId=%s" % cachedJobId)
235            context = requestManager.addCachedRequest(requestId, cachedJobId, username)
236           
237            context.outputs['job_details']['lineage'] = self.lineage
238           
239            #!TODO: The new requestId will not show up on the jobs controller because it doesn't really exist.
240            #    I.e. it doesn't have a processDir.  We need to create process records that point to the
241            #    cached values.
242           
243            responseXML = xml_renderer.wrapExecuteResponse(context, identifier, 
244                                                           self.validVersions[0], 
245                                                           input_dict, requestId,
246                                                           is_cached=True)
247       
248
249            context.close()
250            response.content_type = 'text/xml'
251            return responseXML
252
253       
254       
255       
256        context = self._makeProcessContext(identifier, requestId, process_type, arg_dict)
257        processCallable = self._loadProcess(context, identifier)
258       
259        if process_type in ["async", "async_l", "async_s"]:
260
261            if not hasattr(processCallable, 'dryRun'):
262                raise RuntimeError('Dry Run not supported for asynchronous process %s.' % processCallable)
263
264
265            context.config['Inform'] = str(inform)
266            context.config['notify_email_to'] = email
267           
268            log.info("Running dryrun for non cached async job")
269            self._runProcessDryRun(processCallable, context, requestId)
270           
271            #if the dryRun ran successfully
272            if context.status == STATUS.ACCEPTED:
273               
274                #after dryRun we can deduce the correct process type           
275                if process_type == 'async':
276                    process_type = self._deduceProcessType(context)
277           
278                (accept, message) = self._checkAsyncQueueLimit(context, 
279                                            requestId, process_type, username)
280           
281                if costonly:
282                    if accept:
283                        context.outputs['job_details']['costonly_status'] = ['cleared', 'The request is ready  for execution']
284                    else:
285                        context.outputs['job_details']['costonly_status'] = ['withheld', message]
286                       
287                    context.save()
288
289                else:
290               
291                    if accept:
292                        #run the async process
293                        log.info("Running %s job, callable = %s" % (process_type, processCallable,))
294                        processQueue = self._selectQueue(identifier, process_type)
295                       
296                        self._submitAsyncProcess(identifier, requestId, context, processQueue)
297                    else:
298                        context.setStatus(STATUS.FAILED, message)
299                        context.save()
300       
301        else: # process type is sync
302           
303            if inform == True:
304                log.warning("inform=True found for syncronouse process, ignoring it. \n(identifier = %s request_id = %s, inputs=%s" \
305                            % (identifier, requestId, arg_dict))
306           
307               
308            requestManager.addSyncRequest(requestId, identifier, context)
309                             
310            if hasattr(processCallable, 'dryRun'):
311                self._runProcessDryRun(processCallable, context, requestId)
312           
313            log.debug("Context status = %s" % (context.status,) )
314           
315            if context.status != STATUS.FAILED:
316
317                #!NOTE: Now all sync processes run in a worker pool
318                if process_type == 'sync':
319                    log.info("Running sync job")
320                    pcs = self._getProcessCallableString(identifier)
321                    self._runSubprocProcess(pcs, context, identifier, requestId)
322                                       
323            log.debug("updating request status to context.status( = %s)" % (context.status,))
324           
325            context.save()
326            #update the status in the database
327            requestManager.updateRequestStatus(requestId, context.status)
328           
329            #!TODO:Will this caching be done by the status change listener in requestManager?
330            if caching_enabled and context.status == STATUS.COMPLETED:
331                cacheManager.cacheRequest(requestId)
332           
333        responseXML = xml_renderer.wrapExecuteResponse(context, identifier, 
334                                                       self.validVersions[0], 
335                                                       input_dict, requestId,
336                                                       is_cached=False)
337       
338
339        context.close()
340       
341        #remove the context of a costonly process
342        if costonly:
343            context.delete()
344       
345        log.debug("Finished Execute in %.2fs" % (time.time() - st,))
346       
347        response.content_type = 'text/xml'
348        return responseXML
349
350    def _getBooleanOwsParam(self, paramName, default='false' ):
351       
352        value = self.getOwsParam(paramName, default=default)
353        if value == 'true':
354            return True
355        else:
356            return False
357
358    def _getProcessCallableString(self, identifier):
359        """
360        Parse the configuration dict to find the process module and callable names
361       
362        @return: (module_name, callable_name) where both are strings.
363       
364        """
365        log.debug("process callable string = %s" % (caps_config_dict[identifier]["wps_interface"]["process_callable"],))
366        return caps_config_dict[identifier]["wps_interface"]["process_callable"]
367
368    def _loadProcess(self, context, identifier):
369        # Get the module and function needed...
370        processModuleString = self._getProcessCallableString(identifier)
371
372        processCallable = common_utils.buildProcessCallable(processModuleString)
373
374        return processCallable
375   
376           
377    def _deduceProcessType(self, context):
378        """Work-out whether to queue an asynchronous process
379        based on the result of dryRun.
380       
381        """
382        max_duration = int(wps_config_dict['max_proc_duration'])
383        job_details = context.outputs['job_details']
384        log.debug("context.status = %s" % (context.status,))
385        if job_details['duration'] > max_duration:
386            r = 'async_l'
387        else:
388            r = 'async_s'
389
390        log.debug('Selected queue %s for process with directory %s' % (r, context.processDir))
391        return r
392
393    def _makeProcessContext(self, identifier, requestId, process_type, arg_dict):
394        """
395        Creates a context object for a new job
396        """
397       
398        log.info("Creating process context")
399       
400        proc_conf = createProcessConfig(identifier)
401        procDir = processDirManager.makeProcessDir(requestId, identifier)
402       
403        context = ProcessContext(procDir, proc_conf)
404       
405        context.create(arg_dict)           
406        context.setupJobDetails(requestId, self.lineage)
407       
408        return context
409       
410    def _selectQueue(self, identifier, processType):
411        """
412        Return which SGE queue to send a job to.
413       
414        """
415        if processType == 'async_l':
416            processQueue = wps_config_dict['sge_queue_l']
417        elif processType == 'async_s':
418            processQueue = wps_config_dict['sge_queue_s']
419        else:
420            raise Exception("Unknown process type %s, should be 'async_l' or 'async_s'" % (processType,) )
421                           
422        if processDirManager.isStripedProcess(identifier):
423            if '@' in processQueue:
424                raise Exception("SGE queue cannot contain a host identifier in striped mode")
425            processQueue = '%s@%s' % (processQueue, processDirManager.getStripedSGEHost())
426       
427        return processQueue
428       
429    def _submitAsyncProcess(self, identifier, jobId, context, queue):
430        """
431        Submits an asynchronous process and returns updated Exec Resp doc.
432        """
433
434        processCallableString = caps_config_dict[identifier]["wps_interface"]["process_callable"]
435       
436        process_dir = os.path.abspath(context.processDir)
437        context.save()
438
439        log.debug("queue name...... %s" % queue)
440       
441        sge = SgePyInterface()
442        try:
443           
444            #!NOTE: I have removed os.path.abspath('python') because we can't be sure
445            #    python is on the same path on an SGE node as on the WPS machine.
446            #    We must make sure python is in on PATH on the SGE node.
447            runproc = wps_config_dict['runproc_path']
448            sge_job_id = sge.submitJob(runproc, [processCallableString, process_dir], 
449                                       process_dir, queue)
450        except:
451            log.error("Exception occurred submitting process to SGE \n callable=%s\n runproc=%s\n process_dir=%s" % (processCallableString, runproc, process_dir))
452            log.exception('Process Exception follows')
453           
454            # catch the exception and wrap it as an OWSError which should get
455            # turned into an error report in cows.
456            exceptionMessage = self._getExceptionMessage()
457           
458            raise OwsError("Error occurred while submitting asynconous job.\n Message=%s" % exceptionMessage)
459       
460            context.save()
461        else:
462            log.debug('submit sge_job_id to requestManager to add it to the job id mapping file')
463            requestManager.addAsyncRequest(jobId, sge_job_id, identifier, context, queue)
464           
465        finally:
466            sge.close()
467       
468    def _runSubprocProcess(self, processCallableString, context, identifier, requestId):
469        """
470        Run a process in the WorkerPool
471        """
472       
473        log.debug("Executing process callable %s in WorkerPool" % (processCallableString,))
474
475        context.setStatus(STATUS.STARTED, 'Job has started', 0)
476        context.log.info("Executing process %s in WorkerPool on HOSTNAME = %s" \
477                  % (processCallableString, os.getenv('HOSTNAME')))
478        try:
479            workerPool.execute(processCallableString, context)
480        except Exception, e:
481
482            log.exception('Process Exception follows')
483
484            # Note: context related cleanup is done in workerPool.  It should be correctly
485            #     synced with the filesystem on return
486           
487            log.warning('WorkerPool job %s failed' % processCallableString)
488           
489        else:
490            log.debug('WorkerPool job %s returned' % processCallableString)
491            if context.status in (STATUS.ACCEPTED, STATUS.STARTED):
492                context.setStatus(STATUS.COMPLETED, '', 100)
493           
494           
495    def _runProcessDryRun(self, processCallable, context, requestId):
496        """
497        Runs the dryRun method of an asyncronous process. If any Exceptions
498        occurr while running the method the context.status will be set to
499        FAILED and the process should not be submitted to SGE.
500        """
501        try:
502            processCallable.dryRun(context)
503           
504        except Exception, e:
505            log.debug("Exception occurred while running %s" % (processCallable.dryRun,))
506            log.exception('Process Exception follows')
507           
508            exceptionMessage = self._getExceptionMessage()
509            context.setStatus(STATUS.FAILED, exceptionMessage)
510
511            #log the exception in the process log as well.
512            context.log.exception('Process Exception follows')
513            context.save()
514        else:
515           
516            if context.status in (STATUS.STARTED, STATUS.COMPLETED):
517                log.error("Process in %s status after running dryRun, processCallable=%s" % (context.status, processCallable,))
518                log.error("Correcting process status to ACCEPTED")
519                context.setStatus(STATUS.ACCEPTED, '')
520               
521    def _checkAsyncQueueLimit(self, context, requestId, processType, username):
522        """
523        Checks with the requestManager how many jobs a user has running on each
524        of the SGE queues. Then comparis the number of jobs running with the
525        maximum limit for the queue and returns if the job should be accepted.
526       
527        returns (accepted, message)
528        """
529       
530        if processType == 'async_l':
531            queueName = wps_config_dict['sge_queue_l']
532            maxProcesses = int(wps_config_dict['max_l_proc'])
533           
534        elif processType == 'async_s':
535            queueName = wps_config_dict['sge_queue_s']
536            maxProcesses = int(wps_config_dict['max_s_proc'])
537        else:
538            raise Exception("Unknown process type %s, should be 'async_l' or 'async_s'" % (processType,) )
539       
540        currentJobs = requestManager.countSGEJobsRunning(username, queueName)
541                     
542        if  currentJobs >= maxProcesses:
543            accept = False
544            message = 'You cannot start this request until one of your current processes completes.'
545           
546            log.debug("request denied for user %s who has %s requests running on the %s queue",
547                       username, currentJobs, queueName)
548           
549        else:
550            accept = True
551            message = ""
552       
553        return (accept, message)
554
555    def _getExceptionMessage(self):
556#        log.debug("sys.exc_info() = %s" % (sys.exc_info(),))
557
558        #get the type + info + traceback from the last exception to occurr
559        t, i, tb = sys.exc_info()
560#        log.debug("t = %s" % (t,))
561#        log.debug("i = %s" % (i,))
562               
563        #update the status in the context         
564        exceptionMessage = traceback.format_exception_only(t, i)[0]
565        exceptionMessage = exceptionMessage.strip()       
566        return exceptionMessage
Note: See TracBrowser for help on using the repository browser.