Changeset 7061


Ignore:
Timestamp:
23/06/10 13:15:43 (9 years ago)
Author:
astephen
Message:

Re-factored so dryRun and call step through the same stuff.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • cows_wps/trunk/process_modules/get_weather_stations.py

    r7019 r7061  
    2323 
    2424class GetWeatherStations(object): 
     25 
     26    # Define arguments that we need to set from inputs 
     27    args_to_set = ["Counties", "BBox", "DataTypes", "StartDateTime", "EndDateTime"] 
     28 
     29    # Define defaults for arguments that might not be set 
     30    input_arg_defaults = {"Counties": [], 
     31                          "BBox": None, 
     32                          "DataTypes": [], 
     33                         } 
     34 
     35    # Define a dictionary for arguments that need to be processed  
     36    # before they are set (with values as the function doing the processing). 
     37    arg_processers = {"StartDateTime": gws_utils.revertDateTimeToLongString,  
     38                      "EndDateTime": gws_utils.revertDateTimeToLongString} 
     39 
     40 
    2541    def __call__(self, context): 
     42        """ 
     43        This is where the WPS controller calls the process. 
     44        Since this process is async we actually pass both the 
     45        __call__ and dryRun methods through the same _executeProc  
     46        method which does the same for each except that for  
     47        dry_run = True it will not produce any outputs. 
     48        """ 
     49 
     50        return self._executeProc(context, dry_run = False) 
     51 
     52 
     53    def dryRun(self, context): 
     54        """ 
     55        This is where the WPS controller calls the process for 
     56        an estimate of the volume and duration of the outputs. 
     57 
     58        Since this process is async we actually pass both the 
     59        __call__ and dryRun methods through the same _executeProc  
     60        method which does the same for each except that for  
     61        dry_run = True it will not produce any outputs. 
     62        """ 
     63 
     64        return self._executeProc(context, dry_run = True) 
     65 
     66     
     67    def _executeProc(self, context, dry_run): 
     68        """ 
     69        This is called to step through the various parts of the process  
     70        executing the actual process if ``dry_run`` is False and just  
     71        returning information on the volume and duration of the outputs 
     72        if ``dry_run`` is True. 
     73        """ 
     74        # Always record start time for duration in outputs 
     75        startTime = time.time() 
     76 
     77        # Always need a FileSet, even if empty 
     78        self.fileSet = context.outputs['FileSet'] = FileSet() 
     79 
     80        # parse the inputs and set dictionary: self.args 
     81        self._parseInputs(context.inputs) 
     82 
     83        # Check inputs are compatible 
     84        self._validateInputs() 
     85 
     86        if not dry_run: 
     87            # Now set status to started 
     88            context.setStatus(STATUS.STARTED, 'Job is now running', 0) 
     89 
     90        # Add output file  
     91        stationsFile = 'weather_stations.txt' 
     92        stationsFilePath = os.path.join(context.processDir, "outputs", stationsFile) 
     93 
     94        if not dry_run: 
     95            # Call code to get Weather Stations 
     96            a = self.args 
     97            stationList = gws_utils.getStationList(a["Counties"], a["BBox"], a["DataTypes"],  
     98                           a["StartDateTime"], a["EndDateTime"], stationsFilePath) 
     99 
     100        # Add the stations list to the XML output section: ProcessSpecificContent 
     101        context.outputs['ProcessSpecificContent'] = {"WeatherStations": " ".join(stationList)}  
     102 
     103        # In this case we want to inform the output XML that you can send the outputs to a separate process 
     104        # This string can be picked up the an intelligent client in order to construct a new WPS request 
     105        # with this job as its input 
     106        context.outputs['job_details']['job_capabilities'] = "send_to_extract_weather_data" 
     107 
     108        if not dry_run: 
     109            # We can log information at any time to the main log file 
     110            context.log.info('Written output file: %s' % stationsFilePath) 
     111        else: 
     112            context.log.debug("Running dry run.") 
     113 
     114        # Add the stations file to the outputs 
     115        if not dry_run: 
     116            self._addFileToOutputs(stationsFilePath, 'Weather Stations File') 
     117        else: 
     118            estimated_size = 30000 
     119            self._addFileToOutputs(stationsFilePath, 'Weather Stations File', size = estimated_size) 
     120 
     121        if not dry_run: 
     122            # Finish up by calling function to set status to complete and zip up files etc 
     123            # In this case we set keep = True so that weather station file is accessible to downstream process 
     124            # without unzipping. This is fine as files are small. 
     125            process_support.finishProcess(context, self.fileSet, startTime, keep = True) 
     126        else: 
     127            estimated_duration = 60 # seconds 
     128            process_support.finishDryRun(context, [], self.fileSet, 
     129                            estimated_duration, acceptedMessage = 'Dry run complete')            
     130 
     131 
     132    def _parseInputs(self, inputs): 
     133        """ 
     134        Parse the inputs into an instance dictionary and set defaults where required. 
     135        """ 
     136        self.args = {} 
    26137         
     138        for key in self.args_to_set: 
     139            if key in self.input_arg_defaults.keys(): 
     140                deft = self.input_arg_defaults[key] 
     141                value = inputs.get(key, deft) 
     142 
     143            elif key not in inputs.keys(): 
     144                raise KeyError("Must provide argument '%s' when calling '%s'." % (key, self.__class__.__name__)) 
     145          
     146            else: 
     147                value = inputs[key] 
     148 
     149            # Process arg if required 
     150            if key in self.arg_processers.keys(): 
     151                value = apply(self.arg_processers[key], [value]) 
     152 
     153            self.args[key] = value 
     154 
     155        return 
     156 
     157 
     158    def _validateInputs(self): 
     159        """ 
     160        Runs specific checking of arguments and their compatibility. 
     161        """ 
     162        if self.args["Counties"] == [] and self.args["BBox"] == None: 
     163            raise Exception("Invalid arguments provided. Must provide either a geographical bounding box or a list of counties.") 
     164 
     165 
     166    def _addFileToOutputs(self, fpath, info = 'An output file.', file_flag = FLAG.DATA, size = None): 
     167        """ 
     168        Adds the file to the fileSet outputs. If ``size`` is set (as in  
     169        a dry-run) then use it, otherwise get real size of file. 
     170        """ 
     171        if size == None: 
     172            size = os.stat(fpath)[stat.ST_SIZE] 
     173 
     174        fname = os.path.split(fpath)[-1] 
     175        self.fileSet.contents.append(FileSet(file_flag, fname, size, info)) 
     176 
     177 
     178    def OLDdryRun(self, context): 
     179 
     180        self.fileSet = context.outputs['FileSet'] = FileSet() 
     181        self.fileSet.contents.append(FileSet(FLAG.DATA, "testfile.txt", 30000, "Some file")) 
     182         
     183        process_support.finishDryRun(context, [("meta", "data")], self.fileSet, 
     184                                         60, acceptedMessage='Dry run complete') 
     185   
     186 
     187    def OLD__call__(self, context): 
     188        """ 
     189        This is where the WPS controller calls the process. 
     190        """ 
     191        # Always record start time for duration in outputs 
    27192        startTime = time.time() 
    28193        jobId = os.path.basename(context.processDir) 
     
    64229        self.fileSet.contents.append(FileSet(FLAG.DATA, WFile, filesize, 'Weather Stations File')) 
    65230 
    66 #        context.setStatus(STATUS.COMPLETED, 'The End', 100) 
    67 #        completionTime = time.time() 
    68 #        process_support.updateJobDetailsAfterCompletion(context, startTime, completionTime) 
    69  
    70231        # Set keep = True so that weather station files are accessible to downstream process 
    71232        # without unzipping. This is fine as files are small. 
    72233        process_support.finishProcess(context, self.fileSet, startTime, keep = True) 
    73  
    74  
    75     def dryRun(self, context): 
    76  
    77         self.fileSet = context.outputs['FileSet'] = FileSet() 
    78         self.fileSet.contents.append(FileSet(FLAG.DATA, "testfile.txt", 30000, "Some file")) 
    79          
    80         process_support.finishDryRun(context, [("meta", "data")], self.fileSet, 
    81                                          60, acceptedMessage='Dry run complete') 
    82   
Note: See TracChangeset for help on using the changeset viewer.