source: cows_wps/trunk/process_modules/get_weather_stations.py @ 7066

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/process_modules/get_weather_stations.py@7066
Revision 7066, 4.1 KB checked in by astephen, 10 years ago (diff)

got it working with process_base class.

Line 
1"""
2get_weather_stations.py
3===================
4
5Process get_weather_stations that holds the GetWeatherStations class.
6
7"""
8
9import os, stat, time
10import sys
11import logging
12
13from cows_wps.process_handler.fileset import FileSet, FLAG
14import cows_wps.process_handler.process_support as process_support
15from cows_wps.process_handler.context.process_status import STATUS
16
17import process_modules.lib_get_weather_stations.utils as gws_utils
18import process_modules.process_base
19
20
21log = logging.getLogger(__name__)
22log.setLevel(logging.DEBUG)
23
24
25class GetWeatherStations(process_modules.process_base.ProcessBase):
26
27    # Define arguments that we need to set from inputs
28    args_to_set = ["Counties", "BBox", "DataTypes", "StartDateTime", "EndDateTime"]
29
30    # Define defaults for arguments that might not be set
31    input_arg_defaults = {"Counties": [],
32                          "BBox": None,
33                          "DataTypes": [],
34                         }
35
36    # Define a dictionary for arguments that need to be processed
37    # before they are set (with values as the function doing the processing).
38    arg_processers = {"StartDateTime": gws_utils.revertDateTimeToLongString, 
39                      "EndDateTime": gws_utils.revertDateTimeToLongString}
40
41   
42    def _executeProc(self, context, dry_run):
43        """
44        This is called to step through the various parts of the process
45        executing the actual process if ``dry_run`` is False and just
46        returning information on the volume and duration of the outputs
47        if ``dry_run`` is True.
48        """
49        # Call standard _setup
50        self._setup(context)
51
52        if not dry_run:
53            # Now set status to started
54            context.setStatus(STATUS.STARTED, 'Job is now running', 0)
55
56        # Add output file
57        stationsFile = 'weather_stations.txt'
58        stationsFilePath = os.path.join(context.processDir, "outputs", stationsFile)
59
60        if not dry_run:
61            # Call code to get Weather Stations
62            a = self.args
63            stationList = gws_utils.getStationList(a["Counties"], a["BBox"], a["DataTypes"], 
64                           a["StartDateTime"], a["EndDateTime"], stationsFilePath)
65       
66            # Add the stations list to the XML output section: ProcessSpecificContent
67            context.outputs['ProcessSpecificContent'] = {"WeatherStations": " ".join(stationList)} 
68
69        # In this case we want to inform the output XML that you can send the outputs to a separate process
70        # This string can be picked up the an intelligent client in order to construct a new WPS request
71        # with this job as its input
72        context.outputs['job_details']['job_capabilities'] = "send_to_extract_weather_data"
73
74        if not dry_run:
75            # We can log information at any time to the main log file
76            context.log.info('Written output file: %s' % stationsFilePath)
77        else:
78            context.log.debug("Running dry run.")
79
80        # Add the stations file to the outputs
81        if not dry_run:
82            self._addFileToOutputs(stationsFilePath, 'Weather Stations File')
83        else:
84            estimated_size = 30000
85            self._addFileToOutputs(stationsFilePath, 'Weather Stations File', size = estimated_size)
86
87        if not dry_run:
88            # Finish up by calling function to set status to complete and zip up files etc
89            # In this case we set keep = True so that weather station file is accessible to downstream process
90            # without unzipping. This is fine as files are small.
91            process_support.finishProcess(context, self.fileSet, self.startTime, keep = True)
92        else:
93            estimated_duration = 60 # seconds
94            process_support.finishDryRun(context, [], self.fileSet,
95                            estimated_duration, acceptedMessage = 'Dry run complete')           
96
97
98    def _validateInputs(self):
99        """
100        Runs specific checking of arguments and their compatibility.
101        """
102        if self.args["Counties"] == [] and self.args["BBox"] == None:
103            raise Exception("Invalid arguments provided. Must provide either a geographical bounding box or a list of counties.")
Note: See TracBrowser for help on using the repository browser.