source: cows_wps/trunk/process_modules/extract_uk_station_data.py @ 6910

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/process_modules/extract_uk_station_data.py@6910
Revision 6910, 5.2 KB checked in by astephen, 11 years ago

fixes

1"""
2extract_uk_station_data.py
3===================
4
5Process extract_uk_station_data that holds the ExtractUKStationData class.
6
7"""
8
9import os, stat, time, sys
10
11from cows_wps.process_handler.fileset import FileSet, FLAG
12import cows_wps.process_handler.ddp_process_support as ddp_process_support
13from cows_wps.process_handler.context.process_status import STATUS
14
15import process_modules.lib_get_weather_stations.utils as gws_utils
16import process_modules.lib_extract_uk_station_data.utils as exuk_utils
17
18# Import local modules
19sys.path.append("/home/badc/software/datasets/ukmo-midas/scripts/extract")
20# MIDAS Station search code
21#import getStations
22import midasSubsetter
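# NOTE: within this module, midasSubsetter is referenced only by the
# commented-out legacy extraction call further down; the active extraction
# path goes through exuk_utils instead.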


class ExtractUKStationData(object):

    def __call__(self, context):

        startTime = time.time()
        jobId = os.path.basename(context.processDir)

        # Parse the inputs
        ci = context.inputs
        Username = ci['Username']
        StationIDs = ci.get("StationIDs", [])
        Delimiter = ci["Delimiter"]
        Counties = ci.get('Counties', [])
        BBox = ci.get("BBox", None)
        DataTypes = ci.get("DataTypes", [])

        # Get required start/end times
        StartDateTime = ci['StartDateTime']
        StartDateTime = gws_utils.revertDateTimeToLongString(StartDateTime)
        EndDateTime = ci['EndDateTime']
        EndDateTime = gws_utils.revertDateTimeToLongString(EndDateTime)
        TimeChunk = ci['OutputTimeChunk']

        ObsTableName = ci['ObsTableName']

        context.setStatus(STATUS.STARTED, 'Job is now running', 0)

        # Always need a FileSet, even if empty
        self.fileSet = context.outputs['FileSet'] = FileSet()

        stations_file = "uk_stations.txt"
        sf_path = context.processDir + '/outputs/' + stations_file

        # Get station IDs if not provided
        if StationIDs == []:
            # Check we have either counties or a bounding box to define the search domain
            if Counties == [] and BBox is None:
                raise Exception("Invalid arguments provided. Must provide either a list of station IDs, a list of counties or a valid geographical bounding box.")

            # Call code to get Weather Stations
            StationIDs = gws_utils.getStationList(Counties, BBox, DataTypes, StartDateTime,
                           EndDateTime, sf_path)

            # Call code to get Weather Stations and write file
#            st_getter = getStations.StationIDGetter(Counties, bbox=BBox, dataTypes=DataTypes,
#                       startTime=StartDateTime, endTime=EndDateTime, outputFile=sf_path, noprint=1)
#            StationIDs = st_getter.stList
            StationIDs.sort()

        # else write the file, one station ID per line
        else:
            StationIDs.sort()
            fout = open(sf_path, "w")
            fout.write("\r\n".join([str(st_id) for st_id in StationIDs]))
            fout.close()

        # Now check limits on the number of station IDs and years requested
        n_src_ids = len(StationIDs)
        if n_src_ids > 100 and TimeChunk == "decadal":
            raise Exception("The number of selected station IDs has been calculated to be greater than 100. Please select a chunk size other than 'decadal' for such a large volume of data.")

        # Estimate the number of years requested; this assumes the long-format
        # date-time strings returned by revertDateTimeToLongString start "YYYY".
        n_years = int(EndDateTime[:4]) - int(StartDateTime[:4]) + 1
        if n_years > 1 and n_src_ids > 100:
            raise Exception("The number of selected station IDs has been calculated to be greater than 100. Please select a time window no longer than 1 year.")

        self._addFileToFileSet(sf_path, "Station IDs file used for extraction of station data.", FLAG.DATA)

        context.outputs['ProcessSpecificContent'] = {"StationIDs": " ".join([str(st_id) for st_id in StationIDs])}

        # Report this stage as 10% of the job time
        context.setStatus(STATUS.STARTED, 'Extracted station ID list.', 10)

        # Now extract the data itself
        data_file_prefix = "station_data"
        if Delimiter == "comma":
            ext = "csv"
        else:
            ext = "txt"

        df_path_base = context.processDir + '/outputs/' + data_file_prefix

        # Need temp dir for big file extractions
        process_tmp_dir = context.processDir + '/tmp'

        # Extract the station data in time chunks (one output file per chunk,
        # as requested via OutputTimeChunk); returns the list of paths written
        output_file_paths = exuk_utils.extractStationDataByTimeChunk([ObsTableName], StartDateTime, EndDateTime,
                       StationIDs, TimeChunk, df_path_base, ext, process_tmp_dir)
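        # The commented-out block below is the legacy, non-chunked extraction
        # path, kept for reference; it passed 12-character date-time strings
        # straight to midasSubsetter.MIDASSubsetter.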
#        startTime12Chars = StartDateTime[:-2]
#        endTime12Chars = EndDateTime[:-2]

#        midasSubsetter.MIDASSubsetter([ObsTableName], df_path, startTime=startTime12Chars,
#                       endTime=endTime12Chars, src_ids=StationIDs,
#                       tempDir=process_tmp_dir)

        for df_path in output_file_paths:
            self._addFileToFileSet(df_path, "Station data file.", FLAG.DATA)

        context.setStatus(STATUS.COMPLETED, 'The End', 100)
        completionTime = time.time()
        ddp_process_support.updateJobDetailsAfterCompletion(context, startTime, completionTime)


    def _addFileToFileSet(self, path, info, type=FLAG.DATA):
        "Adds a file to the output file set."
        f_size = os.path.getsize(path)
        output_basename = os.path.basename(path)
        self.fileSet.contents.append(FileSet(type, output_basename, f_size, info))


    def dryRun(self, context):

        # Not implemented for synchronous jobs
        pass

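For reference, below is a minimal sketch of how this process might be driven outside the full WPS stack. Everything in it is illustrative: the StubContext class, the input values and the job directory are hypothetical stand-ins for what the cows_wps process handler normally supplies, namely a context exposing inputs, outputs, processDir and setStatus as used above.

# Illustrative sketch only -- not part of the module above.
class StubContext(object):
    def __init__(self, process_dir, inputs):
        self.processDir = process_dir   # job dir; needs outputs/ and tmp/ subdirs
        self.inputs = inputs
        self.outputs = {}

    def setStatus(self, status, message, percent):
        print("%s (%s%%): %s" % (status, percent, message))

inputs = {
    "Username": "testuser",                  # hypothetical user
    "StationIDs": [],                        # empty: derive stations from Counties/BBox
    "Delimiter": "comma",                    # produces ".csv" output files
    "Counties": ["DEVON"],                   # hypothetical county name
    "BBox": None,
    "DataTypes": [],
    "StartDateTime": "2010-01-01T00:00:00",  # assumed input format, converted by
    "EndDateTime": "2010-12-31T23:59:59",    # gws_utils.revertDateTimeToLongString
    "OutputTimeChunk": "year",               # assumed chunk-size value
    "ObsTableName": "TD",                    # hypothetical MIDAS table name
}

process = ExtractUKStationData()
process(StubContext("/tmp/job_0001", inputs))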