source: cows_wps/trunk/process_modules/extract_uk_station_data.py @ 7070

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/process_modules/extract_uk_station_data.py@7070
Revision 7070, 6.9 KB checked in by astephen, 10 years ago

fix

1"""
2extract_uk_station_data.py
3===================
4
5Process extract_uk_station_data that holds the ExtractUKStationData class.
6
7"""
8
import os, stat, time, sys
import logging

from cows_wps.process_handler.fileset import FileSet, FLAG
import cows_wps.process_handler.process_support as process_support
from cows_wps.process_handler.context.process_status import STATUS

import process_modules.lib_get_weather_stations.utils as gws_utils
import process_modules.lib_extract_uk_station_data.utils as exuk_utils
import process_modules.process_base



log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)


class ExtractUKStationData(process_modules.process_base.ProcessBase):
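    """
    WPS process that extracts UK weather station observation data for a set of
    stations selected by station ID, stations file, county list or bounding box,
    over a given time period.
    """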

    # Define arguments that we need to set from inputs
    args_to_set = ["StationIDs", "Counties", "BBox", "DataTypes", "StartDateTime", "EndDateTime",
                   "Delimiter", "OutputTimeChunk", "ObsTableName", "StationsFile"]

    # Define defaults for arguments that might not be set
    input_arg_defaults = {"StationIDs": [],
                          "Counties": [],
                          "BBox": None,
                          "DataTypes": [],
                          "StationsFile": None,
                         }
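    # When a BBox is supplied it is interpreted as a (west, south, east, north)
    # tuple (see the unpacking in _resolveStationList).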

    # Define a dictionary for arguments that need to be processed
    # before they are set (with values as the function doing the processing).
    arg_processers = {"StartDateTime": gws_utils.revertDateTimeToLongString,
                      "EndDateTime": gws_utils.revertDateTimeToLongString}


    def _executeProc(self, context, dry_run):
        """
        This is called to step through the various parts of the process,
        executing the actual process if ``dry_run`` is False and just
        returning information on the volume and duration of the outputs
        if ``dry_run`` is True.
        """
        # Call standard _setup
        self._setup(context)
        a = self.args

        if not dry_run:
            # Now set status to started
            context.setStatus(STATUS.STARTED, 'Job is now running', 0)

        # Resolve the list of stations
        stationList = self._resolveStationList(context, dry_run)

        # Now check limit on number of station IDs
        nStations = len(stationList)

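        # Guard against very large extractions: more than 100 stations is only
        # allowed with a non-decadal chunk size and a short (about one year) time window.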
        if nStations > 100 and a["OutputTimeChunk"] == "decadal":
            raise Exception("The number of selected station IDs has been calculated to be greater than 100. Please select a chunk size other than 'decadal' for such a large volume of data.")

        nYears = int(a["EndDateTime"][:4]) - int(a["StartDateTime"][:4])

        if nYears > 1 and nStations > 100:
            raise Exception("The number of selected station IDs has been calculated to be greater than 100. Please select a time window no longer than 1 year.")

        # Define data file base
        prefix = "station_data"
        if a["Delimiter"] == "comma":
            ext = "csv"
        else:
            ext = "txt"

        dataFileBase = os.path.join(context.processDir, 'outputs', prefix)

        if not dry_run:
            # Estimate we are 5% of the way through
            context.setStatus(STATUS.STARTED, 'Extracted station ID list.', 5)

            # Need temp dir for big file extractions
            procTmpDir = os.path.join(context.processDir, 'tmp')

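            # Extract the data in time chunks controlled by OutputTimeChunk
            # (e.g. "decadal"); one output file is expected per chunk.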
            outputPaths = exuk_utils.extractStationDataByTimeChunk([a["ObsTableName"]], a["StartDateTime"],
                       a["EndDateTime"], stationList, a["OutputTimeChunk"], dataFileBase, a["Delimiter"],
                       ext, procTmpDir)

            for outputPath in outputPaths:
                self._addFileToOutputs(outputPath, "Station data file.")

            # Finish up by calling the function to set status to complete and zip up the files etc.
            # We set keep = True so that the weather stations file is accessible to downstream processes
            # without unzipping. This is fine as the files are small.
            process_support.finishProcess(context, self.fileSet, self.startTime, keep = True)

        else:
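            # Dry run: fabricate a few plausible output paths so the volume and
            # duration of the job can be estimated without extracting any data.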
            outputPaths = ["%s-%s.%s" % (dataFileBase, i, ext) for i in range(3)]
            for outputPath in outputPaths:
                self._addFileToOutputs(outputPath, "Station data file.", size = 100000)

            estimated_duration = 300 # seconds
            process_support.finishDryRun(context, [], self.fileSet,
                            estimated_duration, acceptedMessage = 'Dry run complete')


    def _resolveStationList(self, context, dry_run):
        """
        Works out whether we need to generate a station list or use those
        sent as inputs.
        """
        a = self.args
        stationList = None

        # Use the input station list if provided
        if a["StationIDs"] != []:
            stationList = a["StationIDs"]

        # Use the StationsFile if we have one
        elif a["StationsFile"]:
            stationList = exuk_utils.extractStationsFromFile(a["StationsFile"])

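        # If neither station IDs nor a stations file was supplied, the list is
        # generated below from the county / bounding box / data type filters
        # (real runs only; dry runs use a dummy list).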
        # Add output file
        stationsFile = 'weather_stations.txt'
        stationsFilePath = os.path.join(context.processDir, "outputs", stationsFile)

        if not dry_run:

            if stationList is None:
                # Call code to get Weather Stations
                stationList = gws_utils.getStationList(a["Counties"], a["BBox"], a["DataTypes"], a["StartDateTime"],
                           a["EndDateTime"], stationsFilePath)

            # Write the file, one station ID per line
            stationList.sort()

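            # "\r\n" line endings are used here, presumably so the stations file
            # reads cleanly on Windows clients.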
            fout = open(stationsFilePath, "w")
            fout.write("\r\n".join([str(station) for station in stationList]))
            fout.close()

            # Add the stations list to the XML output section: ProcessSpecificContent
            context.outputs['ProcessSpecificContent'] = {"WeatherStations": " ".join([str(station) for station in stationList])}

            # Add file to outputs
            self._addFileToOutputs(stationsFilePath, 'Weather Stations File')

        else:

            # Estimate size of outputs by estimating the number of stations
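            # Rough heuristics: about 15 stations per county, or about 50 stations
            # per square degree of the bounding box.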
            if len(a["Counties"]) > 0:
                nEstimatedStations = len(a["Counties"]) * 15
            else:
                (w, s, e, n) = a["BBox"]
                lonExtent = abs(e - w)
                latExtent = n - s
                nEstimatedStations = int(lonExtent * latExtent * 50)

            estimatedVolume = nEstimatedStations * 5
            self._addFileToOutputs(stationsFilePath, 'Weather Stations File', size = estimatedVolume)

            # Make up a dummy station list for the dry run
            stationList = [-1] * 100

        return stationList


    def _validateInputs(self):
        """
        Runs specific checking of arguments and their compatibility.
        """
        a = self.args
        if a["Counties"] == [] and a["BBox"] is None and a["StationIDs"] == [] and a["StationsFile"] is None:
            raise Exception("Invalid arguments provided. Must provide one of (i) a geographical bounding box, (ii) a list of counties, (iii) a set of station IDs or (iv) a file containing a set of selected station IDs.")
