source: cows_wps/trunk/cows_wps/controllers/wps.py @ 6947

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/cows_wps/controllers/wps.py@6947
Revision 6947, 22.8 KB checked in by astephen, 10 years ago (diff)

lots of minor fixes. including removing username from process configs.

Line 
1#!TODO: CEDA license agreement.  See cows/add_license.py for script to do this.
2
3import logging
4import sys
5import imp
6import traceback
7import urllib
8import os
9import time
10from datetime import datetime
11import random
12
13from pylons import response
14from routes import url_for
15
16#import from cows
17from cows.model import PossibleValues, WGS84BoundingBox, BoundingBox, Contents
18from cows.pylons import ows_controller
19from cows.exceptions import *
20from cows import helpers
21from cows.builder import loadConfigFile
22
23# Import local modules
24from cows_wps.controllers import *
25from cows_wps.utils.create_process_config import createProcessConfig
26from cows_wps.lib.user_manager import UserManager
27from cows_wps.process_handler import validate_arguments
28from cows_wps.process_handler.context.process_context import ProcessContext
29from cows_wps.process_handler.context.process_status import STATUS
30from cows_wps.model.managers import requestManager, cacheManager, caching_enabled, workerPool
31from cows_wps.model.process_dir import processDirManager
32from cows_wps.renderer import xml_renderer
33from cows_wps.process_handler.fileset import FLAG
34from cows_wps.sge.sge_py_int import SgePyInterface
35
36import cows_wps.utils.common as common_utils
37
38#import config files
39from cows_wps.utils.parse_wps_config import wps_config_dict
40from cows_wps.utils.parse_capabilities_config import caps_config_dict
41from cows_wps.utils.parse_outputs_rules import outputs_rules_config_dict
42
43log = logging.getLogger(__name__)
44#-----------------------------------------------------------------------------
45
46
47# Define global variables
48
49class Operation(object):
50    def __init__(self, name, href):
51        self.name = name
52        self.href = href
53
54   
55def addOperation(ops, name):
56    href=helpers.operation(url_for(qualified=True, action="index")+'?',[]).get.href
57    ops.append(Operation(name,href))
58
59
60ops =[]
61log.info("Building config file")
62addOperation(ops, "GetCapabilities")
63addOperation(ops,"DescribeProcess")
64addOperation(ops, "Execute") 
65
66#-----------------------------------------------------------------------------
67
68
69
70class WpsController(ows_controller.OWSController):
71    #-------------------------------------------------------------------------
72    # Attributes required by OWSController
73
74    service = 'WPS'
75    owsOperations = (ows_controller.OWSController.owsOperations +
76                     ['DescribeProcess', 'Execute'])
77    validVersions = ['0.4.0']
78   
79
80    #-------------------------------------------------------------------------
81
82    # Cababilities information is built-up in I{ops} above, therefore no need
83    # for _loadCapabilities()
84    #
85    #def _loadCapabilities(self):
86    #
87   
88    def _renderCapabilities(self, version, format):
89        t= templateLoader.load("wps_getcapabilitiesresponse_template.xml")
90   
91        return t.generate(caps_config_dict=caps_config_dict, ops=ops, c=c).render()
92
93    def DescribeProcess(self):
94        ids = caps_config_dict['Identifiers']
95        t = templateLoader.load('wps_describeprocess_response.xml')
96           
97        # Get identifier
98        identifier = self.getOwsParam('identifier')
99
100        # Handle the case where all descriptions are requested
101        if identifier == 'all':
102           
103            return t.generate(ids=ids).render()
104        # Handle specific process
105        if identifier in ids:
106            response.content_type = 'text/xml'
107            return t.generate(caps_config_dict=caps_config_dict, ids=[identifier]).render()
108        else:
109            raise InvalidParameterValue('Identifier not found', 'identifier')
110
111
112    def _parseDataInputs(self, dataInputs):
113        # If dataInputs is blank return empty dictionary
114        if not dataInputs:
115            return {}
116
117        arg_dict = {}
118        args = dataInputs.split(",")
119        if len(args) %2 != 0:
120            raise ValueError("Must provide even number of arguments in query string for datainput argument (all comma-separated).")
121
122        if len(args) == 1 and args[0] == "": args = []
123
124        while len(args) > 0:
125            arg,value = args[:2]
126            args = args[2:]
127
128            value = urllib.unquote(value)
129
130            if value.find("|") > -1:
131                value = value.split("|")
132
133            arg_dict[arg] = value
134
135        return arg_dict
136
137       
138    def Execute(self):
139        if request.environ.get("REQUEST_METHOD") == "POST":
140            log.warn("POST request was rejected because we do not allow this yet.")
141            abort(405)
142
143        log.info("entering controllers.wps.Execute")
144
145        st = time.time()
146       
147        identifier = self.getOwsParam('Identifier')
148       
149        if identifier not in caps_config_dict['Identifiers']:
150            raise InvalidParameterValue('Identifier not found', 'identifier')
151       
152        process_conf = caps_config_dict[identifier]
153        process_type = process_conf["wps_interface"]["process_type"]
154
155        # Since we generate the execute response document on demand we can ignore the store option.
156        # It might be necessary to reflect it's value in ExecuteResponse for WPS compliance
157        store = self._getBooleanOwsParam('StoreExecuteResponse')
158        if store == True:
159            if not caps_config_dict[identifier]['wps_interface']['store']:
160                raise InvalidParameterValue('Storing output is not supported for this process')
161           
162        #!TODO: Clarify this.  We can always support status because the ExecuteResponse document is created on-demand
163        status = self._getBooleanOwsParam('Status')
164        if status == True:
165            if not caps_config_dict[identifier]['wps_interface']['status']:
166                raise InvalidParameterValue('Status not supported for this process')
167            elif not store:
168                raise InvalidParameterValue('Both status and store must be true')
169
170        #!NOTE: Lineage effects what is put in the ExecuteResponse document.  It should end up in the
171        #    config section of the ProcessContext.
172        self.lineage = self._getBooleanOwsParam('lineage', default='true')
173       
174        #!TODO: Check these custom arguments are in the capabilities document.
175        costonly = self._getBooleanOwsParam('Costonly')
176        inform = self._getBooleanOwsParam('Inform')           
177       
178        #######################################################################
179        # Parse and validate arguments.  First the DataInputs are extracted from
180        # the OWS params then they are validated and deserialised into python
181        # objects.
182        #
183        #   input_dict: plain string values of DataInputs
184        #   arg_dict:   validated against process.ini and deserialised
185        dataInputs = self.getOwsParam('DataInputs')
186        log.debug("dataInputs = %s" % (dataInputs,))
187        input_dict = self._parseDataInputs(dataInputs)
188        va = validate_arguments.ValidateArguments(identifier, input_dict)
189               
190        # Exceptions here will be caught by COWS
191        #!TODO: include more info about the process in the exception by catching and
192        #       rethrowing the exception.
193        try:
194            arg_dict = va.validate()
195        except Exception, err:
196            exc_info = sys.exc_info()
197            log.warning("exception thrown from validator:")
198            log.warning("traceback.format_exc() = %s" % (traceback.format_exc(),))
199           
200            raise InvalidParameterValue('Invalid parameters',err)
201       
202       
203        #######################################################################
204       
205        #######################################################################
206        #!NOTE: RawDataOutput and ResponseDocument handling has been removed from
207        #    the code.  They aren't needed for DCIP and more thought is needed.
208        #######################################################################
209
210        requestId = common_utils.getUniqueId()
211           
212#        # Username is a DCIP-specific parameter
213#        if arg_dict.has_key('Username'):
214#            username = arg_dict['Username']
215#        else:
216#            log.warn('No Username given, BUT DO WE NEED IT?')
217       
218       
219#        (loggedIn, email) = (True, "nobody@nowhere.com")
220#        if loggedIn == False:
221#            raise OwsError("User %s is not logged into the system." % (username))
222
223        # Get username and e-mail address
224        try:
225            um = UserManager()
226        except:
227            raise OwsError("Cannot find security information to access user details.")
228
229        email = um.getEmailAddress()
230        username = um.getUserName()
231
232        log.debug("identifier=%s, username=%s, requestId=%s costonly=%s lineage=%s inform=%s" % \
233                  (identifier, username, requestId, costonly, self.lineage, inform))
234
235        # Update the cache with any completed requests before consulting the cache
236        requestManager.updateRunningRequests()
237       
238        # check if the process results already exist in the cache
239        if caching_enabled:
240            cachedJobId = cacheManager.getCachedJobId(identifier, arg_dict)
241        else:
242            cachedJobId = None
243
244        log.debug("cachedJobId = %s" % (cachedJobId,))
245
246        if cachedJobId:
247            log.debug("request is cached, cacheJobId=%s" % cachedJobId)
248            context = requestManager.addCachedRequest(requestId, cachedJobId, username)
249           
250            context.outputs['job_details']['lineage'] = self.lineage
251           
252            #!TODO: The new requestId will not show up on the jobs controller because it doesn't really exist.
253            #    I.e. it doesn't have a processDir.  We need to create process records that point to the
254            #    cached values.
255           
256            responseXML = xml_renderer.wrapExecuteResponse(context, identifier, 
257                                                           self.validVersions[0], 
258                                                           input_dict, requestId,
259                                                           is_cached=True)
260       
261
262            context.close()
263            response.content_type = 'text/xml'
264            return responseXML
265
266       
267       
268       
269        context = self._makeProcessContext(identifier, requestId, process_type, arg_dict)
270        processCallable = self._loadProcess(context, identifier)
271       
272        if process_type in ["async", "async_l", "async_s"]:
273
274            if not hasattr(processCallable, 'dryRun'):
275                raise RuntimeError('Dry Run not supported for asynchronous process %s.' % processCallable)
276
277
278            context.config['Inform'] = str(inform)
279            context.config['notify_email_to'] = email
280           
281            log.info("Running dryrun for non cached async job")
282            self._runProcessDryRun(processCallable, context, requestId)
283           
284            #if the dryRun ran successfully
285            if context.status == STATUS.ACCEPTED:
286               
287                #after dryRun we can deduce the correct process type           
288                if process_type == 'async':
289                    process_type = self._deduceProcessType(context)
290           
291                (accept, message) = self._checkAsyncQueueLimit(context, 
292                                            requestId, process_type, username)
293           
294                if costonly:
295                    if accept:
296                        context.outputs['job_details']['costonly_status'] = ['cleared', 'The request is ready  for execution']
297                    else:
298                        context.outputs['job_details']['costonly_status'] = ['withheld', message]
299                       
300                    context.save()
301
302                else:
303               
304                    if accept:
305                        #run the async process
306                        log.info("Running %s job, callable = %s" % (process_type, processCallable,))
307                        processQueue = self._selectQueue(identifier, process_type)
308                       
309                        self._submitAsyncProcess(identifier, requestId, context, processQueue)
310                    else:
311                        context.setStatus(STATUS.FAILED, message)
312                        context.save()
313       
314        else: # process type is sync
315           
316            if inform == True:
317                log.warning("inform=True found for syncronouse process, ignoring it. \n(identifier = %s request_id = %s, inputs=%s" \
318                            % (identifier, requestId, arg_dict))
319           
320               
321            requestManager.addSyncRequest(requestId, identifier, context)
322                             
323            if hasattr(processCallable, 'dryRun'):
324                self._runProcessDryRun(processCallable, context, requestId)
325           
326            log.debug("Context status = %s" % (context.status,) )
327           
328            if context.status != STATUS.FAILED:
329
330                #!NOTE: Now all sync processes run in a worker pool
331                if process_type == 'sync':
332                    log.info("Running sync job")
333                    pcs = self._getProcessCallableString(identifier)
334                    self._runSubprocProcess(pcs, context, identifier, requestId)
335                                       
336            log.debug("updating request status to context.status( = %s)" % (context.status,))
337           
338            context.save()
339            #update the status in the database
340            requestManager.updateRequestStatus(requestId, context.status)
341           
342            #!TODO:Will this caching be done by the status change listener in requestManager?
343            if caching_enabled and context.status == STATUS.COMPLETED:
344                cacheManager.cacheRequest(requestId)
345           
346        responseXML = xml_renderer.wrapExecuteResponse(context, identifier, 
347                                                       self.validVersions[0], 
348                                                       input_dict, requestId,
349                                                       is_cached=False)
350       
351
352        context.close()
353       
354        #remove the context of a costonly process
355        if costonly:
356            context.delete()
357       
358        log.debug("Finished Execute in %.2fs" % (time.time() - st,))
359       
360        response.content_type = 'text/xml'
361        return responseXML
362
363    def _getBooleanOwsParam(self, paramName, default='false' ):
364       
365        value = self.getOwsParam(paramName, default=default)
366        if value == 'true':
367            return True
368        else:
369            return False
370
371    def _getProcessCallableString(self, identifier):
372        """
373        Parse the configuration dict to find the process module and callable names
374       
375        @return: (module_name, callable_name) where both are strings.
376       
377        """
378        log.debug("process callable string = %s" % (caps_config_dict[identifier]["wps_interface"]["process_callable"],))
379        return caps_config_dict[identifier]["wps_interface"]["process_callable"]
380
381    def _loadProcess(self, context, identifier):
382        # Get the module and function needed...
383        processModuleString = self._getProcessCallableString(identifier)
384
385        processCallable = common_utils.buildProcessCallable(processModuleString)
386
387        return processCallable
388   
389           
390    def _deduceProcessType(self, context):
391        """Work-out whether to queue an asynchronous process
392        based on the result of dryRun.
393       
394        """
395        max_duration = int(wps_config_dict['max_proc_duration'])
396        job_details = context.outputs['job_details']
397        log.debug("context.status = %s" % (context.status,))
398        if job_details['duration'] > max_duration:
399            r = 'async_l'
400        else:
401            r = 'async_s'
402
403        log.debug('Selected queue %s for process with directory %s' % (r, context.processDir))
404        return r
405
406    def _makeProcessContext(self, identifier, requestId, process_type, arg_dict):
407        """
408        Creates a context object for a new job
409        """
410       
411        log.info("Creating process context")
412       
413        proc_conf = createProcessConfig(identifier)
414        procDir = processDirManager.makeProcessDir(requestId, identifier)
415       
416        context = ProcessContext(procDir, proc_conf)
417       
418        context.create(arg_dict)           
419        context.setupJobDetails(requestId, self.lineage)
420       
421        return context
422       
423    def _selectQueue(self, identifier, processType):
424        """
425        Return which SGE queue to send a job to.
426       
427        """
428        if processType == 'async_l':
429            processQueue = wps_config_dict['sge_queue_l']
430        elif processType == 'async_s':
431            processQueue = wps_config_dict['sge_queue_s']
432        else:
433            raise Exception("Unknown process type %s, should be 'async_l' or 'async_s'" % (processType,) )
434                           
435        if processDirManager.isStripedProcess(identifier):
436            if '@' in processQueue:
437                raise Exception("SGE queue cannot contain a host identifier in striped mode")
438            processQueue = '%s@%s' % (processQueue, processDirManager.getStripedSGEHost())
439       
440        return processQueue
441       
442    def _submitAsyncProcess(self, identifier, jobId, context, queue):
443        """
444        Submits an asynchronous process and returns updated Exec Resp doc.
445        """
446
447        processCallableString = caps_config_dict[identifier]["wps_interface"]["process_callable"]
448       
449        process_dir = os.path.abspath(context.processDir)
450        context.save()
451
452        log.debug("queue name...... %s" % queue)
453       
454        sge = SgePyInterface()
455        try:
456           
457            #!NOTE: I have removed os.path.abspath('python') because we can't be sure
458            #    python is on the same path on an SGE node as on the WPS machine.
459            #    We must make sure python is in on PATH on the SGE node.
460            runproc = wps_config_dict['runproc_path']
461            sge_job_id = sge.submitJob(runproc, [processCallableString, process_dir], 
462                                       process_dir, queue)
463        except:
464            log.error("Exception occurred submitting process to SGE \n callable=%s\n runproc=%s\n process_dir=%s" % (processCallableString, runproc, process_dir))
465            log.exception('Process Exception follows')
466           
467            # catch the exception and wrap it as an OWSError which should get
468            # turned into an error report in cows.
469            exceptionMessage = self._getExceptionMessage()
470           
471            raise OwsError("Error occurred while submitting asynconous job.\n Message=%s" % exceptionMessage)
472       
473            context.save()
474        else:
475            log.debug('submit sge_job_id to requestManager to add it to the job id mapping file')
476            requestManager.addAsyncRequest(jobId, sge_job_id, identifier, context, queue)
477           
478        finally:
479            sge.close()
480       
481    def _runSubprocProcess(self, processCallableString, context, identifier, requestId):
482        """
483        Run a process in the WorkerPool
484        """
485       
486        log.debug("Executing process callable %s in WorkerPool" % (processCallableString,))
487
488        context.setStatus(STATUS.STARTED, 'Job has started', 0)
489        context.log.info("Executing process %s in WorkerPool on HOSTNAME = %s" \
490                  % (processCallableString, os.getenv('HOSTNAME')))
491        try:
492            workerPool.execute(processCallableString, context)
493        except Exception, e:
494
495            log.exception('Process Exception follows')
496
497            # Note: context related cleanup is done in workerPool.  It should be correctly
498            #     synced with the filesystem on return
499           
500            log.warning('WorkerPool job %s failed' % processCallableString)
501           
502        else:
503            log.debug('WorkerPool job %s returned' % processCallableString)
504            if context.status in (STATUS.ACCEPTED, STATUS.STARTED):
505                context.setStatus(STATUS.COMPLETED, '', 100)
506           
507           
508    def _runProcessDryRun(self, processCallable, context, requestId):
509        """
510        Runs the dryRun method of an asyncronous process. If any Exceptions
511        occurr while running the method the context.status will be set to
512        FAILED and the process should not be submitted to SGE.
513        """
514        try:
515            processCallable.dryRun(context)
516           
517        except Exception, e:
518            log.debug("Exception occurred while running %s" % (processCallable.dryRun,))
519            log.exception('Process Exception follows')
520           
521            exceptionMessage = self._getExceptionMessage()
522            context.setStatus(STATUS.FAILED, exceptionMessage)
523
524            #log the exception in the process log as well.
525            context.log.exception('Process Exception follows')
526            context.save()
527        else:
528           
529            if context.status in (STATUS.STARTED, STATUS.COMPLETED):
530                log.error("Process in %s status after running dryRun, processCallable=%s" % (context.status, processCallable,))
531                log.error("Correcting process status to ACCEPTED")
532                context.setStatus(STATUS.ACCEPTED, '')
533               
534    def _checkAsyncQueueLimit(self, context, requestId, processType, username):
535        """
536        Checks with the requestManager how many jobs a user has running on each
537        of the SGE queues. Then comparis the number of jobs running with the
538        maximum limit for the queue and returns if the job should be accepted.
539       
540        returns (accepted, message)
541        """
542       
543        if processType == 'async_l':
544            queueName = wps_config_dict['sge_queue_l']
545            maxProcesses = int(wps_config_dict['max_l_proc'])
546           
547        elif processType == 'async_s':
548            queueName = wps_config_dict['sge_queue_s']
549            maxProcesses = int(wps_config_dict['max_s_proc'])
550        else:
551            raise Exception("Unknown process type %s, should be 'async_l' or 'async_s'" % (processType,) )
552       
553        currentJobs = requestManager.countSGEJobsRunning(username, queueName)
554                     
555        if  currentJobs >= maxProcesses:
556            accept = False
557            message = 'You cannot start this request until one of your current processes completes.'
558           
559            log.debug("request denied for user %s who has %s requests running on the %s queue",
560                       username, currentJobs, queueName)
561           
562        else:
563            accept = True
564            message = ""
565       
566        return (accept, message)
567
568    def _getExceptionMessage(self):
569#        log.debug("sys.exc_info() = %s" % (sys.exc_info(),))
570
571        #get the type + info + traceback from the last exception to occurr
572        t, i, tb = sys.exc_info()
573#        log.debug("t = %s" % (t,))
574#        log.debug("i = %s" % (i,))
575               
576        #update the status in the context         
577        exceptionMessage = traceback.format_exception_only(t, i)[0]
578        exceptionMessage = exceptionMessage.strip()       
579        return exceptionMessage
Note: See TracBrowser for help on using the repository browser.