source: cows_wps/trunk/process_modules/extract_uk_station_data.py @ 7614

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/process_modules/extract_uk_station_data.py@7614
Revision 7614, 6.7 KB checked in by astephen, 10 years ago (diff)

Test for zero weather stations being found.

Line 
"""
extract_uk_station_data.py
==========================

Process module ``extract_uk_station_data``, which defines the
ExtractUKStationData class.

"""
8
9import os, stat, time, sys
10import logging
11
12from cows_wps.process_handler.fileset import FileSet, FLAG
13import cows_wps.process_handler.process_support as process_support
14from cows_wps.process_handler.context.process_status import STATUS
15
16import process_modules.lib_get_weather_stations.utils as gws_utils
17import process_modules.lib_extract_uk_station_data.utils as exuk_utils
18import process_modules.process_base
19
20
21
# Module-level logger, named after this module; its level is forced to DEBUG
# at import time.
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
24
25
class ExtractUKStationData(process_modules.process_base.ProcessBase):
    """
    Process that extracts observation data for a set of UK weather stations
    into delimited output files, one per output time chunk.

    The stations to extract are taken, in order of precedence, from the
    ``StationIDs`` input, from the ``StationsFile`` input, or from a lookup via
    ``gws_utils.getStationList`` using ``Counties``, ``BBox``, ``DataTypes``
    and the requested time window.
    """

    # Define arguments that we need to set from inputs
    args_to_set = ["StationIDs", "Counties", "BBox", "DataTypes", "StartDateTime", "EndDateTime",
                   "Delimiter", "OutputTimeChunk", "ObsTableName", "StationsFile"]

    # Define defaults for arguments that might not be set
    input_arg_defaults = {"StationIDs": [],
                          "Counties": [],
                          "BBox": None,
                          "DataTypes": [],
                          "StationsFile": None,
                         }

    # Define a dictionary for arguments that need to be processed
    # before they are set (with values as the function doing the processing).
    arg_processers = {"StartDateTime": gws_utils.revertDateTimeToLongString,
                      "EndDateTime": gws_utils.revertDateTimeToLongString}

    # Above this many stations a request must not use 'decadal' chunks and
    # must not span more than one calendar-year boundary.
    STATION_LIMIT = 100

    def _executeProc(self, context, dry_run):
        """
        This is called to step through the various parts of the process
        executing the actual process if ``dry_run`` is False and just
        returning information on the volume and duration of the outputs
        if ``dry_run`` is True.

        :param context: process context; this method uses its ``processDir``,
            ``setStatus`` and ``saveStatus`` members and passes it on to the
            extraction and finishing helpers.
        :param dry_run: if True only estimate output sizes and duration.
        :raises Exception: if no stations are found, or if more than
            ``STATION_LIMIT`` stations are selected together with either the
            'decadal' chunk size or a window crossing more than one year.
        """
        # Call standard _setup
        self._setup(context)
        a = self.args

        if not dry_run:
            # Now set status to started
            context.setStatus(STATUS.STARTED, 'Job is now running', 0)
            # Need to save status for async processes
            context.saveStatus()

        # Resolve the list of stations
        stationList = self._resolveStationList(context, dry_run)
        nStations = len(stationList)

        if nStations == 0:
            raise Exception("No weather stations have been found for this request. Please modify your request and try again.")

        # Year span is the difference of the leading four characters of the
        # two date strings (assumes both start with a 4-digit year -- TODO
        # confirm against gws_utils.revertDateTimeToLongString).
        nYears = int(a["EndDateTime"][:4]) - int(a["StartDateTime"][:4])

        # Now check limit on number of station IDs
        station_limit = self.STATION_LIMIT
        too_many_stations = nStations > station_limit

        if too_many_stations and a["OutputTimeChunk"] == "decadal":
            raise Exception("The number of selected station IDs has been calculated to be greater than %d. Please select a chunk size other than 'decadal' for such as large volume of data." % station_limit)

        if too_many_stations and nYears > 1:
            raise Exception("The number of selected station IDs has been calculated to be greater than %d. Please select a time window no longer than 1 year." % station_limit)

        # Define data file base
        prefix = "station_data"
        ext = "csv" if a["Delimiter"] == "comma" else "txt"
        dataFileBase = os.path.join(context.processDir, 'outputs', prefix)

        if dry_run:
            # One nominal output file per calendar year touched by the window.
            # The per-file size is a rough estimate; the 200 * 365 factor is
            # presumably ~200 bytes per station per day -- TODO confirm.
            size = nStations * 200 * 365
            for i in range(nYears + 1):
                outputPath = "%s-%s.%s" % (dataFileBase, i, ext)
                self._addFileToOutputs(outputPath, "Station data file.", size = size)

            estimated_duration = (nYears + 1) * 60 # seconds
            process_support.finishDryRun(context, [], self.fileSet,
                            estimated_duration, acceptedMessage = 'Dry run complete')
            return

        # Estimate we are 5% of the way through
        context.setStatus(STATUS.STARTED, 'Extracted station ID list.', 5)
        context.saveStatus()

        # Need temp dir for big file extractions
        procTmpDir = os.path.join(context.processDir, 'tmp')

        outputPaths = exuk_utils.extractStationDataByTimeChunk([a["ObsTableName"]], a["StartDateTime"],
                   a["EndDateTime"], stationList, a["OutputTimeChunk"], dataFileBase, a["Delimiter"],
                   ext, procTmpDir, context)

        for outputPath in outputPaths:
            self._addFileToOutputs(outputPath, "Station data file.")

        # Finish up by calling function to set status to complete and zip up files etc.
        # NOTE(review): keep = False is what is passed here; the meaning of the
        # flag is defined in process_support.finishProcess -- confirm that
        # False is intended for this process.
        process_support.finishProcess(context, self.fileSet, self.startTime, keep = False)

    def _resolveStationList(self, context, dry_run):
        """
        Works out whether we need to generate a station list or use those
        sent as inputs.

        Side effects: writes the station IDs, one per line, to
        ``weather_stations.txt`` under the process ``outputs`` directory,
        stores them space-separated in
        ``context.outputs['ProcessSpecificContent']`` and registers the file
        as a process output.

        :param context: process context providing ``processDir`` and ``outputs``.
        :param dry_run: currently unused; the list is resolved for dry runs as
            well because the caller's size estimate needs the station count.
        :return: a new, sorted list of station IDs.
        """
        a = self.args
        stationList = None

        # Use input station list if provided
        if a["StationIDs"]:
            stationList = a["StationIDs"]

        # Use stationsFile if we have one
        elif a["StationsFile"]:
            stationList = exuk_utils.extractStationsFromFile(a["StationsFile"])

        # Add output file
        stationsFile = 'weather_stations.txt'
        stationsFilePath = os.path.join(context.processDir, "outputs", stationsFile)

        if stationList is None:
            # Call code to get Weather Stations
            stationList = gws_utils.getStationList(a["Counties"], a["BBox"], a["DataTypes"], a["StartDateTime"],
                           a["EndDateTime"], stationsFilePath)

        # Sort into a new list so that the caller-supplied input list held in
        # self.args is not reordered in place.
        stationList = sorted(stationList)

        # Stringify once so the file and the XML output agree, and so that
        # non-string IDs do not break the join below.
        stationStrings = [str(station) for station in stationList]

        # Write the file one station id per line
        with open(stationsFilePath, "w") as fout:
            fout.write("\r\n".join(stationStrings))

        # Add the stations list to the XML output section: ProcessSpecificContent
        context.outputs['ProcessSpecificContent'] = {"WeatherStations": " ".join(stationStrings)}

        # Add file to outputs
        self._addFileToOutputs(stationsFilePath, 'Weather Stations File')

        return stationList

    def _validateInputs(self):
        """
        Runs specific checking of arguments and their compatibility.

        At least one station selector must be supplied. Empty values are
        treated as missing, matching the truthiness test that
        ``_resolveStationList`` applies to ``StationsFile``.

        :raises Exception: if none of ``Counties``, ``BBox``, ``StationIDs``
            or ``StationsFile`` has been provided.
        """
        a = self.args
        if not (a["Counties"] or a["BBox"] or a["StationIDs"] or a["StationsFile"]):
            raise Exception("Invalid arguments provided. Must provide one of (i) a geographical bounding box, (ii) a list of counties, (iii) a set of station IDs or (iv) a file containing a set of selected station IDs.")
171 
Note: See TracBrowser for help on using the repository browser.