source: cows_wps/trunk/process_modules/extract_uk_station_data.py @ 7016

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/process_modules/extract_uk_station_data.py@7016
Revision 7016, 5.2 KB, checked in by astephen, 9 years ago

fixes to use common finishProcess call

1"""
2extract_uk_station_data.py
3===================
4
5Process extract_uk_station_data that holds the ExtractUKStationData class.
6
7"""

import os, stat, time, sys

from cows_wps.process_handler.fileset import FileSet, FLAG
import cows_wps.process_handler.process_support as process_support
from cows_wps.process_handler.context.process_status import STATUS

import process_modules.lib_get_weather_stations.utils as gws_utils
import process_modules.lib_extract_uk_station_data.utils as exuk_utils

# Import local modules
sys.path.append("/home/badc/software/datasets/ukmo-midas/scripts/extract")
# MIDAS station search code
#import getStations
import midasSubsetter

class ExtractUKStationData(object):

    def __call__(self, context):

        startTime = time.time()
        jobId = os.path.basename(context.processDir)

        # Parse the inputs
        ci = context.inputs
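        # Inputs read below and further down (descriptions inferred from how
        # each value is used in this process):
        #   StationIDs    - list of station IDs; empty means "search for them"
        #   Delimiter     - "comma" gives .csv output, anything else .txt
        #   Counties/BBox - search domain used when no station IDs are given
        #   DataTypes     - data types used to filter the station search
        #   StationsFile  - optional file of station IDs to use instead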
        StationIDs = ci.get("StationIDs", [])
        Delimiter = ci["Delimiter"]
        Counties = ci.get('Counties', [])
        BBox = ci.get("BBox", None)
        DataTypes = ci.get("DataTypes", [])

        # Get required start/end times
        StartDateTime = ci['StartDateTime']
        StartDateTime = gws_utils.revertDateTimeToLongString(StartDateTime)
        EndDateTime = ci['EndDateTime']
        EndDateTime = gws_utils.revertDateTimeToLongString(EndDateTime)
        TimeChunk = ci['OutputTimeChunk']

        ObsTableName = ci['ObsTableName']
        StationsFile = ci['StationsFile']

        context.setStatus(STATUS.STARTED, 'Job is now running', 0)

        # Always need a FileSet, even if empty
        self.fileSet = context.outputs['FileSet'] = FileSet()

        stations_file = "uk_stations.txt"
        sf_path = context.processDir + '/outputs/' + stations_file

        # Download the stations file and read station IDs from it if given
        if StationsFile:
            StationIDs = exuk_utils.extractStationsFromFile(StationsFile)

        # Get station IDs if not provided
        if StationIDs == []:
            # Check we have either counties or a bbox to define the search domain
            if Counties == [] and BBox == None:
                raise Exception("Invalid arguments provided. Must provide either a list of station IDs, a list of counties or a valid geographical bounding box.")

            # Call code to get the weather stations; this also writes the
            # station IDs file to sf_path
            StationIDs = gws_utils.getStationList(Counties, BBox, DataTypes, StartDateTime,
                           EndDateTime, sf_path)
            StationIDs.sort()

        # else write the stations file ourselves, one station ID per line
        else:
            StationIDs.sort()
            fout = open(sf_path, "w")
            fout.write("\r\n".join([str(st_id) for st_id in StationIDs]))
            fout.close()

        # Now check limit on number of station IDs (disabled: kept as a
        # string literal so it never runs)
        """
        n_src_ids = len(StationIDs)
        if n_src_ids > 100 and TimeChunk == "decadal":
            raise Exception("The number of selected station IDs has been calculated to be greater than 100. Please select a chunk size other than 'decadal' for such a large volume of data.")

        n_years = ?
        if n_years > 1 and n_src_ids > 100:
            raise Exception("The number of selected station IDs has been calculated to be greater than 100. Please select a time window no longer than 1 year.")
        """
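
        # If the check above is ever re-enabled, n_years could be derived
        # along these lines, assuming the long-string datetimes begin with a
        # 4-digit year (hypothetical helper, not part of this module):
        #
        #   def _countYearsSpanned(start_long, end_long):
        #       return int(end_long[:4]) - int(start_long[:4]) + 1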

        self._addFileToFileSet(sf_path, "Station IDs file used for extraction of station data.", FLAG.DATA)

        context.outputs['ProcessSpecificContent'] = {"StationIDs": " ".join([str(st_id) for st_id in StationIDs])}

        # Pretend that took 10% of the time
        context.setStatus(STATUS.STARTED, 'Extracted station ID list.', 10)

        # Now extract the data itself
        data_file_prefix = "station_data"
        if Delimiter == "comma":
            ext = "csv"
        else:
            ext = "txt"

        df_path_base = context.processDir + '/outputs/' + data_file_prefix

        # Need a temp dir for big file extractions
        process_tmp_dir = context.processDir + '/tmp'
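
        # The extraction is delegated to exuk_utils, which splits the request
        # into time chunks (per the OutputTimeChunk input) and returns the
        # paths of the files it writes, apparently one per chunk, so large
        # requests do not produce a single huge output file.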
        output_file_paths = exuk_utils.extractStationDataByTimeChunk([ObsTableName], StartDateTime,
                       EndDateTime, StationIDs, TimeChunk, df_path_base, Delimiter,
                       ext, process_tmp_dir)

        for df_path in output_file_paths:
            self._addFileToFileSet(df_path, "Station data file.", FLAG.DATA)

        process_support.finishProcess(context, self.fileSet, startTime)


    def _addFileToFileSet(self, path, info, flag=FLAG.DATA):
        "Adds a file of the given flag type to the output FileSet."
        f_size = os.path.getsize(path)
        output_basename = os.path.basename(path)
        self.fileSet.contents.append(FileSet(flag, output_basename, f_size, info))


    def dryRun(self, context):
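        # Dry run: advertise a single placeholder output file so callers can
        # see the expected shape of the results without running the real
        # extraction; the name, size and description below are dummy values.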
        self.fileSet = context.outputs['FileSet'] = FileSet()
        self.fileSet.contents.append(FileSet(FLAG.DATA, "testfile.txt", 30000, "Some file"))

        process_support.finishDryRun(context, [("meta", "data")], self.fileSet,
                                     60, acceptedMessage='Dry run complete')