source: cows_wps/trunk/cows_wps/controllers/wps.py @ 7573

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/cows_wps/controllers/wps.py@7573
Revision 7573, 23.5 KB checked in by astephen, 10 years ago (diff)

fixed wps namespace in lookup.

Line 
1import logging
2import sys
3import imp
4import traceback
5import urllib
6import os
7import time
8from datetime import datetime
9import random
10
11from pylons import response
12from routes import url_for
13
14#import from cows
15from cows.model import PossibleValues, WGS84BoundingBox, BoundingBox, Contents
16from cows.pylons import ows_controller
17from cows.exceptions import *
18from cows import helpers
19from cows.builder import loadConfigFile
20
21# Import local modules
22from cows_wps.controllers import *
23from cows_wps.utils.create_process_config import createProcessConfig
24from cows_wps.lib.user_manager import UserManager
25from cows_wps.process_handler import validate_arguments
26from cows_wps.process_handler.context.process_context import ProcessContext
27from cows_wps.process_handler.context.process_status import STATUS
28from cows_wps.model.managers import requestManager, cacheManager, caching_enabled, workerPool
29from cows_wps.model.process_dir import processDirManager
30from cows_wps.renderer import xml_renderer
31from cows_wps.process_handler.fileset import FLAG
32from cows_wps.sge.sge_py_int import SgePyInterface
33
34import cows_wps.utils.common as common_utils
35
36#import config files
37from cows_wps.utils.parse_wps_config import wps_config_dict
38from cows_wps.utils.parse_capabilities_config import caps_config_dict
39from cows_wps.utils.parse_outputs_rules import outputs_rules_config_dict
40from cows_wps.utils.role_handler import RoleHandler
41
42log = logging.getLogger(__name__)
43#-----------------------------------------------------------------------------
44
45
46# Define global variables
47
48class Operation(object):
49    def __init__(self, name, href):
50        self.name = name
51        self.href = href
52
53   
54def addOperation(ops, name):
55    href=helpers.operation(url_for(qualified=True, action="index")+'?',[]).get.href
56    ops.append(Operation(name,href))
57
58
59ops =[]
60log.info("Building config file")
61addOperation(ops, "GetCapabilities")
62addOperation(ops,"DescribeProcess")
63addOperation(ops, "Execute") 
64
65#-----------------------------------------------------------------------------
66
67
68
69class WpsController(ows_controller.OWSController):
70    #-------------------------------------------------------------------------
71    # Attributes required by OWSController
72
73    service = 'WPS'
74    owsOperations = (ows_controller.OWSController.owsOperations +
75                     ['DescribeProcess', 'Execute'])
76    validVersions = ['0.4.0']
77   
78
79    #-------------------------------------------------------------------------
80
81    # Cababilities information is built-up in I{ops} above, therefore no need
82    # for _loadCapabilities()
83    #
84    #def _loadCapabilities(self):
85    #
86   
87    def _renderCapabilities(self, version, format):
88        t= templateLoader.load("wps_getcapabilitiesresponse_template.xml")
89   
90        return t.generate(caps_config_dict=caps_config_dict, ops=ops, c=c).render()
91
92    def DescribeProcess(self):
93        ids = caps_config_dict['Identifiers']
94        t = templateLoader.load('wps_describeprocess_response.xml')
95           
96        # Get identifier
97        identifier = self.getOwsParam('identifier')
98
99        # Handle the case where all descriptions are requested
100        if identifier == 'all':
101           
102            return t.generate(ids=ids).render()
103        # Handle specific process
104        if identifier in ids:
105            response.content_type = 'text/xml'
106            return t.generate(caps_config_dict=caps_config_dict, ids=[identifier]).render()
107        else:
108            raise InvalidParameterValue('Identifier not found', 'identifier')
109
110
111    def _parseDataInputs(self, dataInputs):
112        """
113        Parses inputs, returns dictionary.
114        Applies the following rules:
115         * Any values sent with "|" are split into a list.
116         * Any values containing a "+" sign will be replaced with spaces in the value.
117        """
118        # If dataInputs is blank return empty dictionary
119        if not dataInputs:
120            return {}
121
122        arg_dict = {}
123        args = dataInputs.split(",")
124
125        if len(args) %2 != 0:
126            raise ValueError("Must provide even number of arguments in query string for datainput argument (all comma-separated).")
127
128        if len(args) == 1 and args[0] == "": args = []
129
130        while len(args) > 0:
131            (arg, value) = args[:2]
132            args = args[2:]
133
134#            log.warn("VALUE: %s" % value)
135            value = urllib.unquote(value)
136            value = value.replace("+", " ")
137#            log.warn("VALUE2: %s" % value)
138
139            if value.find("|") > -1:
140                value = value.split("|")
141
142            arg_dict[arg] = value
143
144        return arg_dict
145
146       
147    def Execute(self):
148        if request.environ.get("REQUEST_METHOD") == "POST":
149            log.warn("POST request was rejected because we do not allow this yet.")
150            abort(405)
151
152        log.info("entering controllers.wps.Execute")
153
154        st = time.time()
155       
156        identifier = self.getOwsParam('Identifier')
157       
158        if identifier not in caps_config_dict['Identifiers']:
159            raise InvalidParameterValue('Identifier not found', 'identifier')
160       
161        process_conf = caps_config_dict[identifier]
162        process_type = process_conf["wps_interface"]["process_type"]
163
164        # Since we generate the execute response document on demand we can ignore the store option.
165        # It might be necessary to reflect it's value in ExecuteResponse for WPS compliance
166        store = self._getBooleanOwsParam('StoreExecuteResponse')
167        if store == True:
168            if not caps_config_dict[identifier]['wps_interface']['store']:
169                raise InvalidParameterValue('Storing output is not supported for this process')
170           
171        #!TODO: Clarify this.  We can always support status because the ExecuteResponse document is created on-demand
172        status = self._getBooleanOwsParam('Status')
173        if status == True:
174            if not caps_config_dict[identifier]['wps_interface']['status']:
175                raise InvalidParameterValue('Status not supported for this process')
176            elif not store:
177                raise InvalidParameterValue('Both status and store must be true')
178
179        #!NOTE: Lineage effects what is put in the ExecuteResponse document.  It should end up in the
180        #    config section of the ProcessContext.
181        self.lineage = self._getBooleanOwsParam('lineage', default='true')
182       
183        #!TODO: Check these custom arguments are in the capabilities document.
184        costonly = self._getBooleanOwsParam('Costonly')
185        inform = self._getBooleanOwsParam('Inform')           
186       
187        #######################################################################
188        # Parse and validate arguments.  First the DataInputs are extracted from
189        # the OWS params then they are validated and deserialised into python
190        # objects.
191        #
192        #   input_dict: plain string values of DataInputs
193        #   arg_dict:   validated against process.ini and deserialised
194        dataInputs = self.getOwsParam('DataInputs')
195        log.debug("dataInputs = %s" % (dataInputs,))
196        input_dict = self._parseDataInputs(dataInputs)
197        va = validate_arguments.ValidateArguments(identifier, input_dict)
198               
199        # Exceptions here will be caught by COWS
200        try:
201            arg_dict = va.validate()
202        except Exception, err:
203            exc_info = sys.exc_info()
204            log.warning("exception thrown from validator:")
205            log.warning("traceback.format_exc() = %s" % (traceback.format_exc(),))
206           
207            raise InvalidParameterValue('Invalid parameters',err)
208       
209       
210        #######################################################################
211       
212        #######################################################################
213        #!NOTE: RawDataOutput and ResponseDocument handling has been removed from
214        #    the code.  They aren't needed for DCIP and more thought is needed.
215        #######################################################################
216
217        requestId = common_utils.getUniqueId()
218           
219        # Get username and e-mail address from secure session
220        try:
221            um = UserManager()
222            email = um.getEmailAddress()
223            username = um.getUserName()
224        except:
225            log.warn("Setting user to 'anon' because security implies this requires no login.")
226            username = "anon"
227            email = ""
228
229        log.debug("identifier=%s, username=%s, requestId=%s costonly=%s lineage=%s inform=%s" % \
230                  (identifier, username, requestId, costonly, self.lineage, inform))
231
232        # Now work out which authorisedRoles are associated with this Process ID
233        # and this set of arguments
234        rh = RoleHandler()
235
236        try:
237            authorisedRoles = rh.getRoleNumberFromProcessIDAndArgs(identifier, arg_dict) 
238        except:
239            raise OwsError("""The WPS does not recognise the resource you are trying to access.
240Since it cannot reconcile the access restrictions the resource cannot be made available.
241Please report this problem if you believe that the resource you have requested should be visible to you.""")
242
243        # Update the cache with any completed requests before consulting the cache
244        requestManager.updateRunningRequests()
245       
246        # check if the process results already exist in the cache
247        if caching_enabled:
248            cachedJobId = cacheManager.getCachedJobId(identifier, arg_dict)
249        else:
250            cachedJobId = None
251
252        log.debug("cachedJobId = %s" % (cachedJobId,))
253
254        if cachedJobId:
255            log.debug("request is cached, cacheJobId=%s" % cachedJobId)
256            context = requestManager.addCachedRequest(requestId, cachedJobId, username)
257           
258            context.outputs['job_details']['lineage'] = self.lineage
259           
260            #!TODO: The new requestId will not show up on the jobs controller because it doesn't really exist.
261            #    I.e. it doesn't have a processDir.  We need to create process records that point to the
262            #    cached values.
263           
264            responseXML = xml_renderer.wrapExecuteResponse(context, identifier, 
265                                                           self.validVersions[0], 
266                                                           input_dict, requestId,
267                                                           authorisedRoles = authorisedRoles,
268                                                           is_cached = True)
269
270            context.close()
271            response.content_type = 'text/xml'
272            return responseXML
273
274       
275       
276       
277        context = self._makeProcessContext(identifier, requestId, process_type, arg_dict)
278        processCallable = self._loadProcess(context, identifier)
279       
280        if process_type in ["async", "async_l", "async_s"]:
281
282            if not hasattr(processCallable, 'dryRun'):
283                raise RuntimeError('Dry Run not supported for asynchronous process %s.' % processCallable)
284
285
286            context.config['Inform'] = str(inform)
287            context.config['notify_email_to'] = email
288           
289            log.info("Running dryrun for non cached async job")
290            self._runProcessDryRun(processCallable, context, requestId)
291           
292            #if the dryRun ran successfully
293            if context.status == STATUS.ACCEPTED:
294               
295                #after dryRun we can deduce the correct process type           
296                if process_type == 'async':
297                    process_type = self._deduceProcessType(context)
298           
299                (accept, message) = self._checkAsyncQueueLimit(context, 
300                                            requestId, process_type, username)
301           
302                if costonly:
303                    if accept:
304                        context.outputs['job_details']['costonly_status'] = ['cleared', 'The request is ready  for execution']
305                    else:
306                        context.outputs['job_details']['costonly_status'] = ['withheld', message]
307                       
308                    context.save()
309
310                else:
311               
312                    if accept:
313                        #run the async process
314                        log.info("Running %s job, callable = %s" % (process_type, processCallable,))
315                        processQueue = self._selectQueue(identifier, process_type)
316                       
317                        self._submitAsyncProcess(identifier, requestId, username, context, processQueue)
318                    else:
319                        context.setStatus(STATUS.FAILED, message)
320                        context.save()
321       
322        else: # process type is sync
323           
324            if inform == True:
325                log.warning("inform=True found for syncronouse process, ignoring it. \n(identifier = %s request_id = %s, inputs=%s" \
326                            % (identifier, requestId, arg_dict))
327           
328               
329            requestManager.addSyncRequest(requestId, username, identifier, context)
330                             
331            if hasattr(processCallable, 'dryRun'):
332                self._runProcessDryRun(processCallable, context, requestId)
333           
334            log.debug("Context status = %s" % (context.status,) )
335           
336            if context.status != STATUS.FAILED:
337
338                #!NOTE: Now all sync processes run in a worker pool
339                if process_type == 'sync':
340                    log.info("Running sync job")
341                    pcs = self._getProcessCallableString(identifier)
342                    self._runSubprocProcess(pcs, context, identifier, requestId)
343                                       
344            log.debug("updating request status to context.status( = %s)" % (context.status,))
345           
346            context.save()
347            #update the status in the database
348            requestManager.updateRequestStatus(requestId, context.status)
349           
350            #!TODO:Will this caching be done by the status change listener in requestManager?
351            if caching_enabled and context.status == STATUS.COMPLETED:
352                cacheManager.cacheRequest(requestId)
353           
354        responseXML = xml_renderer.wrapExecuteResponse(context, identifier, 
355                                                       self.validVersions[0], 
356                                                       input_dict, requestId,
357                                                       authorisedRoles = authorisedRoles,
358                                                       is_cached=False)
359       
360
361        context.close()
362       
363        #remove the context of a costonly process
364        if costonly:
365            context.delete()
366       
367        log.debug("Finished Execute in %.2fs" % (time.time() - st,))
368       
369        response.content_type = 'text/xml'
370        return responseXML
371
372    def _getBooleanOwsParam(self, paramName, default='false' ):
373       
374        value = self.getOwsParam(paramName, default=default)
375        if value == 'true':
376            return True
377        else:
378            return False
379
380    def _getProcessCallableString(self, identifier):
381        """
382        Parse the configuration dict to find the process module and callable names
383       
384        @return: (module_name, callable_name) where both are strings.
385       
386        """
387        log.debug("process callable string = %s" % (caps_config_dict[identifier]["wps_interface"]["process_callable"],))
388        return caps_config_dict[identifier]["wps_interface"]["process_callable"]
389
390    def _loadProcess(self, context, identifier):
391        # Get the module and function needed...
392        processModuleString = self._getProcessCallableString(identifier)
393
394        processCallable = common_utils.buildProcessCallable(processModuleString)
395
396        return processCallable
397   
398           
399    def _deduceProcessType(self, context):
400        """Work-out whether to queue an asynchronous process
401        based on the result of dryRun.
402       
403        """
404        max_duration = int(wps_config_dict['max_proc_duration'])
405        job_details = context.outputs['job_details']
406        log.debug("context.status = %s" % (context.status,))
407        if job_details['duration'] > max_duration:
408            r = 'async_l'
409        else:
410            r = 'async_s'
411
412        log.debug('Selected queue %s for process with directory %s' % (r, context.processDir))
413        return r
414
415    def _makeProcessContext(self, identifier, requestId, process_type, arg_dict):
416        """
417        Creates a context object for a new job
418        """
419       
420        log.info("Creating process context")
421       
422        proc_conf = createProcessConfig(identifier)
423        procDir = processDirManager.makeProcessDir(requestId, identifier)
424       
425        context = ProcessContext(procDir, proc_conf)
426       
427        context.create(arg_dict)           
428        context.setupJobDetails(requestId, self.lineage)
429       
430        return context
431       
432    def _selectQueue(self, identifier, processType):
433        """
434        Return which SGE queue to send a job to.
435       
436        """
437        if processType == 'async_l':
438            processQueue = wps_config_dict['sge_queue_l']
439        elif processType == 'async_s':
440            processQueue = wps_config_dict['sge_queue_s']
441        else:
442            raise Exception("Unknown process type %s, should be 'async_l' or 'async_s'" % (processType,) )
443                           
444        if processDirManager.isStripedProcess(identifier):
445            if '@' in processQueue:
446                raise Exception("SGE queue cannot contain a host identifier in striped mode")
447            processQueue = '%s@%s' % (processQueue, processDirManager.getStripedSGEHost())
448       
449        return processQueue
450       
451    def _submitAsyncProcess(self, identifier, jobId, username, context, queue):
452        """
453        Submits an asynchronous process and returns updated Exec Resp doc.
454        """
455
456        processCallableString = caps_config_dict[identifier]["wps_interface"]["process_callable"]
457       
458        process_dir = os.path.abspath(context.processDir)
459        context.save()
460
461        log.debug("queue name...... %s" % queue)
462       
463        sge = SgePyInterface()
464        try:
465           
466            #!NOTE: I have removed os.path.abspath('python') because we can't be sure
467            #    python is on the same path on an SGE node as on the WPS machine.
468            #    We must make sure python is in on PATH on the SGE node.
469            runproc = wps_config_dict['runproc_path']
470            sge_job_id = sge.submitJob(runproc, [processCallableString, process_dir], 
471                                       process_dir, queue)
472        except:
473            log.error("Exception occurred submitting process to SGE \n callable=%s\n runproc=%s\n process_dir=%s" % (processCallableString, runproc, process_dir))
474            log.exception('Process Exception follows')
475           
476            # catch the exception and wrap it as an OWSError which should get
477            # turned into an error report in cows.
478            exceptionMessage = self._getExceptionMessage()
479           
480            raise OwsError("Error occurred while submitting asynconous job.\n Message=%s" % exceptionMessage)
481       
482            context.save()
483        else:
484            log.debug('submit sge_job_id to requestManager to add it to the job id mapping file')
485            requestManager.addAsyncRequest(jobId, username, sge_job_id, identifier, context, queue)
486           
487        finally:
488            sge.close()
489       
490    def _runSubprocProcess(self, processCallableString, context, identifier, requestId):
491        """
492        Run a process in the WorkerPool
493        """
494       
495        log.debug("Executing process callable %s in WorkerPool" % (processCallableString,))
496
497        context.setStatus(STATUS.STARTED, 'Job has started', 0)
498        context.log.info("Executing process %s in WorkerPool on HOSTNAME = %s" \
499                  % (processCallableString, os.getenv('HOSTNAME')))
500        try:
501            workerPool.execute(processCallableString, context)
502        except Exception, e:
503
504            log.exception('Process Exception follows')
505
506            # Note: context related cleanup is done in workerPool.  It should be correctly
507            #     synced with the filesystem on return
508           
509            log.warning('WorkerPool job %s failed' % processCallableString)
510           
511        else:
512            log.debug('WorkerPool job %s returned' % processCallableString)
513            if context.status in (STATUS.ACCEPTED, STATUS.STARTED):
514                context.setStatus(STATUS.COMPLETED, '', 100)
515           
516           
517    def _runProcessDryRun(self, processCallable, context, requestId):
518        """
519        Runs the dryRun method of an asyncronous process. If any Exceptions
520        occurr while running the method the context.status will be set to
521        FAILED and the process should not be submitted to SGE.
522        """
523        try:
524            processCallable.dryRun(context)
525           
526        except Exception, e:
527            log.debug("Exception occurred while running %s" % (processCallable.dryRun,))
528            log.exception('Process Exception follows')
529           
530            exceptionMessage = self._getExceptionMessage()
531            context.setStatus(STATUS.FAILED, exceptionMessage)
532
533            #log the exception in the process log as well.
534            context.log.exception('Process Exception follows')
535            context.save()
536        else:
537           
538            if context.status in (STATUS.STARTED, STATUS.COMPLETED):
539                log.error("Process in %s status after running dryRun, processCallable=%s" % (context.status, processCallable,))
540                log.error("Correcting process status to ACCEPTED")
541                context.setStatus(STATUS.ACCEPTED, '')
542               
543    def _checkAsyncQueueLimit(self, context, requestId, processType, username):
544        """
545        Checks with the requestManager how many jobs a user has running on each
546        of the SGE queues. Then comparis the number of jobs running with the
547        maximum limit for the queue and returns if the job should be accepted.
548       
549        returns (accepted, message)
550        """
551       
552        if processType == 'async_l':
553            queueName = wps_config_dict['sge_queue_l']
554            maxProcesses = int(wps_config_dict['max_l_proc'])
555           
556        elif processType == 'async_s':
557            queueName = wps_config_dict['sge_queue_s']
558            maxProcesses = int(wps_config_dict['max_s_proc'])
559        else:
560            raise Exception("Unknown process type %s, should be 'async_l' or 'async_s'" % (processType,) )
561       
562        currentJobs = requestManager.countSGEJobsRunning(username, queueName)
563                     
564        if  currentJobs >= maxProcesses:
565            accept = False
566            message = 'You cannot start this request until one of your current processes completes.'
567           
568            log.debug("request denied for user %s who has %s requests running on the %s queue",
569                       username, currentJobs, queueName)
570           
571        else:
572            accept = True
573            message = ""
574       
575        return (accept, message)
576
577    def _getExceptionMessage(self):
578#        log.debug("sys.exc_info() = %s" % (sys.exc_info(),))
579
580        #get the type + info + traceback from the last exception to occurr
581        t, i, tb = sys.exc_info()
582#        log.debug("t = %s" % (t,))
583#        log.debug("i = %s" % (i,))
584               
585        #update the status in the context         
586        exceptionMessage = traceback.format_exception_only(t, i)[0]
587        exceptionMessage = exceptionMessage.strip()       
588        return exceptionMessage
Note: See TracBrowser for help on using the repository browser.