source: cows_wps/trunk/process_modules/extract_uk_station_data.py @ 7068

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/process_modules/extract_uk_station_data.py@7068
Revision 7068, 5.2 KB checked in by astephen, 9 years ago

tidying

1"""
2extract_uk_station_data.py
3===================
4
5Process extract_uk_station_data that holds the ExtractUKStationData class.
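
The process takes a set of station IDs (or counties or a bounding box to
search on), a start/end date-time range and an output time chunk, and
writes the selected station list plus the extracted station data files
to the job's outputs directory.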

"""

import os, time, sys

from cows_wps.process_handler.fileset import FileSet, FLAG
import cows_wps.process_handler.process_support as process_support
from cows_wps.process_handler.context.process_status import STATUS

import process_modules.lib_get_weather_stations.utils as gws_utils
import process_modules.lib_extract_uk_station_data.utils as exuk_utils

# Make the local MIDAS extraction scripts importable
sys.path.append("/home/badc/software/datasets/ukmo-midas/scripts/extract")
# MIDAS Station search code
#import getStations
import midasSubsetter


class ExtractUKStationData(object):
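    """
    WPS process that extracts UK MIDAS weather station data for a set of
    station IDs (or for stations found by county or bounding-box search)
    over a given date-time range, and publishes the resulting files in
    the process output FileSet.
    """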

    def __call__(self, context):

        startTime = time.time()
        jobId = os.path.basename(context.processDir)

        # Parse the inputs
        ci = context.inputs
        StationIDs = ci.get("StationIDs", [])
        Delimiter = ci["Delimiter"]
        Counties = ci.get('Counties', [])
        BBox = ci.get("BBox", None)
        DataTypes = ci.get("DataTypes", [])

        # Get required start/end times
        StartDateTime = ci['StartDateTime']
        StartDateTime = gws_utils.revertDateTimeToLongString(StartDateTime)
        EndDateTime = ci['EndDateTime']
        EndDateTime = gws_utils.revertDateTimeToLongString(EndDateTime)
        TimeChunk = ci['OutputTimeChunk']

        ObsTableName = ci['ObsTableName']
        StationsFile = ci['StationsFile']

        context.setStatus(STATUS.STARTED, 'Job is now running', 0)

        # Always need a FileSet, even if empty
        self.fileSet = context.outputs['FileSet'] = FileSet()

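        # File recording which station IDs were used; it is returned to the
        # user as part of the output FileSet.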
        stations_file = "uk_stations.txt"
        sf_path = os.path.join(context.processDir, 'outputs', stations_file)

        # If a stations file was supplied, read the station IDs from it
        if StationsFile:
            StationIDs = exuk_utils.extractStationsFromFile(StationsFile)

        # Get station IDs if not provided
        if StationIDs == []:
            # Check we have either counties or a bbox to define the search domain
            if Counties == [] and BBox == None:
                raise Exception("Invalid arguments provided. Must provide a list of station IDs, a list of counties or a valid geographical bounding box.")

            # Call code to get the list of weather stations; this also writes
            # the stations file
            StationIDs = gws_utils.getStationList(Counties, BBox, DataTypes, StartDateTime,
                           EndDateTime, sf_path)

            StationIDs.sort()
        # Otherwise, write the stations file ourselves: one station ID per line
        else:
            StationIDs.sort()
            with open(sf_path, "w") as fout:
                fout.write("\r\n".join([str(st_id) for st_id in StationIDs]))

        # Now check the limit on the number of station IDs
        nStations = len(StationIDs)
        if nStations > 100 and TimeChunk == "decadal":
            raise Exception("The number of selected station IDs has been calculated to be greater than 100. Please select a chunk size other than 'decadal' for such a large volume of data.")

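        # Approximate the number of years spanned, assuming the long
        # date-time strings begin with a four-digit year (YYYY...).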
        nYears = int(EndDateTime[:4]) - int(StartDateTime[:4])

        if nYears > 1 and nStations > 100:
            raise Exception("The number of selected station IDs has been calculated to be greater than 100. Please select a time window no longer than 1 year.")

        self._addFileToFileSet(sf_path, "Station IDs file used for extraction of station data.", FLAG.DATA)

        context.outputs['ProcessSpecificContent'] = {"StationIDs": " ".join([str(st_id) for st_id in StationIDs])}

        # Report nominal progress: assume the station listing took 10% of the job time
        context.setStatus(STATUS.STARTED, 'Extracted station ID list.', 10)

        # Now extract the data itself
        data_file_prefix = "station_data"
        if Delimiter == "comma":
            ext = "csv"
        else:
            ext = "txt"

        df_path_base = os.path.join(context.processDir, 'outputs', data_file_prefix)

        # Need a temporary directory for big file extractions
        process_tmp_dir = os.path.join(context.processDir, 'tmp')

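        # Extract the data one time chunk at a time; each chunk yields a
        # separate output file.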
        output_file_paths = exuk_utils.extractStationDataByTimeChunk([ObsTableName], StartDateTime,
                       EndDateTime, StationIDs, TimeChunk, df_path_base, Delimiter,
                       ext, process_tmp_dir)

        for df_path in output_file_paths:
            self._addFileToFileSet(df_path, "Station data file.", FLAG.DATA)

        process_support.finishProcess(context, self.fileSet, startTime)


    def _addFileToFileSet(self, path, info, flag=FLAG.DATA):
        "Adds a file to the output FileSet."
        f_size = os.path.getsize(path)
        output_basename = os.path.basename(path)
        self.fileSet.contents.append(FileSet(flag, output_basename, f_size, info))


    def dryRun(self, context):
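        """
        Populates a dummy output FileSet and completes via finishDryRun,
        without performing any extraction.
        """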

        self.fileSet = context.outputs['FileSet'] = FileSet()
        self.fileSet.contents.append(FileSet(FLAG.DATA, "testfile.txt", 30000, "Some file"))

        process_support.finishDryRun(context, [("meta", "data")], self.fileSet,
                                         60, acceptedMessage='Dry run complete')