source: cows_wps/trunk/process_modules/extract_uk_station_data.py @ 7082

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/process_modules/extract_uk_station_data.py@7082
Revision 7082, 7.2 KB checked in by astephen, 10 years ago (diff)

getting status monitoring working properly.

Line 
1"""
2extract_uk_station_data.py
3===================
4
5Process extract_uk_station_data that holds the ExtractUKStationData class.
6
7"""
8
9import os, stat, time, sys
10import logging
11
12from cows_wps.process_handler.fileset import FileSet, FLAG
13import cows_wps.process_handler.process_support as process_support
14from cows_wps.process_handler.context.process_status import STATUS
15
16import process_modules.lib_get_weather_stations.utils as gws_utils
17import process_modules.lib_extract_uk_station_data.utils as exuk_utils
18import process_modules.process_base
19
20
21
22log = logging.getLogger(__name__)
23log.setLevel(logging.DEBUG)
24
25
class ExtractUKStationData(process_modules.process_base.ProcessBase):
    """
    WPS process that extracts UK weather station data for a set of stations
    resolved from the inputs (explicit station IDs, a stations file, a list
    of counties or a bounding box) over a given time range.

    Supports a ``dry_run`` mode in which only the estimated output volume
    and duration are reported, without extracting any data.
    """

    # Define arguments that we need to set from inputs
    args_to_set = ["StationIDs", "Counties", "BBox", "DataTypes", "StartDateTime", "EndDateTime",
                   "Delimiter", "OutputTimeChunk", "ObsTableName", "StationsFile"]

    # Define defaults for arguments that might not be set
    input_arg_defaults = {"StationIDs": [],
                          "Counties": [],
                          "BBox": None,
                          "DataTypes": [],
                          "StationsFile": None,
                          }

    # Define a dictionary for arguments that need to be processed
    # before they are set (with values as the function doing the processing).
    arg_processers = {"StartDateTime": gws_utils.revertDateTimeToLongString,
                      "EndDateTime": gws_utils.revertDateTimeToLongString}

    # Maximum number of selected station IDs allowed for large requests.
    STATION_LIMIT = 100

    def _executeProc(self, context, dry_run):
        """
        Step through the various parts of the process, executing the actual
        extraction if ``dry_run`` is False, or just returning information on
        the estimated volume and duration of the outputs if ``dry_run`` is
        True.

        :param context: process context (provides ``processDir``, status
            reporting and output registration).
        :param dry_run: if True, only estimate outputs; nothing is written.
        :raises Exception: if the request exceeds the station/chunk-size or
            station/time-window limits.
        """
        # Call standard _setup
        self._setup(context)
        a = self.args

        if not dry_run:
            # Now set status to started
            context.setStatus(STATUS.STARTED, 'Job is now running', 0)

        # Resolve the list of stations
        station_list = self._resolveStationList(context, dry_run)

        # Now check limits on the number of station IDs requested
        n_stations = len(station_list)
        # Whole-year difference only; used to size per-year output chunks.
        n_years = int(a["EndDateTime"][:4]) - int(a["StartDateTime"][:4])

        if n_stations > self.STATION_LIMIT and a["OutputTimeChunk"] == "decadal":
            # Downgrade the chunk size before raising so that a caller which
            # catches this and resubmits with the mutated args gets a valid
            # configuration.
            a["OutputTimeChunk"] = "year"
            raise Exception("The number of selected station IDs has been calculated to be greater than %d. Please select a chunk size other than 'decadal' for such a large volume of data." % self.STATION_LIMIT)

        if n_years > 1 and n_stations > self.STATION_LIMIT:
            raise Exception("The number of selected station IDs has been calculated to be greater than %d. Please select a time window no longer than 1 year." % self.STATION_LIMIT)

        # Define data file base; comma-delimited output gets a .csv extension
        prefix = "station_data"
        ext = "csv" if a["Delimiter"] == "comma" else "txt"

        data_file_base = os.path.join(context.processDir, 'outputs', prefix)

        if not dry_run:
            # Estimate we are 5% of the way through
            context.setStatus(STATUS.STARTED, 'Extracted station ID list.', 5)

            # Need temp dir for big file extractions
            proc_tmp_dir = os.path.join(context.processDir, 'tmp')

            output_paths = exuk_utils.extractStationDataByTimeChunk(
                [a["ObsTableName"]], a["StartDateTime"], a["EndDateTime"],
                station_list, a["OutputTimeChunk"], data_file_base,
                a["Delimiter"], ext, proc_tmp_dir, context)

            for output_path in output_paths:
                self._addFileToOutputs(output_path, "Station data file.")

            # Finish up by calling function to set status to complete and zip
            # up files etc. keep=True so that the weather station file remains
            # accessible to downstream processes without unzipping. This is
            # fine as files are small.
            process_support.finishProcess(context, self.fileSet, self.startTime, keep=True)

        else:
            # One output file per (partial) year of the requested window.
            output_paths = ["%s-%s.%s" % (data_file_base, i, ext) for i in range(n_years + 1)]
            for output_path in output_paths:
                # Rough volume estimate per file (scales with station count);
                # assumes ~200 bytes/station/day over a year — TODO confirm.
                size = n_stations * 200 * 365
                self._addFileToOutputs(output_path, "Station data file.", size=size)

            estimated_duration = (n_years + 1) * 60  # seconds
            process_support.finishDryRun(context, [], self.fileSet,
                                         estimated_duration, acceptedMessage='Dry run complete')

    def _resolveStationList(self, context, dry_run):
        """
        Work out whether we need to generate a station list or use those
        sent as inputs.

        On a real run the resolved list is sorted, written to the
        ``weather_stations.txt`` output file and recorded in the process
        outputs; on a dry run a placeholder list of the estimated size is
        returned instead.

        :param context: process context.
        :param dry_run: if True, only estimate the station list size.
        :returns: list of station IDs (placeholder ``-1`` entries on dry run).
        """
        a = self.args
        station_list = None

        # Use input station list if provided
        if a["StationIDs"]:
            station_list = a["StationIDs"]
        # Use stations file if we have one
        elif a["StationsFile"]:
            station_list = exuk_utils.extractStationsFromFile(a["StationsFile"])

        # Path of the stations output file
        stations_file = 'weather_stations.txt'
        stations_file_path = os.path.join(context.processDir, "outputs", stations_file)

        if not dry_run:

            if station_list is None:
                # Call code to get Weather Stations from the search criteria
                station_list = gws_utils.getStationList(
                    a["Counties"], a["BBox"], a["DataTypes"],
                    a["StartDateTime"], a["EndDateTime"], stations_file_path)

            # Write the file, one station id per line
            station_list.sort()

            with open(stations_file_path, "w") as fout:
                fout.write("\r\n".join(str(station) for station in station_list))

            # Add the stations list to the XML output section:
            # ProcessSpecificContent. IDs are converted with str() (as in the
            # file write above) because str.join() requires strings and the
            # IDs may be read from a file or generated as non-strings.
            context.outputs['ProcessSpecificContent'] = {
                "WeatherStations": " ".join(str(station) for station in station_list)}

            # Add file to outputs
            self._addFileToOutputs(stations_file_path, 'Weather Stations File')

        else:

            # Estimate size of outputs by estimating the number of stations
            if len(a["Counties"]) > 0:
                n_estimated_stations = len(a["Counties"]) * 15
            elif a["BBox"]:
                (w, s, e, n) = a["BBox"]
                lon_extent = abs(e - w)
                lat_extent = n - s
                n_estimated_stations = int(lon_extent * lat_extent * 50)
            else:
                n_estimated_stations = len(station_list)

            estimated_volume = n_estimated_stations * 5
            self._addFileToOutputs(stations_file_path, 'Weather Stations File', size=estimated_volume)

            # Make up a placeholder station list of the estimated length
            station_list = [-1] * n_estimated_stations

        return station_list

    def _validateInputs(self):
        """
        Run specific checking of arguments and their compatibility.

        :raises Exception: if none of the station-selection inputs (bounding
            box, counties, station IDs or stations file) was provided.
        """
        a = self.args
        if not (a["Counties"] or a["BBox"] or a["StationIDs"] or a["StationsFile"]):
            raise Exception("Invalid arguments provided. Must provide one of (i) a geographical bounding box, (ii) a list of counties, (iii) a set of station IDs or (iv) a file containing a set of selected station IDs.")
185 
Note: See TracBrowser for help on using the repository browser.