source: cows_wps/trunk/cows_wps/controllers/wps.py @ 7535

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/cows_wps/controllers/wps.py@7535
Revision 7535, 23.1 KB checked in by astephen, 10 years ago (diff)

Fixed to async argument handler working.

Line 
1import logging
2import sys
3import imp
4import traceback
5import urllib
6import os
7import time
8from datetime import datetime
9import random
10
11from pylons import response
12from routes import url_for
13
14#import from cows
15from cows.model import PossibleValues, WGS84BoundingBox, BoundingBox, Contents
16from cows.pylons import ows_controller
17from cows.exceptions import *
18from cows import helpers
19from cows.builder import loadConfigFile
20
21# Import local modules
22from cows_wps.controllers import *
23from cows_wps.utils.create_process_config import createProcessConfig
24from cows_wps.lib.user_manager import UserManager
25from cows_wps.process_handler import validate_arguments
26from cows_wps.process_handler.context.process_context import ProcessContext
27from cows_wps.process_handler.context.process_status import STATUS
28from cows_wps.model.managers import requestManager, cacheManager, caching_enabled, workerPool
29from cows_wps.model.process_dir import processDirManager
30from cows_wps.renderer import xml_renderer
31from cows_wps.process_handler.fileset import FLAG
32from cows_wps.sge.sge_py_int import SgePyInterface
33
34import cows_wps.utils.common as common_utils
35
36#import config files
37from cows_wps.utils.parse_wps_config import wps_config_dict
38from cows_wps.utils.parse_capabilities_config import caps_config_dict
39from cows_wps.utils.parse_outputs_rules import outputs_rules_config_dict
40from cows_wps.utils.role_handler import RoleHandler
41
42log = logging.getLogger(__name__)
43#-----------------------------------------------------------------------------
44
45
46# Define global variables
47
48class Operation(object):
49    def __init__(self, name, href):
50        self.name = name
51        self.href = href
52
53   
54def addOperation(ops, name):
55    href=helpers.operation(url_for(qualified=True, action="index")+'?',[]).get.href
56    ops.append(Operation(name,href))
57
58
59ops =[]
60log.info("Building config file")
61addOperation(ops, "GetCapabilities")
62addOperation(ops,"DescribeProcess")
63addOperation(ops, "Execute") 
64
65#-----------------------------------------------------------------------------
66
67
68
69class WpsController(ows_controller.OWSController):
70    #-------------------------------------------------------------------------
71    # Attributes required by OWSController
72
73    service = 'WPS'
74    owsOperations = (ows_controller.OWSController.owsOperations +
75                     ['DescribeProcess', 'Execute'])
76    validVersions = ['0.4.0']
77   
78
79    #-------------------------------------------------------------------------
80
81    # Cababilities information is built-up in I{ops} above, therefore no need
82    # for _loadCapabilities()
83    #
84    #def _loadCapabilities(self):
85    #
86   
87    def _renderCapabilities(self, version, format):
88        t= templateLoader.load("wps_getcapabilitiesresponse_template.xml")
89   
90        return t.generate(caps_config_dict=caps_config_dict, ops=ops, c=c).render()
91
92    def DescribeProcess(self):
93        ids = caps_config_dict['Identifiers']
94        t = templateLoader.load('wps_describeprocess_response.xml')
95           
96        # Get identifier
97        identifier = self.getOwsParam('identifier')
98
99        # Handle the case where all descriptions are requested
100        if identifier == 'all':
101           
102            return t.generate(ids=ids).render()
103        # Handle specific process
104        if identifier in ids:
105            response.content_type = 'text/xml'
106            return t.generate(caps_config_dict=caps_config_dict, ids=[identifier]).render()
107        else:
108            raise InvalidParameterValue('Identifier not found', 'identifier')
109
110
111    def _parseDataInputs(self, dataInputs):
112        """
113        Parses inputs, returns dictionary.
114        Applies the following rules:
115         * Any values sent with "|" are split into a list.
116         * Any values containing a "+" sign will be replaced with spaces in the value.
117        """
118        # If dataInputs is blank return empty dictionary
119        if not dataInputs:
120            return {}
121
122        arg_dict = {}
123        args = dataInputs.split(",")
124
125        if len(args) %2 != 0:
126            raise ValueError("Must provide even number of arguments in query string for datainput argument (all comma-separated).")
127
128        if len(args) == 1 and args[0] == "": args = []
129
130        while len(args) > 0:
131            (arg, value) = args[:2]
132            args = args[2:]
133
134#            log.warn("VALUE: %s" % value)
135            value = urllib.unquote(value)
136            value = value.replace("+", " ")
137#            log.warn("VALUE2: %s" % value)
138
139            if value.find("|") > -1:
140                value = value.split("|")
141
142            arg_dict[arg] = value
143
144        return arg_dict
145
146       
147    def Execute(self):
148        if request.environ.get("REQUEST_METHOD") == "POST":
149            log.warn("POST request was rejected because we do not allow this yet.")
150            abort(405)
151
152        log.info("entering controllers.wps.Execute")
153
154        st = time.time()
155       
156        identifier = self.getOwsParam('Identifier')
157       
158        if identifier not in caps_config_dict['Identifiers']:
159            raise InvalidParameterValue('Identifier not found', 'identifier')
160       
161        process_conf = caps_config_dict[identifier]
162        process_type = process_conf["wps_interface"]["process_type"]
163
164        # Since we generate the execute response document on demand we can ignore the store option.
165        # It might be necessary to reflect it's value in ExecuteResponse for WPS compliance
166        store = self._getBooleanOwsParam('StoreExecuteResponse')
167        if store == True:
168            if not caps_config_dict[identifier]['wps_interface']['store']:
169                raise InvalidParameterValue('Storing output is not supported for this process')
170           
171        #!TODO: Clarify this.  We can always support status because the ExecuteResponse document is created on-demand
172        status = self._getBooleanOwsParam('Status')
173        if status == True:
174            if not caps_config_dict[identifier]['wps_interface']['status']:
175                raise InvalidParameterValue('Status not supported for this process')
176            elif not store:
177                raise InvalidParameterValue('Both status and store must be true')
178
179        #!NOTE: Lineage effects what is put in the ExecuteResponse document.  It should end up in the
180        #    config section of the ProcessContext.
181        self.lineage = self._getBooleanOwsParam('lineage', default='true')
182       
183        #!TODO: Check these custom arguments are in the capabilities document.
184        costonly = self._getBooleanOwsParam('Costonly')
185        inform = self._getBooleanOwsParam('Inform')           
186       
187        #######################################################################
188        # Parse and validate arguments.  First the DataInputs are extracted from
189        # the OWS params then they are validated and deserialised into python
190        # objects.
191        #
192        #   input_dict: plain string values of DataInputs
193        #   arg_dict:   validated against process.ini and deserialised
194        dataInputs = self.getOwsParam('DataInputs')
195        log.debug("dataInputs = %s" % (dataInputs,))
196        input_dict = self._parseDataInputs(dataInputs)
197        va = validate_arguments.ValidateArguments(identifier, input_dict)
198               
199        # Exceptions here will be caught by COWS
200        try:
201            arg_dict = va.validate()
202        except Exception, err:
203            exc_info = sys.exc_info()
204            log.warning("exception thrown from validator:")
205            log.warning("traceback.format_exc() = %s" % (traceback.format_exc(),))
206           
207            raise InvalidParameterValue('Invalid parameters',err)
208       
209       
210        #######################################################################
211       
212        #######################################################################
213        #!NOTE: RawDataOutput and ResponseDocument handling has been removed from
214        #    the code.  They aren't needed for DCIP and more thought is needed.
215        #######################################################################
216
217        requestId = common_utils.getUniqueId()
218           
219        # Get username and e-mail address from secure session
220        try:
221            um = UserManager()
222            email = um.getEmailAddress()
223            username = um.getUserName()
224        except:
225            log.warn("Setting user to 'anon' because security implies this requires no login.")
226            username = "anon"
227            email = ""
228
229        log.debug("identifier=%s, username=%s, requestId=%s costonly=%s lineage=%s inform=%s" % \
230                  (identifier, username, requestId, costonly, self.lineage, inform))
231
232        # Now work out which authorisedRoles are associated with this Process ID
233        # and this set of arguments
234        rh = RoleHandler()
235        authorisedRoles = rh.getRoleNumberFromProcessIDAndArgs(identifier, arg_dict) 
236
237        # Update the cache with any completed requests before consulting the cache
238        requestManager.updateRunningRequests()
239       
240        # check if the process results already exist in the cache
241        if caching_enabled:
242            cachedJobId = cacheManager.getCachedJobId(identifier, arg_dict)
243        else:
244            cachedJobId = None
245
246        log.debug("cachedJobId = %s" % (cachedJobId,))
247
248        if cachedJobId:
249            log.debug("request is cached, cacheJobId=%s" % cachedJobId)
250            context = requestManager.addCachedRequest(requestId, cachedJobId, username)
251           
252            context.outputs['job_details']['lineage'] = self.lineage
253           
254            #!TODO: The new requestId will not show up on the jobs controller because it doesn't really exist.
255            #    I.e. it doesn't have a processDir.  We need to create process records that point to the
256            #    cached values.
257           
258            responseXML = xml_renderer.wrapExecuteResponse(context, identifier, 
259                                                           self.validVersions[0], 
260                                                           input_dict, requestId,
261                                                           authorisedRoles = authorisedRoles,
262                                                           is_cached = True)
263
264            context.close()
265            response.content_type = 'text/xml'
266            return responseXML
267
268       
269       
270       
271        context = self._makeProcessContext(identifier, requestId, process_type, arg_dict)
272        processCallable = self._loadProcess(context, identifier)
273       
274        if process_type in ["async", "async_l", "async_s"]:
275
276            if not hasattr(processCallable, 'dryRun'):
277                raise RuntimeError('Dry Run not supported for asynchronous process %s.' % processCallable)
278
279
280            context.config['Inform'] = str(inform)
281            context.config['notify_email_to'] = email
282           
283            log.info("Running dryrun for non cached async job")
284            self._runProcessDryRun(processCallable, context, requestId)
285           
286            #if the dryRun ran successfully
287            if context.status == STATUS.ACCEPTED:
288               
289                #after dryRun we can deduce the correct process type           
290                if process_type == 'async':
291                    process_type = self._deduceProcessType(context)
292           
293                (accept, message) = self._checkAsyncQueueLimit(context, 
294                                            requestId, process_type, username)
295           
296                if costonly:
297                    if accept:
298                        context.outputs['job_details']['costonly_status'] = ['cleared', 'The request is ready  for execution']
299                    else:
300                        context.outputs['job_details']['costonly_status'] = ['withheld', message]
301                       
302                    context.save()
303
304                else:
305               
306                    if accept:
307                        #run the async process
308                        log.info("Running %s job, callable = %s" % (process_type, processCallable,))
309                        processQueue = self._selectQueue(identifier, process_type)
310                       
311                        self._submitAsyncProcess(identifier, requestId, username, context, processQueue)
312                    else:
313                        context.setStatus(STATUS.FAILED, message)
314                        context.save()
315       
316        else: # process type is sync
317           
318            if inform == True:
319                log.warning("inform=True found for syncronouse process, ignoring it. \n(identifier = %s request_id = %s, inputs=%s" \
320                            % (identifier, requestId, arg_dict))
321           
322               
323            requestManager.addSyncRequest(requestId, username, identifier, context)
324                             
325            if hasattr(processCallable, 'dryRun'):
326                self._runProcessDryRun(processCallable, context, requestId)
327           
328            log.debug("Context status = %s" % (context.status,) )
329           
330            if context.status != STATUS.FAILED:
331
332                #!NOTE: Now all sync processes run in a worker pool
333                if process_type == 'sync':
334                    log.info("Running sync job")
335                    pcs = self._getProcessCallableString(identifier)
336                    self._runSubprocProcess(pcs, context, identifier, requestId)
337                                       
338            log.debug("updating request status to context.status( = %s)" % (context.status,))
339           
340            context.save()
341            #update the status in the database
342            requestManager.updateRequestStatus(requestId, context.status)
343           
344            #!TODO:Will this caching be done by the status change listener in requestManager?
345            if caching_enabled and context.status == STATUS.COMPLETED:
346                cacheManager.cacheRequest(requestId)
347           
348        responseXML = xml_renderer.wrapExecuteResponse(context, identifier, 
349                                                       self.validVersions[0], 
350                                                       input_dict, requestId,
351                                                       authorisedRoles = authorisedRoles,
352                                                       is_cached=False)
353       
354
355        context.close()
356       
357        #remove the context of a costonly process
358        if costonly:
359            context.delete()
360       
361        log.debug("Finished Execute in %.2fs" % (time.time() - st,))
362       
363        response.content_type = 'text/xml'
364        return responseXML
365
366    def _getBooleanOwsParam(self, paramName, default='false' ):
367       
368        value = self.getOwsParam(paramName, default=default)
369        if value == 'true':
370            return True
371        else:
372            return False
373
374    def _getProcessCallableString(self, identifier):
375        """
376        Parse the configuration dict to find the process module and callable names
377       
378        @return: (module_name, callable_name) where both are strings.
379       
380        """
381        log.debug("process callable string = %s" % (caps_config_dict[identifier]["wps_interface"]["process_callable"],))
382        return caps_config_dict[identifier]["wps_interface"]["process_callable"]
383
384    def _loadProcess(self, context, identifier):
385        # Get the module and function needed...
386        processModuleString = self._getProcessCallableString(identifier)
387
388        processCallable = common_utils.buildProcessCallable(processModuleString)
389
390        return processCallable
391   
392           
393    def _deduceProcessType(self, context):
394        """Work-out whether to queue an asynchronous process
395        based on the result of dryRun.
396       
397        """
398        max_duration = int(wps_config_dict['max_proc_duration'])
399        job_details = context.outputs['job_details']
400        log.debug("context.status = %s" % (context.status,))
401        if job_details['duration'] > max_duration:
402            r = 'async_l'
403        else:
404            r = 'async_s'
405
406        log.debug('Selected queue %s for process with directory %s' % (r, context.processDir))
407        return r
408
409    def _makeProcessContext(self, identifier, requestId, process_type, arg_dict):
410        """
411        Creates a context object for a new job
412        """
413       
414        log.info("Creating process context")
415       
416        proc_conf = createProcessConfig(identifier)
417        procDir = processDirManager.makeProcessDir(requestId, identifier)
418       
419        context = ProcessContext(procDir, proc_conf)
420       
421        context.create(arg_dict)           
422        context.setupJobDetails(requestId, self.lineage)
423       
424        return context
425       
426    def _selectQueue(self, identifier, processType):
427        """
428        Return which SGE queue to send a job to.
429       
430        """
431        if processType == 'async_l':
432            processQueue = wps_config_dict['sge_queue_l']
433        elif processType == 'async_s':
434            processQueue = wps_config_dict['sge_queue_s']
435        else:
436            raise Exception("Unknown process type %s, should be 'async_l' or 'async_s'" % (processType,) )
437                           
438        if processDirManager.isStripedProcess(identifier):
439            if '@' in processQueue:
440                raise Exception("SGE queue cannot contain a host identifier in striped mode")
441            processQueue = '%s@%s' % (processQueue, processDirManager.getStripedSGEHost())
442       
443        return processQueue
444       
445    def _submitAsyncProcess(self, identifier, jobId, username, context, queue):
446        """
447        Submits an asynchronous process and returns updated Exec Resp doc.
448        """
449
450        processCallableString = caps_config_dict[identifier]["wps_interface"]["process_callable"]
451       
452        process_dir = os.path.abspath(context.processDir)
453        context.save()
454
455        log.debug("queue name...... %s" % queue)
456       
457        sge = SgePyInterface()
458        try:
459           
460            #!NOTE: I have removed os.path.abspath('python') because we can't be sure
461            #    python is on the same path on an SGE node as on the WPS machine.
462            #    We must make sure python is in on PATH on the SGE node.
463            runproc = wps_config_dict['runproc_path']
464            sge_job_id = sge.submitJob(runproc, [processCallableString, process_dir], 
465                                       process_dir, queue)
466        except:
467            log.error("Exception occurred submitting process to SGE \n callable=%s\n runproc=%s\n process_dir=%s" % (processCallableString, runproc, process_dir))
468            log.exception('Process Exception follows')
469           
470            # catch the exception and wrap it as an OWSError which should get
471            # turned into an error report in cows.
472            exceptionMessage = self._getExceptionMessage()
473           
474            raise OwsError("Error occurred while submitting asynconous job.\n Message=%s" % exceptionMessage)
475       
476            context.save()
477        else:
478            log.debug('submit sge_job_id to requestManager to add it to the job id mapping file')
479            requestManager.addAsyncRequest(jobId, username, sge_job_id, identifier, context, queue)
480           
481        finally:
482            sge.close()
483       
484    def _runSubprocProcess(self, processCallableString, context, identifier, requestId):
485        """
486        Run a process in the WorkerPool
487        """
488       
489        log.debug("Executing process callable %s in WorkerPool" % (processCallableString,))
490
491        context.setStatus(STATUS.STARTED, 'Job has started', 0)
492        context.log.info("Executing process %s in WorkerPool on HOSTNAME = %s" \
493                  % (processCallableString, os.getenv('HOSTNAME')))
494        try:
495            workerPool.execute(processCallableString, context)
496        except Exception, e:
497
498            log.exception('Process Exception follows')
499
500            # Note: context related cleanup is done in workerPool.  It should be correctly
501            #     synced with the filesystem on return
502           
503            log.warning('WorkerPool job %s failed' % processCallableString)
504           
505        else:
506            log.debug('WorkerPool job %s returned' % processCallableString)
507            if context.status in (STATUS.ACCEPTED, STATUS.STARTED):
508                context.setStatus(STATUS.COMPLETED, '', 100)
509           
510           
511    def _runProcessDryRun(self, processCallable, context, requestId):
512        """
513        Runs the dryRun method of an asyncronous process. If any Exceptions
514        occurr while running the method the context.status will be set to
515        FAILED and the process should not be submitted to SGE.
516        """
517        try:
518            processCallable.dryRun(context)
519           
520        except Exception, e:
521            log.debug("Exception occurred while running %s" % (processCallable.dryRun,))
522            log.exception('Process Exception follows')
523           
524            exceptionMessage = self._getExceptionMessage()
525            context.setStatus(STATUS.FAILED, exceptionMessage)
526
527            #log the exception in the process log as well.
528            context.log.exception('Process Exception follows')
529            context.save()
530        else:
531           
532            if context.status in (STATUS.STARTED, STATUS.COMPLETED):
533                log.error("Process in %s status after running dryRun, processCallable=%s" % (context.status, processCallable,))
534                log.error("Correcting process status to ACCEPTED")
535                context.setStatus(STATUS.ACCEPTED, '')
536               
537    def _checkAsyncQueueLimit(self, context, requestId, processType, username):
538        """
539        Checks with the requestManager how many jobs a user has running on each
540        of the SGE queues. Then comparis the number of jobs running with the
541        maximum limit for the queue and returns if the job should be accepted.
542       
543        returns (accepted, message)
544        """
545       
546        if processType == 'async_l':
547            queueName = wps_config_dict['sge_queue_l']
548            maxProcesses = int(wps_config_dict['max_l_proc'])
549           
550        elif processType == 'async_s':
551            queueName = wps_config_dict['sge_queue_s']
552            maxProcesses = int(wps_config_dict['max_s_proc'])
553        else:
554            raise Exception("Unknown process type %s, should be 'async_l' or 'async_s'" % (processType,) )
555       
556        currentJobs = requestManager.countSGEJobsRunning(username, queueName)
557                     
558        if  currentJobs >= maxProcesses:
559            accept = False
560            message = 'You cannot start this request until one of your current processes completes.'
561           
562            log.debug("request denied for user %s who has %s requests running on the %s queue",
563                       username, currentJobs, queueName)
564           
565        else:
566            accept = True
567            message = ""
568       
569        return (accept, message)
570
571    def _getExceptionMessage(self):
572#        log.debug("sys.exc_info() = %s" % (sys.exc_info(),))
573
574        #get the type + info + traceback from the last exception to occurr
575        t, i, tb = sys.exc_info()
576#        log.debug("t = %s" % (t,))
577#        log.debug("i = %s" % (i,))
578               
579        #update the status in the context         
580        exceptionMessage = traceback.format_exception_only(t, i)[0]
581        exceptionMessage = exceptionMessage.strip()       
582        return exceptionMessage
Note: See TracBrowser for help on using the repository browser.