source: cows_wps/trunk/cows_wps/controllers/wps.py @ 6873

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/cows_wps/controllers/wps.py@6873
Revision 6873, 22.5 KB checked in by astephen, 10 years ago (diff)

Added and updated various code to match security, download URLs that are
secured and chunking of MIDAS outputs.

Line 
1#!TODO: CEDA license agreement.  See cows/add_license.py for script to do this.
2
3import logging
4import sys
5import imp
6import traceback
7import urllib
8import os
9import time
10from datetime import datetime
11import random
12from pylons import response
13
14#import from cows
15from cows.model import PossibleValues, WGS84BoundingBox, BoundingBox, Contents
16from cows.pylons import ows_controller
17from cows.exceptions import *
18from cows import helpers
19from cows.builder import loadConfigFile
20
21# Import local modules
22from cows_wps.controllers import *
23from cows_wps.utils.create_process_config import createProcessConfig
24from routes import url_for
25from cows_wps.process_handler import validate_arguments
26from cows_wps.process_handler.context.process_context import ProcessContext
27from cows_wps.process_handler.context.process_status import STATUS
28from cows_wps.model.managers import requestManager, cacheManager, caching_enabled, workerPool
29from cows_wps.model.process_dir import processDirManager
30from cows_wps.renderer import xml_renderer
31from cows_wps.process_handler.fileset import FLAG
32from cows_wps.sge.sge_py_int import SgePyInterface
33
34import cows_wps.utils.common as common_utils
35
36#import config files
37from cows_wps.utils.parse_wps_config import wps_config_dict
38from cows_wps.utils.parse_capabilities_config import caps_config_dict
39from cows_wps.utils.parse_outputs_rules import outputs_rules_config_dict
40
41log = logging.getLogger(__name__)
42#-----------------------------------------------------------------------------
43
44
45# Define global variables
46
47class Operation(object):
48    def __init__(self, name, href):
49        self.name = name
50        self.href = href
51
52   
53def addOperation(ops, name):
54    href=helpers.operation(url_for(qualified=True, action="index")+'?',[]).get.href
55    ops.append(Operation(name,href))
56
57
58ops =[]
59log.info("Building config file")
60addOperation(ops, "GetCapabilities")
61addOperation(ops,"DescribeProcess")
62addOperation(ops, "Execute") 
63
64#-----------------------------------------------------------------------------
65
66
67
68class WpsController(ows_controller.OWSController):
69    #-------------------------------------------------------------------------
70    # Attributes required by OWSController
71
72    service = 'WPS'
73    owsOperations = (ows_controller.OWSController.owsOperations +
74                     ['DescribeProcess', 'Execute'])
75    validVersions = ['0.4.0']
76   
77
78    #-------------------------------------------------------------------------
79
80    # Cababilities information is built-up in I{ops} above, therefore no need
81    # for _loadCapabilities()
82    #
83    #def _loadCapabilities(self):
84    #
85   
86    def _renderCapabilities(self, version, format):
87        t= templateLoader.load("wps_getcapabilitiesresponse_template.xml")
88   
89        return t.generate(caps_config_dict=caps_config_dict, ops=ops, c=c).render()
90
91    def DescribeProcess(self):
92        ids = caps_config_dict['Identifiers']
93        t = templateLoader.load('wps_describeprocess_response.xml')
94           
95        # Get identifier
96        identifier = self.getOwsParam('identifier')
97
98        # Handle the case where all descriptions are requested
99        if identifier == 'all':
100           
101            return t.generate(ids=ids).render()
102        # Handle specific process
103        if identifier in ids:
104            response.content_type = 'text/xml'
105            return t.generate(caps_config_dict=caps_config_dict, ids=[identifier]).render()
106        else:
107            raise InvalidParameterValue('Identifier not found', 'identifier')
108
109
110    def _parseDataInputs(self, dataInputs):
111        # If dataInputs is blank return empty dictionary
112        if not dataInputs:
113            return {}
114
115        arg_dict = {}
116        args = dataInputs.split(",")
117        if len(args) %2 != 0:
118            raise ValueError("Must provide even number of arguments in query string for datainput argument (all comma-separated).")
119
120        if len(args) == 1 and args[0] == "": args = []
121
122        while len(args) > 0:
123            arg,value = args[:2]
124            args = args[2:]
125
126            value = urllib.unquote(value)
127
128            if value.find("|") > -1:
129                value = value.split("|")
130
131            arg_dict[arg] = value
132
133        return arg_dict
134
135       
136    def Execute(self):
137        if request.environ.get("REQUEST_METHOD") == "POST":
138            log.warn("POST request was rejected because we do not allow this yet.")
139            abort(405)
140
141        log.info("entering controllers.wps.Execute")
142
143        st = time.time()
144       
145        identifier = self.getOwsParam('Identifier')
146       
147        if identifier not in caps_config_dict['Identifiers']:
148            raise InvalidParameterValue('Identifier not found', 'identifier')
149       
150        process_conf = caps_config_dict[identifier]
151        process_type = process_conf["wps_interface"]["process_type"]
152
153        # Since we generate the execute response document on demand we can ignore the store option.
154        # It might be necessary to reflect it's value in ExecuteResponse for WPS compliance
155        store = self._getBooleanOwsParam('StoreExecuteResponse')
156        if store == True:
157            if not caps_config_dict[identifier]['wps_interface']['store']:
158                raise InvalidParameterValue('Storing output is not supported for this process')
159           
160        #!TODO: Clarify this.  We can always support status because the ExecuteResponse document is created on-demand
161        status = self._getBooleanOwsParam('Status')
162        if status == True:
163            if not caps_config_dict[identifier]['wps_interface']['status']:
164                raise InvalidParameterValue('Status not supported for this process')
165            elif not store:
166                raise InvalidParameterValue('Both status and store must be true')
167
168        #!NOTE: Lineage effects what is put in the ExecuteResponse document.  It should end up in the
169        #    config section of the ProcessContext.
170        self.lineage = self._getBooleanOwsParam('lineage', default='true')
171       
172        #!TODO: Check these custom arguments are in the capabilities document.
173        costonly = self._getBooleanOwsParam('Costonly')
174        inform = self._getBooleanOwsParam('Inform')           
175       
176        #######################################################################
177        # Parse and validate arguments.  First the DataInputs are extracted from
178        # the OWS params then they are validated and deserialised into python
179        # objects.
180        #
181        #   input_dict: plain string values of DataInputs
182        #   arg_dict:   validated against process.ini and deserialised
183        dataInputs = self.getOwsParam('DataInputs')
184        log.debug("dataInputs = %s" % (dataInputs,))
185        input_dict = self._parseDataInputs(dataInputs)
186        va = validate_arguments.ValidateArguments(identifier, input_dict)
187               
188        # Exceptions here will be caught by COWS
189        #!TODO: include more info about the process in the exception by catching and
190        #       rethrowing the exception.
191        try:
192            arg_dict = va.validate()
193        except Exception, err:
194            exc_info = sys.exc_info()
195            log.warning("exception thrown from validator:")
196            log.warning("traceback.format_exc() = %s" % (traceback.format_exc(),))
197           
198            raise InvalidParameterValue('Invalid parameters',err)
199       
200       
201        #######################################################################
202       
203        #######################################################################
204        #!NOTE: RawDataOutput and ResponseDocument handling has been removed from
205        #    the code.  They aren't needed for DCIP and more thought is needed.
206        #######################################################################
207
208        requestId = common_utils.getUniqueId()
209           
210        # Username is a DCIP-specific parameter
211        if arg_dict.has_key('Username'):
212            username = arg_dict['Username']
213        else:
214            log.warn('No Username given, BUT DO WE NEED IT?')
215       
216       
217        #!TODO: Reimplement user management
218        (loggedIn, email) = (True, "nobody@nowhere.com")
219       
220        if loggedIn == False:
221            raise OwsError("User %s is not logged into the system." % (username))
222
223        log.debug("identifier=%s, username=%s, requestId=%s costonly=%s lineage=%s inform=%s" % \
224                  (identifier, username, requestId, costonly, self.lineage, inform))
225
226        # Update the cache with any completed requests before consulting the cache
227        requestManager.updateRunningRequests()
228       
229        # check if the process results already exist in the cache
230        if caching_enabled:
231            cachedJobId = cacheManager.getCachedJobId(identifier, arg_dict)
232        else:
233            cachedJobId = None
234
235        log.debug("cachedJobId = %s" % (cachedJobId,))
236
237        if cachedJobId:
238            log.debug("request is cached, cacheJobId=%s" % cachedJobId)
239            context = requestManager.addCachedRequest(requestId, cachedJobId, username)
240           
241            context.outputs['job_details']['lineage'] = self.lineage
242           
243            #!TODO: The new requestId will not show up on the jobs controller because it doesn't really exist.
244            #    I.e. it doesn't have a processDir.  We need to create process records that point to the
245            #    cached values.
246           
247            responseXML = xml_renderer.wrapExecuteResponse(context, identifier, 
248                                                           self.validVersions[0], 
249                                                           input_dict, requestId,
250                                                           is_cached=True)
251       
252
253            context.close()
254            response.content_type = 'text/xml'
255            return responseXML
256
257       
258       
259       
260        context = self._makeProcessContext(identifier, requestId, process_type, arg_dict)
261        processCallable = self._loadProcess(context, identifier)
262       
263        if process_type in ["async", "async_l", "async_s"]:
264
265            if not hasattr(processCallable, 'dryRun'):
266                raise RuntimeError('Dry Run not supported for asynchronous process %s.' % processCallable)
267
268
269            context.config['Inform'] = str(inform)
270            context.config['notify_email_to'] = email
271           
272            log.info("Running dryrun for non cached async job")
273            self._runProcessDryRun(processCallable, context, requestId)
274           
275            #if the dryRun ran successfully
276            if context.status == STATUS.ACCEPTED:
277               
278                #after dryRun we can deduce the correct process type           
279                if process_type == 'async':
280                    process_type = self._deduceProcessType(context)
281           
282                (accept, message) = self._checkAsyncQueueLimit(context, 
283                                            requestId, process_type, username)
284           
285                if costonly:
286                    if accept:
287                        context.outputs['job_details']['costonly_status'] = ['cleared', 'The request is ready  for execution']
288                    else:
289                        context.outputs['job_details']['costonly_status'] = ['withheld', message]
290                       
291                    context.save()
292
293                else:
294               
295                    if accept:
296                        #run the async process
297                        log.info("Running %s job, callable = %s" % (process_type, processCallable,))
298                        processQueue = self._selectQueue(identifier, process_type)
299                       
300                        self._submitAsyncProcess(identifier, requestId, context, processQueue)
301                    else:
302                        context.setStatus(STATUS.FAILED, message)
303                        context.save()
304       
305        else: # process type is sync
306           
307            if inform == True:
308                log.warning("inform=True found for syncronouse process, ignoring it. \n(identifier = %s request_id = %s, inputs=%s" \
309                            % (identifier, requestId, arg_dict))
310           
311               
312            requestManager.addSyncRequest(requestId, identifier, context)
313                             
314            if hasattr(processCallable, 'dryRun'):
315                self._runProcessDryRun(processCallable, context, requestId)
316           
317            log.debug("Context status = %s" % (context.status,) )
318           
319            if context.status != STATUS.FAILED:
320
321                #!NOTE: Now all sync processes run in a worker pool
322                if process_type == 'sync':
323                    log.info("Running sync job")
324                    pcs = self._getProcessCallableString(identifier)
325                    self._runSubprocProcess(pcs, context, identifier, requestId)
326                                       
327            log.debug("updating request status to context.status( = %s)" % (context.status,))
328           
329            context.save()
330            #update the status in the database
331            requestManager.updateRequestStatus(requestId, context.status)
332           
333            #!TODO:Will this caching be done by the status change listener in requestManager?
334            if caching_enabled and context.status == STATUS.COMPLETED:
335                cacheManager.cacheRequest(requestId)
336           
337        responseXML = xml_renderer.wrapExecuteResponse(context, identifier, 
338                                                       self.validVersions[0], 
339                                                       input_dict, requestId,
340                                                       is_cached=False)
341       
342
343        context.close()
344       
345        #remove the context of a costonly process
346        if costonly:
347            context.delete()
348       
349        log.debug("Finished Execute in %.2fs" % (time.time() - st,))
350       
351        response.content_type = 'text/xml'
352        return responseXML
353
354    def _getBooleanOwsParam(self, paramName, default='false' ):
355       
356        value = self.getOwsParam(paramName, default=default)
357        if value == 'true':
358            return True
359        else:
360            return False
361
362    def _getProcessCallableString(self, identifier):
363        """
364        Parse the configuration dict to find the process module and callable names
365       
366        @return: (module_name, callable_name) where both are strings.
367       
368        """
369        log.debug("process callable string = %s" % (caps_config_dict[identifier]["wps_interface"]["process_callable"],))
370        return caps_config_dict[identifier]["wps_interface"]["process_callable"]
371
372    def _loadProcess(self, context, identifier):
373        # Get the module and function needed...
374        processModuleString = self._getProcessCallableString(identifier)
375
376        processCallable = common_utils.buildProcessCallable(processModuleString)
377
378        return processCallable
379   
380           
381    def _deduceProcessType(self, context):
382        """Work-out whether to queue an asynchronous process
383        based on the result of dryRun.
384       
385        """
386        max_duration = int(wps_config_dict['max_proc_duration'])
387        job_details = context.outputs['job_details']
388        log.debug("context.status = %s" % (context.status,))
389        if job_details['duration'] > max_duration:
390            r = 'async_l'
391        else:
392            r = 'async_s'
393
394        log.debug('Selected queue %s for process with directory %s' % (r, context.processDir))
395        return r
396
397    def _makeProcessContext(self, identifier, requestId, process_type, arg_dict):
398        """
399        Creates a context object for a new job
400        """
401       
402        log.info("Creating process context")
403       
404        proc_conf = createProcessConfig(identifier)
405        procDir = processDirManager.makeProcessDir(requestId, identifier)
406       
407        context = ProcessContext(procDir, proc_conf)
408       
409        context.create(arg_dict)           
410        context.setupJobDetails(requestId, self.lineage)
411       
412        return context
413       
414    def _selectQueue(self, identifier, processType):
415        """
416        Return which SGE queue to send a job to.
417       
418        """
419        if processType == 'async_l':
420            processQueue = wps_config_dict['sge_queue_l']
421        elif processType == 'async_s':
422            processQueue = wps_config_dict['sge_queue_s']
423        else:
424            raise Exception("Unknown process type %s, should be 'async_l' or 'async_s'" % (processType,) )
425                           
426        if processDirManager.isStripedProcess(identifier):
427            if '@' in processQueue:
428                raise Exception("SGE queue cannot contain a host identifier in striped mode")
429            processQueue = '%s@%s' % (processQueue, processDirManager.getStripedSGEHost())
430       
431        return processQueue
432       
433    def _submitAsyncProcess(self, identifier, jobId, context, queue):
434        """
435        Submits an asynchronous process and returns updated Exec Resp doc.
436        """
437
438        processCallableString = caps_config_dict[identifier]["wps_interface"]["process_callable"]
439       
440        process_dir = os.path.abspath(context.processDir)
441        context.save()
442
443        log.debug("queue name...... %s" % queue)
444       
445        sge = SgePyInterface()
446        try:
447           
448            #!NOTE: I have removed os.path.abspath('python') because we can't be sure
449            #    python is on the same path on an SGE node as on the WPS machine.
450            #    We must make sure python is in on PATH on the SGE node.
451            runproc = wps_config_dict['runproc_path']
452            sge_job_id = sge.submitJob(runproc, [processCallableString, process_dir], 
453                                       process_dir, queue)
454        except:
455            log.error("Exception occurred submitting process to SGE \n callable=%s\n runproc=%s\n process_dir=%s" % (processCallableString, runproc, process_dir))
456            log.exception('Process Exception follows')
457           
458            # catch the exception and wrap it as an OWSError which should get
459            # turned into an error report in cows.
460            exceptionMessage = self._getExceptionMessage()
461           
462            raise OwsError("Error occurred while submitting asynconous job.\n Message=%s" % exceptionMessage)
463       
464            context.save()
465        else:
466            log.debug('submit sge_job_id to requestManager to add it to the job id mapping file')
467            requestManager.addAsyncRequest(jobId, sge_job_id, identifier, context, queue)
468           
469        finally:
470            sge.close()
471       
472    def _runSubprocProcess(self, processCallableString, context, identifier, requestId):
473        """
474        Run a process in the WorkerPool
475        """
476       
477        log.debug("Executing process callable %s in WorkerPool" % (processCallableString,))
478
479        context.setStatus(STATUS.STARTED, 'Job has started', 0)
480        context.log.info("Executing process %s in WorkerPool on HOSTNAME = %s" \
481                  % (processCallableString, os.getenv('HOSTNAME')))
482        try:
483            workerPool.execute(processCallableString, context)
484        except Exception, e:
485
486            log.exception('Process Exception follows')
487
488            # Note: context related cleanup is done in workerPool.  It should be correctly
489            #     synced with the filesystem on return
490           
491            log.warning('WorkerPool job %s failed' % processCallableString)
492           
493        else:
494            log.debug('WorkerPool job %s returned' % processCallableString)
495            if context.status in (STATUS.ACCEPTED, STATUS.STARTED):
496                context.setStatus(STATUS.COMPLETED, '', 100)
497           
498           
499    def _runProcessDryRun(self, processCallable, context, requestId):
500        """
501        Runs the dryRun method of an asyncronous process. If any Exceptions
502        occurr while running the method the context.status will be set to
503        FAILED and the process should not be submitted to SGE.
504        """
505        try:
506            processCallable.dryRun(context)
507           
508        except Exception, e:
509            log.debug("Exception occurred while running %s" % (processCallable.dryRun,))
510            log.exception('Process Exception follows')
511           
512            exceptionMessage = self._getExceptionMessage()
513            context.setStatus(STATUS.FAILED, exceptionMessage)
514
515            #log the exception in the process log as well.
516            context.log.exception('Process Exception follows')
517            context.save()
518        else:
519           
520            if context.status in (STATUS.STARTED, STATUS.COMPLETED):
521                log.error("Process in %s status after running dryRun, processCallable=%s" % (context.status, processCallable,))
522                log.error("Correcting process status to ACCEPTED")
523                context.setStatus(STATUS.ACCEPTED, '')
524               
525    def _checkAsyncQueueLimit(self, context, requestId, processType, username):
526        """
527        Checks with the requestManager how many jobs a user has running on each
528        of the SGE queues. Then comparis the number of jobs running with the
529        maximum limit for the queue and returns if the job should be accepted.
530       
531        returns (accepted, message)
532        """
533       
534        if processType == 'async_l':
535            queueName = wps_config_dict['sge_queue_l']
536            maxProcesses = int(wps_config_dict['max_l_proc'])
537           
538        elif processType == 'async_s':
539            queueName = wps_config_dict['sge_queue_s']
540            maxProcesses = int(wps_config_dict['max_s_proc'])
541        else:
542            raise Exception("Unknown process type %s, should be 'async_l' or 'async_s'" % (processType,) )
543       
544        currentJobs = requestManager.countSGEJobsRunning(username, queueName)
545                     
546        if  currentJobs >= maxProcesses:
547            accept = False
548            message = 'You cannot start this request until one of your current processes completes.'
549           
550            log.debug("request denied for user %s who has %s requests running on the %s queue",
551                       username, currentJobs, queueName)
552           
553        else:
554            accept = True
555            message = ""
556       
557        return (accept, message)
558
559    def _getExceptionMessage(self):
560#        log.debug("sys.exc_info() = %s" % (sys.exc_info(),))
561
562        #get the type + info + traceback from the last exception to occurr
563        t, i, tb = sys.exc_info()
564#        log.debug("t = %s" % (t,))
565#        log.debug("i = %s" % (i,))
566               
567        #update the status in the context         
568        exceptionMessage = traceback.format_exception_only(t, i)[0]
569        exceptionMessage = exceptionMessage.strip()       
570        return exceptionMessage
Note: See TracBrowser for help on using the repository browser.