source: cows_wps/trunk/process_modules/get_weather_stations.py @ 7061

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/process_modules/get_weather_stations.py@7061
Revision 7061, 8.9 KB checked in by astephen, 10 years ago (diff)

Re-factored so dryRun and call step through the same stuff.

Line 
1"""
2get_weather_stations.py
3===================
4
5Process get_weather_stations that holds the GetWeatherStations class.
6
7"""
8
9import os, stat, time
10import sys
11import logging
12
13from cows_wps.process_handler.fileset import FileSet, FLAG
14import cows_wps.process_handler.process_support as process_support
15from cows_wps.process_handler.context.process_status import STATUS
16
17import process_modules.lib_get_weather_stations.utils as gws_utils
18
19
# Module-level logger. Its own level is pinned to DEBUG, so its effective
# level does not fall back to an ancestor logger's level (attached handlers
# may still filter records independently).
log = logging.getLogger(name=__name__)
log.setLevel(level=logging.DEBUG)
22
23
class GetWeatherStations(object):
    """
    WPS process that fetches a list of weather stations matching counties
    and/or a bounding box, a set of data types and a start/end date-time.

    The WPS controller calls the instance (``__call__``) for a real run and
    ``dryRun`` for a volume/duration estimate; both go through
    ``_executeProc``.
    """

    # Define arguments that we need to set from inputs
    args_to_set = ["Counties", "BBox", "DataTypes", "StartDateTime", "EndDateTime"]

    # Define defaults for arguments that might not be set. Arguments listed
    # in ``args_to_set`` but not here are required.
    # NOTE: these values are shared by every instance; ``_parseInputs``
    # copies them before use so they are never mutated in place.
    input_arg_defaults = {"Counties": [],
                          "BBox": None,
                          "DataTypes": [],
                         }

    # Define a dictionary for arguments that need to be processed
    # before they are set (with values as the function doing the processing).
    arg_processers = {"StartDateTime": gws_utils.revertDateTimeToLongString,
                      "EndDateTime": gws_utils.revertDateTimeToLongString}


    def __call__(self, context):
        """
        This is where the WPS controller calls the process.
        Since this process is async we actually pass both the
        __call__ and dryRun methods through the same _executeProc
        method which does the same for each except that for
        dry_run = True it will not produce any outputs.
        """

        return self._executeProc(context, dry_run = False)


    def dryRun(self, context):
        """
        This is where the WPS controller calls the process for
        an estimate of the volume and duration of the outputs.

        Since this process is async we actually pass both the
        __call__ and dryRun methods through the same _executeProc
        method which does the same for each except that for
        dry_run = True it will not produce any outputs.
        """

        return self._executeProc(context, dry_run = True)


    def _executeProc(self, context, dry_run):
        """
        Step through the parts of the process shared by ``__call__`` and
        ``dryRun``.

        If ``dry_run`` is False:

        * the job status is set to STARTED;
        * ``gws_utils.getStationList`` is called with the parsed arguments
          and the output path ``<processDir>/outputs/weather_stations.txt``;
        * the returned station names are published, space-separated, under
          ``context.outputs['ProcessSpecificContent']['WeatherStations']``;
        * the output file is added to the FileSet with its real size;
        * ``process_support.finishProcess`` is called with ``keep = True``.

        If ``dry_run`` is True, no station list is produced. The output file
        is registered with an estimated size, and
        ``process_support.finishDryRun`` is called with an estimated
        duration.

        Raises ``KeyError`` if a required input is missing and ``Exception``
        if neither counties nor a bounding box were supplied.
        """
        # Always record start time for duration in outputs
        startTime = time.time()

        # Always need a FileSet, even if empty
        self.fileSet = context.outputs['FileSet'] = FileSet()

        # Parse the inputs into self.args, then check they are compatible
        self._parseInputs(context.inputs)
        self._validateInputs()

        if not dry_run:
            # Now set status to started
            context.setStatus(STATUS.STARTED, 'Job is now running', 0)

        # Path of the (real or estimated) output file
        stationsFile = 'weather_stations.txt'
        stationsFilePath = os.path.join(context.processDir, "outputs", stationsFile)

        if not dry_run:
            # Call code to get Weather Stations
            args = self.args
            stationList = gws_utils.getStationList(args["Counties"], args["BBox"],
                                                   args["DataTypes"], args["StartDateTime"],
                                                   args["EndDateTime"], stationsFilePath)

            # Add the stations list to the XML output section:
            # ProcessSpecificContent. This must stay inside the real-run
            # branch: ``stationList`` does not exist on a dry run, and using
            # it there raised a NameError.
            context.outputs['ProcessSpecificContent'] = {"WeatherStations": " ".join(stationList)}

        # Tell the output XML that the outputs can be sent to a separate
        # process. An intelligent client can pick up this string to build a
        # new WPS request that takes this job as its input.
        context.outputs['job_details']['job_capabilities'] = "send_to_extract_weather_data"

        if not dry_run:
            # We can log information at any time to the main log file
            context.log.info('Written output file: %s' % stationsFilePath)
        else:
            context.log.debug("Running dry run.")

        # Add the stations file to the outputs (estimated size on a dry run)
        if not dry_run:
            self._addFileToOutputs(stationsFilePath, 'Weather Stations File')
        else:
            estimated_size = 30000
            self._addFileToOutputs(stationsFilePath, 'Weather Stations File', size = estimated_size)

        if not dry_run:
            # Finish up by calling function to set status to complete and zip up files etc.
            # keep = True keeps the weather station file accessible to the
            # downstream process without unzipping. This is fine as the
            # files are small.
            process_support.finishProcess(context, self.fileSet, startTime, keep = True)
        else:
            estimated_duration = 60 # seconds
            process_support.finishDryRun(context, [], self.fileSet,
                            estimated_duration, acceptedMessage = 'Dry run complete')


    def _parseInputs(self, inputs):
        """
        Parse the inputs into the instance dictionary ``self.args``.

        Each key in ``args_to_set`` is taken from ``inputs``. If it is
        missing, a copy of its entry in ``input_arg_defaults`` is used
        instead. Keys with no default are required. Values whose key appears
        in ``arg_processers`` are passed through that function before being
        stored.

        Raises ``KeyError`` if a required argument is missing.
        """
        import copy

        self.args = {}

        for key in self.args_to_set:
            if key in self.input_arg_defaults:
                # Copy so the class-level default (e.g. a shared ``[]``) is
                # never handed out, and so never mutated, by reference.
                default = copy.copy(self.input_arg_defaults[key])
                value = inputs.get(key, default)

            elif key not in inputs.keys():
                raise KeyError("Must provide argument '%s' when calling '%s'." % (key, self.__class__.__name__))

            else:
                value = inputs[key]

            # Process arg if required
            if key in self.arg_processers:
                value = self.arg_processers[key](value)

            self.args[key] = value


    def _validateInputs(self):
        """
        Run specific checks on the parsed arguments and their compatibility.

        Raises ``Exception`` if no counties were given (empty or missing)
        and no bounding box was given either.
        """
        if not self.args["Counties"] and self.args["BBox"] is None:
            raise Exception("Invalid arguments provided. Must provide either a geographical bounding box or a list of counties.")


    def _addFileToOutputs(self, fpath, info = 'An output file.', file_flag = FLAG.DATA, size = None):
        """
        Append a FileSet entry for ``fpath`` to ``self.fileSet.contents``.

        If ``size`` is given (as in a dry run), it is used as-is. Otherwise
        the file's real size is read from disk, so the file must exist.
        Only the basename of ``fpath`` is recorded.
        """
        if size is None:
            size = os.path.getsize(fpath)

        fname = os.path.basename(fpath)
        self.fileSet.contents.append(FileSet(file_flag, fname, size, info))


    def OLDdryRun(self, context):
        """
        Earlier stand-alone dry-run implementation that registers a fixed
        placeholder file ("testfile.txt", 30000 bytes) and a fixed 60-second
        estimate.

        Nothing in this file calls it; it appears to be kept from before
        ``dryRun`` was routed through ``_executeProc``.
        """
        self.fileSet = context.outputs['FileSet'] = FileSet()
        self.fileSet.contents.append(FileSet(FLAG.DATA, "testfile.txt", 30000, "Some file"))

        process_support.finishDryRun(context, [("meta", "data")], self.fileSet,
                                         60, acceptedMessage='Dry run complete')


    def OLD__call__(self, context):
        """
        Earlier stand-alone implementation of the real run.

        Nothing in this file calls it; it appears to be kept from before
        ``__call__`` was routed through ``_executeProc``.

        Raises ``KeyError`` if 'StartDateTime' or 'EndDateTime' is missing.
        Raises ``Exception`` if neither counties nor a bounding box were
        supplied.
        """
        # Always record start time for duration in outputs
        startTime = time.time()

        # Parse the inputs. BBox is read unconditionally: it was previously
        # only bound when Counties was empty, so a counties-only request
        # failed with NameError when BBox was passed to getStationList.
        Counties = context.inputs.get('Counties', [])
        BBox = context.inputs.get("BBox", None)
        if Counties == [] and BBox is None:
            raise Exception("Invalid arguments provided. Must provide either a geographical bounding box or a list of counties.")

        DataTypes = context.inputs.get("DataTypes", [])
        StartDateTime = gws_utils.revertDateTimeToLongString(context.inputs['StartDateTime'])
        log.warning("DATE TIME: %s, type = %s" % (StartDateTime, type(StartDateTime)))
        EndDateTime = gws_utils.revertDateTimeToLongString(context.inputs['EndDateTime'])

        context.setStatus(STATUS.STARTED, 'Job is now running', 0)

        # Always need a FileSet, even if empty
        self.fileSet = context.outputs['FileSet'] = FileSet()

        # Make path to output file
        WFile = 'weather_stations.txt'
        MyFilePath = context.processDir + '/outputs/' + WFile

        # Call code to get Weather Stations
        station_list = gws_utils.getStationList(Counties, BBox, DataTypes, StartDateTime,
                           EndDateTime, MyFilePath)

        context.outputs['ProcessSpecificContent'] = {
                                    "WeatherStations": " ".join(station_list)}
        context.outputs['job_details']['job_capabilities'] = "send_to_extract_weather_data"

        context.log.info('Written output file: %s' % WFile)
        filesize = os.path.getsize(MyFilePath)

        self.fileSet.contents.append(FileSet(FLAG.DATA, WFile, filesize, 'Weather Stations File'))

        # keep = True keeps the weather station files accessible to the
        # downstream process without unzipping. This is fine as the files
        # are small.
        process_support.finishProcess(context, self.fileSet, startTime, keep = True)
Note: See TracBrowser for help on using the repository browser.