source: cows_wps/trunk/cows_wps/controllers/wps.py @ 7514

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/cows_wps/controllers/wps.py@7514
Revision 7514, 22.8 KB checked in by astephen, 11 years ago (diff)

tidied old code

Line 
1#!TODO: CEDA license agreement.  See cows/add_license.py for script to do this.
2
3import logging
4import sys
5import imp
6import traceback
7import urllib
8import os
9import time
10from datetime import datetime
11import random
12
13from pylons import response
14from routes import url_for
15
16#import from cows
17from cows.model import PossibleValues, WGS84BoundingBox, BoundingBox, Contents
18from cows.pylons import ows_controller
19from cows.exceptions import *
20from cows import helpers
21from cows.builder import loadConfigFile
22
23# Import local modules
24from cows_wps.controllers import *
25from cows_wps.utils.create_process_config import createProcessConfig
26from cows_wps.lib.user_manager import UserManager
27from cows_wps.process_handler import validate_arguments
28from cows_wps.process_handler.context.process_context import ProcessContext
29from cows_wps.process_handler.context.process_status import STATUS
30from cows_wps.model.managers import requestManager, cacheManager, caching_enabled, workerPool
31from cows_wps.model.process_dir import processDirManager
32from cows_wps.renderer import xml_renderer
33from cows_wps.process_handler.fileset import FLAG
34from cows_wps.sge.sge_py_int import SgePyInterface
35
36import cows_wps.utils.common as common_utils
37
38#import config files
39from cows_wps.utils.parse_wps_config import wps_config_dict
40from cows_wps.utils.parse_capabilities_config import caps_config_dict
41from cows_wps.utils.parse_outputs_rules import outputs_rules_config_dict
42
43log = logging.getLogger(__name__)
44#-----------------------------------------------------------------------------
45
46
47# Define global variables
48
49class Operation(object):
50    def __init__(self, name, href):
51        self.name = name
52        self.href = href
53
54   
55def addOperation(ops, name):
56    href=helpers.operation(url_for(qualified=True, action="index")+'?',[]).get.href
57    ops.append(Operation(name,href))
58
59
60ops =[]
61log.info("Building config file")
62addOperation(ops, "GetCapabilities")
63addOperation(ops,"DescribeProcess")
64addOperation(ops, "Execute") 
65
66#-----------------------------------------------------------------------------
67
68
69
70class WpsController(ows_controller.OWSController):
71    #-------------------------------------------------------------------------
72    # Attributes required by OWSController
73
74    service = 'WPS'
75    owsOperations = (ows_controller.OWSController.owsOperations +
76                     ['DescribeProcess', 'Execute'])
77    validVersions = ['0.4.0']
78   
79
80    #-------------------------------------------------------------------------
81
82    # Cababilities information is built-up in I{ops} above, therefore no need
83    # for _loadCapabilities()
84    #
85    #def _loadCapabilities(self):
86    #
87   
88    def _renderCapabilities(self, version, format):
89        t= templateLoader.load("wps_getcapabilitiesresponse_template.xml")
90   
91        return t.generate(caps_config_dict=caps_config_dict, ops=ops, c=c).render()
92
93    def DescribeProcess(self):
94        ids = caps_config_dict['Identifiers']
95        t = templateLoader.load('wps_describeprocess_response.xml')
96           
97        # Get identifier
98        identifier = self.getOwsParam('identifier')
99
100        # Handle the case where all descriptions are requested
101        if identifier == 'all':
102           
103            return t.generate(ids=ids).render()
104        # Handle specific process
105        if identifier in ids:
106            response.content_type = 'text/xml'
107            return t.generate(caps_config_dict=caps_config_dict, ids=[identifier]).render()
108        else:
109            raise InvalidParameterValue('Identifier not found', 'identifier')
110
111
112    def _parseDataInputs(self, dataInputs):
113        """
114        Parses inputs, returns dictionary.
115        Applies the following rules:
116         * Any values sent with "|" are split into a list.
117         * Any values containing a "+" sign will be replaced with spaces in the value.
118        """
119        # If dataInputs is blank return empty dictionary
120        if not dataInputs:
121            return {}
122
123        arg_dict = {}
124        args = dataInputs.split(",")
125
126        if len(args) %2 != 0:
127            raise ValueError("Must provide even number of arguments in query string for datainput argument (all comma-separated).")
128
129        if len(args) == 1 and args[0] == "": args = []
130
131        while len(args) > 0:
132            (arg, value) = args[:2]
133            args = args[2:]
134
135#            log.warn("VALUE: %s" % value)
136            value = urllib.unquote(value)
137            value = value.replace("+", " ")
138#            log.warn("VALUE2: %s" % value)
139
140            if value.find("|") > -1:
141                value = value.split("|")
142
143            arg_dict[arg] = value
144
145        return arg_dict
146
147       
148    def Execute(self):
149        if request.environ.get("REQUEST_METHOD") == "POST":
150            log.warn("POST request was rejected because we do not allow this yet.")
151            abort(405)
152
153        log.info("entering controllers.wps.Execute")
154
155        st = time.time()
156       
157        identifier = self.getOwsParam('Identifier')
158       
159        if identifier not in caps_config_dict['Identifiers']:
160            raise InvalidParameterValue('Identifier not found', 'identifier')
161       
162        process_conf = caps_config_dict[identifier]
163        process_type = process_conf["wps_interface"]["process_type"]
164
165        # Since we generate the execute response document on demand we can ignore the store option.
166        # It might be necessary to reflect it's value in ExecuteResponse for WPS compliance
167        store = self._getBooleanOwsParam('StoreExecuteResponse')
168        if store == True:
169            if not caps_config_dict[identifier]['wps_interface']['store']:
170                raise InvalidParameterValue('Storing output is not supported for this process')
171           
172        #!TODO: Clarify this.  We can always support status because the ExecuteResponse document is created on-demand
173        status = self._getBooleanOwsParam('Status')
174        if status == True:
175            if not caps_config_dict[identifier]['wps_interface']['status']:
176                raise InvalidParameterValue('Status not supported for this process')
177            elif not store:
178                raise InvalidParameterValue('Both status and store must be true')
179
180        #!NOTE: Lineage effects what is put in the ExecuteResponse document.  It should end up in the
181        #    config section of the ProcessContext.
182        self.lineage = self._getBooleanOwsParam('lineage', default='true')
183       
184        #!TODO: Check these custom arguments are in the capabilities document.
185        costonly = self._getBooleanOwsParam('Costonly')
186        inform = self._getBooleanOwsParam('Inform')           
187       
188        #######################################################################
189        # Parse and validate arguments.  First the DataInputs are extracted from
190        # the OWS params then they are validated and deserialised into python
191        # objects.
192        #
193        #   input_dict: plain string values of DataInputs
194        #   arg_dict:   validated against process.ini and deserialised
195        dataInputs = self.getOwsParam('DataInputs')
196        log.debug("dataInputs = %s" % (dataInputs,))
197        input_dict = self._parseDataInputs(dataInputs)
198        va = validate_arguments.ValidateArguments(identifier, input_dict)
199               
200        # Exceptions here will be caught by COWS
201        try:
202            arg_dict = va.validate()
203        except Exception, err:
204            exc_info = sys.exc_info()
205            log.warning("exception thrown from validator:")
206            log.warning("traceback.format_exc() = %s" % (traceback.format_exc(),))
207           
208            raise InvalidParameterValue('Invalid parameters',err)
209       
210       
211        #######################################################################
212       
213        #######################################################################
214        #!NOTE: RawDataOutput and ResponseDocument handling has been removed from
215        #    the code.  They aren't needed for DCIP and more thought is needed.
216        #######################################################################
217
218        requestId = common_utils.getUniqueId()
219           
220        # Get username and e-mail address from secure session
221        try:
222            um = UserManager()
223            email = um.getEmailAddress()
224            username = um.getUserName()
225        except:
226            log.warn("Setting user to 'anon' because security implies this requires no login.")
227            username = "anon"
228            email = ""
229
230        log.debug("identifier=%s, username=%s, requestId=%s costonly=%s lineage=%s inform=%s" % \
231                  (identifier, username, requestId, costonly, self.lineage, inform))
232
233        # Update the cache with any completed requests before consulting the cache
234        requestManager.updateRunningRequests()
235       
236        # check if the process results already exist in the cache
237        if caching_enabled:
238            cachedJobId = cacheManager.getCachedJobId(identifier, arg_dict)
239        else:
240            cachedJobId = None
241
242        log.debug("cachedJobId = %s" % (cachedJobId,))
243
244        if cachedJobId:
245            log.debug("request is cached, cacheJobId=%s" % cachedJobId)
246            context = requestManager.addCachedRequest(requestId, cachedJobId, username)
247           
248            context.outputs['job_details']['lineage'] = self.lineage
249           
250            #!TODO: The new requestId will not show up on the jobs controller because it doesn't really exist.
251            #    I.e. it doesn't have a processDir.  We need to create process records that point to the
252            #    cached values.
253           
254            responseXML = xml_renderer.wrapExecuteResponse(context, identifier, 
255                                                           self.validVersions[0], 
256                                                           input_dict, requestId,
257                                                           is_cached=True)
258
259            context.close()
260            response.content_type = 'text/xml'
261            return responseXML
262
263       
264       
265       
266        context = self._makeProcessContext(identifier, requestId, process_type, arg_dict)
267        processCallable = self._loadProcess(context, identifier)
268       
269        if process_type in ["async", "async_l", "async_s"]:
270
271            if not hasattr(processCallable, 'dryRun'):
272                raise RuntimeError('Dry Run not supported for asynchronous process %s.' % processCallable)
273
274
275            context.config['Inform'] = str(inform)
276            context.config['notify_email_to'] = email
277           
278            log.info("Running dryrun for non cached async job")
279            self._runProcessDryRun(processCallable, context, requestId)
280           
281            #if the dryRun ran successfully
282            if context.status == STATUS.ACCEPTED:
283               
284                #after dryRun we can deduce the correct process type           
285                if process_type == 'async':
286                    process_type = self._deduceProcessType(context)
287           
288                (accept, message) = self._checkAsyncQueueLimit(context, 
289                                            requestId, process_type, username)
290           
291                if costonly:
292                    if accept:
293                        context.outputs['job_details']['costonly_status'] = ['cleared', 'The request is ready  for execution']
294                    else:
295                        context.outputs['job_details']['costonly_status'] = ['withheld', message]
296                       
297                    context.save()
298
299                else:
300               
301                    if accept:
302                        #run the async process
303                        log.info("Running %s job, callable = %s" % (process_type, processCallable,))
304                        processQueue = self._selectQueue(identifier, process_type)
305                       
306                        self._submitAsyncProcess(identifier, requestId, username, context, processQueue)
307                    else:
308                        context.setStatus(STATUS.FAILED, message)
309                        context.save()
310       
311        else: # process type is sync
312           
313            if inform == True:
314                log.warning("inform=True found for syncronouse process, ignoring it. \n(identifier = %s request_id = %s, inputs=%s" \
315                            % (identifier, requestId, arg_dict))
316           
317               
318            requestManager.addSyncRequest(requestId, username, identifier, context)
319                             
320            if hasattr(processCallable, 'dryRun'):
321                self._runProcessDryRun(processCallable, context, requestId)
322           
323            log.debug("Context status = %s" % (context.status,) )
324           
325            if context.status != STATUS.FAILED:
326
327                #!NOTE: Now all sync processes run in a worker pool
328                if process_type == 'sync':
329                    log.info("Running sync job")
330                    pcs = self._getProcessCallableString(identifier)
331                    self._runSubprocProcess(pcs, context, identifier, requestId)
332                                       
333            log.debug("updating request status to context.status( = %s)" % (context.status,))
334           
335            context.save()
336            #update the status in the database
337            requestManager.updateRequestStatus(requestId, context.status)
338           
339            #!TODO:Will this caching be done by the status change listener in requestManager?
340            if caching_enabled and context.status == STATUS.COMPLETED:
341                cacheManager.cacheRequest(requestId)
342           
343        responseXML = xml_renderer.wrapExecuteResponse(context, identifier, 
344                                                       self.validVersions[0], 
345                                                       input_dict, requestId,
346                                                       is_cached=False)
347       
348
349        context.close()
350       
351        #remove the context of a costonly process
352        if costonly:
353            context.delete()
354       
355        log.debug("Finished Execute in %.2fs" % (time.time() - st,))
356       
357        response.content_type = 'text/xml'
358        return responseXML
359
360    def _getBooleanOwsParam(self, paramName, default='false' ):
361       
362        value = self.getOwsParam(paramName, default=default)
363        if value == 'true':
364            return True
365        else:
366            return False
367
368    def _getProcessCallableString(self, identifier):
369        """
370        Parse the configuration dict to find the process module and callable names
371       
372        @return: (module_name, callable_name) where both are strings.
373       
374        """
375        log.debug("process callable string = %s" % (caps_config_dict[identifier]["wps_interface"]["process_callable"],))
376        return caps_config_dict[identifier]["wps_interface"]["process_callable"]
377
378    def _loadProcess(self, context, identifier):
379        # Get the module and function needed...
380        processModuleString = self._getProcessCallableString(identifier)
381
382        processCallable = common_utils.buildProcessCallable(processModuleString)
383
384        return processCallable
385   
386           
387    def _deduceProcessType(self, context):
388        """Work-out whether to queue an asynchronous process
389        based on the result of dryRun.
390       
391        """
392        max_duration = int(wps_config_dict['max_proc_duration'])
393        job_details = context.outputs['job_details']
394        log.debug("context.status = %s" % (context.status,))
395        if job_details['duration'] > max_duration:
396            r = 'async_l'
397        else:
398            r = 'async_s'
399
400        log.debug('Selected queue %s for process with directory %s' % (r, context.processDir))
401        return r
402
403    def _makeProcessContext(self, identifier, requestId, process_type, arg_dict):
404        """
405        Creates a context object for a new job
406        """
407       
408        log.info("Creating process context")
409       
410        proc_conf = createProcessConfig(identifier)
411        procDir = processDirManager.makeProcessDir(requestId, identifier)
412       
413        context = ProcessContext(procDir, proc_conf)
414       
415        context.create(arg_dict)           
416        context.setupJobDetails(requestId, self.lineage)
417       
418        return context
419       
420    def _selectQueue(self, identifier, processType):
421        """
422        Return which SGE queue to send a job to.
423       
424        """
425        if processType == 'async_l':
426            processQueue = wps_config_dict['sge_queue_l']
427        elif processType == 'async_s':
428            processQueue = wps_config_dict['sge_queue_s']
429        else:
430            raise Exception("Unknown process type %s, should be 'async_l' or 'async_s'" % (processType,) )
431                           
432        if processDirManager.isStripedProcess(identifier):
433            if '@' in processQueue:
434                raise Exception("SGE queue cannot contain a host identifier in striped mode")
435            processQueue = '%s@%s' % (processQueue, processDirManager.getStripedSGEHost())
436       
437        return processQueue
438       
439    def _submitAsyncProcess(self, identifier, jobId, username, context, queue):
440        """
441        Submits an asynchronous process and returns updated Exec Resp doc.
442        """
443
444        processCallableString = caps_config_dict[identifier]["wps_interface"]["process_callable"]
445       
446        process_dir = os.path.abspath(context.processDir)
447        context.save()
448
449        log.debug("queue name...... %s" % queue)
450       
451        sge = SgePyInterface()
452        try:
453           
454            #!NOTE: I have removed os.path.abspath('python') because we can't be sure
455            #    python is on the same path on an SGE node as on the WPS machine.
456            #    We must make sure python is in on PATH on the SGE node.
457            runproc = wps_config_dict['runproc_path']
458            sge_job_id = sge.submitJob(runproc, [processCallableString, process_dir], 
459                                       process_dir, queue)
460        except:
461            log.error("Exception occurred submitting process to SGE \n callable=%s\n runproc=%s\n process_dir=%s" % (processCallableString, runproc, process_dir))
462            log.exception('Process Exception follows')
463           
464            # catch the exception and wrap it as an OWSError which should get
465            # turned into an error report in cows.
466            exceptionMessage = self._getExceptionMessage()
467           
468            raise OwsError("Error occurred while submitting asynconous job.\n Message=%s" % exceptionMessage)
469       
470            context.save()
471        else:
472            log.debug('submit sge_job_id to requestManager to add it to the job id mapping file')
473            requestManager.addAsyncRequest(jobId, username, sge_job_id, identifier, context, queue)
474           
475        finally:
476            sge.close()
477       
478    def _runSubprocProcess(self, processCallableString, context, identifier, requestId):
479        """
480        Run a process in the WorkerPool
481        """
482       
483        log.debug("Executing process callable %s in WorkerPool" % (processCallableString,))
484
485        context.setStatus(STATUS.STARTED, 'Job has started', 0)
486        context.log.info("Executing process %s in WorkerPool on HOSTNAME = %s" \
487                  % (processCallableString, os.getenv('HOSTNAME')))
488        try:
489            workerPool.execute(processCallableString, context)
490        except Exception, e:
491
492            log.exception('Process Exception follows')
493
494            # Note: context related cleanup is done in workerPool.  It should be correctly
495            #     synced with the filesystem on return
496           
497            log.warning('WorkerPool job %s failed' % processCallableString)
498           
499        else:
500            log.debug('WorkerPool job %s returned' % processCallableString)
501            if context.status in (STATUS.ACCEPTED, STATUS.STARTED):
502                context.setStatus(STATUS.COMPLETED, '', 100)
503           
504           
505    def _runProcessDryRun(self, processCallable, context, requestId):
506        """
507        Runs the dryRun method of an asyncronous process. If any Exceptions
508        occurr while running the method the context.status will be set to
509        FAILED and the process should not be submitted to SGE.
510        """
511        try:
512            processCallable.dryRun(context)
513           
514        except Exception, e:
515            log.debug("Exception occurred while running %s" % (processCallable.dryRun,))
516            log.exception('Process Exception follows')
517           
518            exceptionMessage = self._getExceptionMessage()
519            context.setStatus(STATUS.FAILED, exceptionMessage)
520
521            #log the exception in the process log as well.
522            context.log.exception('Process Exception follows')
523            context.save()
524        else:
525           
526            if context.status in (STATUS.STARTED, STATUS.COMPLETED):
527                log.error("Process in %s status after running dryRun, processCallable=%s" % (context.status, processCallable,))
528                log.error("Correcting process status to ACCEPTED")
529                context.setStatus(STATUS.ACCEPTED, '')
530               
531    def _checkAsyncQueueLimit(self, context, requestId, processType, username):
532        """
533        Checks with the requestManager how many jobs a user has running on each
534        of the SGE queues. Then comparis the number of jobs running with the
535        maximum limit for the queue and returns if the job should be accepted.
536       
537        returns (accepted, message)
538        """
539       
540        if processType == 'async_l':
541            queueName = wps_config_dict['sge_queue_l']
542            maxProcesses = int(wps_config_dict['max_l_proc'])
543           
544        elif processType == 'async_s':
545            queueName = wps_config_dict['sge_queue_s']
546            maxProcesses = int(wps_config_dict['max_s_proc'])
547        else:
548            raise Exception("Unknown process type %s, should be 'async_l' or 'async_s'" % (processType,) )
549       
550        currentJobs = requestManager.countSGEJobsRunning(username, queueName)
551                     
552        if  currentJobs >= maxProcesses:
553            accept = False
554            message = 'You cannot start this request until one of your current processes completes.'
555           
556            log.debug("request denied for user %s who has %s requests running on the %s queue",
557                       username, currentJobs, queueName)
558           
559        else:
560            accept = True
561            message = ""
562       
563        return (accept, message)
564
565    def _getExceptionMessage(self):
566#        log.debug("sys.exc_info() = %s" % (sys.exc_info(),))
567
568        #get the type + info + traceback from the last exception to occurr
569        t, i, tb = sys.exc_info()
570#        log.debug("t = %s" % (t,))
571#        log.debug("i = %s" % (i,))
572               
573        #update the status in the context         
574        exceptionMessage = traceback.format_exception_only(t, i)[0]
575        exceptionMessage = exceptionMessage.strip()       
576        return exceptionMessage
Note: See TracBrowser for help on using the repository browser.