source: cows_wps/trunk/process_modules/extract_uk_station_data.py @ 7613

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/process_modules/extract_uk_station_data.py@7613
Revision 7613, 6.6 KB checked in by astephen, 10 years ago (diff)

removed keeping of non-zip outputs.

Line 
1"""
extract_uk_station_data.py
==========================

Process module that holds the ExtractUKStationData class.
6
7"""
8
9import os, stat, time, sys
10import logging
11
12from cows_wps.process_handler.fileset import FileSet, FLAG
13import cows_wps.process_handler.process_support as process_support
14from cows_wps.process_handler.context.process_status import STATUS
15
16import process_modules.lib_get_weather_stations.utils as gws_utils
17import process_modules.lib_extract_uk_station_data.utils as exuk_utils
18import process_modules.process_base
19
20
21
# Logger for this module. Its level is pinned to DEBUG here, independently
# of whatever level the root logger has been configured with.
log = logging.getLogger(name=__name__)
log.setLevel(level=logging.DEBUG)
24
25
class ExtractUKStationData(process_modules.process_base.ProcessBase):
    """
    Process that extracts UK weather station observation data.

    The set of stations is taken, in order of precedence, from the
    ``StationIDs`` input, from a ``StationsFile`` input, or from a lookup
    driven by ``Counties``/``BBox``/``DataTypes`` and the requested time
    window. Observation data for those stations is then written to one or
    more delimited text files, split in time according to ``OutputTimeChunk``.
    """

    # Define arguments that we need to set from inputs
    args_to_set = ["StationIDs", "Counties", "BBox", "DataTypes", "StartDateTime", "EndDateTime",
                   "Delimiter", "OutputTimeChunk", "ObsTableName", "StationsFile"]

    # Define defaults for arguments that might not be set
    input_arg_defaults = {"StationIDs": [],
                          "Counties": [],
                          "BBox": None,
                          "DataTypes": [],
                          "StationsFile": None,
                         }

    # Define a dictionary for arguments that need to be processed
    # before they are set (with values as the function doing the processing).
    # NOTE: the attribute name (including its spelling) is presumably what the
    # base class looks up, so it must not be renamed.
    arg_processers = {"StartDateTime": gws_utils.revertDateTimeToLongString,
                      "EndDateTime": gws_utils.revertDateTimeToLongString}

    # Maximum number of stations allowed for "large" requests: above this,
    # 'decadal' chunking and multi-year windows are rejected. Defined on the
    # class so that subclasses can override it.
    STATION_LIMIT = 100


    def _executeProc(self, context, dry_run):
        """
        Step through the various parts of the process.

        If ``dry_run`` is False the extraction is actually executed and the
        job status in ``context`` is updated as it progresses. If ``dry_run``
        is True, only estimates of the volume and duration of the outputs
        are recorded.

        :param context: process context; this method uses its ``processDir``
            and its ``setStatus``/``saveStatus`` methods.
        :param dry_run: if True, only estimate the outputs.
        :raises Exception: if the request exceeds the station limits checked
            by ``_checkVolumeLimits``.
        """
        # Standard set-up from the base class. Presumably this populates
        # ``self.args`` using the class-level definitions above -- confirm
        # against ProcessBase._setup.
        self._setup(context)
        a = self.args

        if not dry_run:
            # Now set status to started
            context.setStatus(STATUS.STARTED, 'Job is now running', 0)
            # Need to save status for async processes
            context.saveStatus()

        # Resolve the list of stations (this also writes the stations file)
        stationList = self._resolveStationList(context, dry_run)
        nStations = len(stationList)

        # Difference between end and start calendar years. This assumes both
        # date-time strings begin with a four-digit year.
        nYears = int(a["EndDateTime"][:4]) - int(a["StartDateTime"][:4])

        self._checkVolumeLimits(nStations, nYears)

        # Define data file base and extension
        prefix = "station_data"
        ext = "csv" if a["Delimiter"] == "comma" else "txt"
        dataFileBase = os.path.join(context.processDir, 'outputs', prefix)

        if not dry_run:
            # Estimate we are 5% of the way through
            context.setStatus(STATUS.STARTED, 'Extracted station ID list.', 5)
            context.saveStatus()

            # Need temp dir for big file extractions
            procTmpDir = os.path.join(context.processDir, 'tmp')

            outputPaths = exuk_utils.extractStationDataByTimeChunk(
                [a["ObsTableName"]], a["StartDateTime"], a["EndDateTime"],
                stationList, a["OutputTimeChunk"], dataFileBase, a["Delimiter"],
                ext, procTmpDir, context)

            for outputPath in outputPaths:
                self._addFileToOutputs(outputPath, "Station data file.")

            # Finish up: set status to complete and package the outputs.
            # keep=False: presumably the un-zipped copies of the outputs
            # (including the weather stations file) are not retained, so
            # downstream consumers must read them from the zip -- confirm
            # against process_support.finishProcess.
            process_support.finishProcess(context, self.fileSet, self.startTime, keep = False)

        else:
            # Rough estimate: one output file per calendar year touched, each
            # sized at 200 units per station per day (presumably bytes).
            estimatedSize = nStations * 200 * 365
            for i in range(nYears + 1):
                outputPath = "%s-%s.%s" % (dataFileBase, i, ext)
                self._addFileToOutputs(outputPath, "Station data file.", size = estimatedSize)

            estimated_duration = (nYears + 1) * 60 # seconds
            process_support.finishDryRun(context, [], self.fileSet,
                            estimated_duration, acceptedMessage = 'Dry run complete')


    def _checkVolumeLimits(self, nStations, nYears):
        """
        Reject requests whose data volume would be too large.

        :param nStations: number of stations selected.
        :param nYears: end year minus start year of the request.
        :raises Exception: if more than ``STATION_LIMIT`` stations are selected
            together with 'decadal' output chunking, or together with a
            window whose end year is more than one year after its start year.
        """
        limit = self.STATION_LIMIT
        if nStations <= limit:
            return

        if self.args["OutputTimeChunk"] == "decadal":
            raise Exception("The number of selected station IDs has been calculated to be greater than %d. Please select a chunk size other than 'decadal' for such as large volume of data." % limit)

        # NOTE(review): this compares calendar years only, so e.g. Jan 2000 to
        # Dec 2001 (nYears == 1) is accepted despite the message below.
        if nYears > 1:
            raise Exception("The number of selected station IDs has been calculated to be greater than %d. Please select a time window no longer than 1 year." % limit)


    def _resolveStationList(self, context, dry_run):
        """
        Work out the station IDs to use and record them as an output.

        Precedence:

        * the ``StationIDs`` input, if non-empty;
        * otherwise the IDs read from ``StationsFile``, if one was given;
        * otherwise (or if reading the file gave None) the list returned by
          ``gws_utils.getStationList`` for the requested counties, bounding
          box, data types and time window.

        The sorted IDs are written one per line (CRLF-separated) to
        ``outputs/weather_stations.txt`` under ``context.processDir``. That
        file is added to the process outputs, and the space-separated IDs are
        stored in ``context.outputs['ProcessSpecificContent']`` under
        ``"WeatherStations"``.

        :param context: process context.
        :param dry_run: not used; the list is resolved and the file written
            for dry runs as well.
        :returns: new sorted list of station IDs. The input argument list is
            not modified.
        """
        a = self.args
        stationList = None

        # Use input station list if provided
        if a["StationIDs"]:
            stationList = a["StationIDs"]

        # Use stations file if we have one
        elif a["StationsFile"]:
            stationList = exuk_utils.extractStationsFromFile(a["StationsFile"])

        # Output file listing the resolved stations
        stationsFilePath = os.path.join(context.processDir, "outputs", 'weather_stations.txt')

        if stationList is None:
            # Call code to get Weather Stations. It is also handed the stations
            # file path; the file is (re)written below either way.
            stationList = gws_utils.getStationList(a["Counties"], a["BBox"], a["DataTypes"],
                           a["StartDateTime"], a["EndDateTime"], stationsFilePath)

        # Sort into a new list so the caller's input list is not mutated
        stationList = sorted(stationList)

        # IDs may not be strings, so stringify once for both text outputs
        stationIDs = [str(station) for station in stationList]

        # Write the file: one station ID per line
        with open(stationsFilePath, "w") as fout:
            fout.write("\r\n".join(stationIDs))

        # Add the stations list to the XML output section: ProcessSpecificContent
        context.outputs['ProcessSpecificContent'] = {"WeatherStations": " ".join(stationIDs)}

        # Add file to outputs
        self._addFileToOutputs(stationsFilePath, 'Weather Stations File')

        return stationList


    def _validateInputs(self):
        """
        Check that at least one means of selecting stations was supplied.

        Empty lists, None and an empty ``StationsFile`` value all count as
        "not provided". This matches how ``_resolveStationList`` decides which
        input to use.

        :raises Exception: if none of ``Counties``, ``BBox``, ``StationIDs``
            or ``StationsFile`` was provided.
        """
        a = self.args
        hasSelection = (a["Counties"] or a["StationIDs"] or a["StationsFile"]
                        or a["BBox"] is not None)
        if not hasSelection:
            raise Exception("Invalid arguments provided. Must provide one of (i) a geographical bounding box, (ii) a list of counties, (iii) a set of station IDs or (iv) a file containing a set of selected station IDs.")
Note: See TracBrowser for help on using the repository browser.