source: cows_wps/trunk/process_modules/extract_uk_station_data.py @ 6873

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/process_modules/extract_uk_station_data.py@6873
Revision 6873, 4.6 KB checked in by astephen, 11 years ago

Added and updated various code for security, secured download URLs, and
chunking of MIDAS outputs.

1"""
2extract_uk_station_data.py
3===================
4
5Process extract_uk_station_data that holds the ExtractUKStationData class.
6
7"""
8
import os, stat, time, sys

from cows_wps.process_handler.fileset import FileSet, FLAG
import cows_wps.process_handler.ddp_process_support as ddp_process_support
from cows_wps.process_handler.context.process_status import STATUS

import process_modules.lib_get_weather_stations.utils as gws_utils
import process_modules.lib_extract_uk_station_data.utils as exuk_utils

# Import local modules from the MIDAS extraction scripts directory
sys.path.append("/home/badc/software/datasets/ukmo-midas/scripts/extract")
# MIDAS station search and subsetting code
#import getStations
import midasSubsetter


class ExtractUKStationData(object):

    def __call__(self, context):

        startTime = time.time()
        jobId = os.path.basename(context.processDir)

        # Parse the inputs
        ci = context.inputs
        Username = ci['Username']
        StationIDs = ci.get("StationIDs", [])
        Delimiter = ci["Delimiter"]
        Counties = ci.get('Counties', [])
        BBox = ci.get("BBox", None)
        DataTypes = ci.get("DataTypes", [])

        # Get required start/end times
        StartDateTime = ci['StartDateTime']
        StartDateTime = gws_utils.revertDateTimeToLongString(StartDateTime)
        EndDateTime = ci['EndDateTime']
        EndDateTime = gws_utils.revertDateTimeToLongString(EndDateTime)
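        # NOTE (editorial assumption): revertDateTimeToLongString() appears to
        # return a compact 14-character timestamp, e.g.
        # "2010-01-01T12:00:00" -> "20100101120000"; the commented-out legacy
        # code further down strips the trailing seconds ([:-2]) to obtain the
        # 12-character form passed to MIDASSubsetter.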
        TimeChunk = ci['OutputTimeChunk']

        ObsTableName = ci['ObsTableName']

        context.setStatus(STATUS.STARTED, 'Job is now running', 0)

        # Always need a FileSet, even if empty
        self.fileSet = context.outputs['FileSet'] = FileSet()

        stations_file = "uk_stations.txt"
        sf_path = os.path.join(context.processDir, 'outputs', stations_file)

        # Get station IDs if not provided
        if not StationIDs:
            # Check we have either counties or a bounding box to define the search domain
            if not Counties and BBox is None:
                raise Exception("Invalid arguments provided. Must provide either a list of station IDs, a list of counties or a valid geographical bounding box.")

            # Call code to get the list of matching weather stations
            StationIDs = gws_utils.getStationList(Counties, BBox, DataTypes, StartDateTime,
                           EndDateTime, sf_path)

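            # NOTE (editorial assumption): getStationList() is taken to both
            # return the list of matching station src_ids and write them to
            # sf_path, mirroring the legacy getStations.StationIDGetter call
            # kept below for reference.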
            # Legacy call to get the weather stations and write the file:
#            st_getter = getStations.StationIDGetter(Counties, bbox=BBox, dataTypes=DataTypes,
#                       startTime=StartDateTime, endTime=EndDateTime, outputFile=sf_path, noprint=1)
#            StationIDs = st_getter.stList
            StationIDs.sort()

        # Otherwise, write the user-supplied station IDs to the file, one per line
        else:
            StationIDs.sort()
            with open(sf_path, "w") as fout:
                fout.write("\r\n".join([str(st_id) for st_id in StationIDs]))
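        # The resulting file holds one station src_id per line, with CRLF
        # line endings (matching the "\r\n".join() above).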

        self._addFileToFileSet(sf_path, "Station IDs file used for extraction of station data.", FLAG.DATA)

        context.outputs['ProcessSpecificContent'] = {"StationIDs": " ".join([str(st_id) for st_id in StationIDs])}

        # Treat the station-list stage as 10% of the total job time
        context.setStatus(STATUS.STARTED, 'Extracted station ID list.', 10)

        # Now extract the data itself
        data_file_prefix = "station_data"
        if Delimiter == "comma":
            ext = "csv"
        else:
            ext = "txt"

        df_path_base = os.path.join(context.processDir, 'outputs', data_file_prefix)

        # Need a temporary directory for big file extractions
        process_tmp_dir = os.path.join(context.processDir, 'tmp')

        output_file_paths = exuk_utils.extractStationDataByTimeChunk([ObsTableName], StartDateTime, EndDateTime,
                       StationIDs, TimeChunk, df_path_base, ext, process_tmp_dir)
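        # NOTE (editorial assumption): extractStationDataByTimeChunk() is taken
        # to split the StartDateTime..EndDateTime range into OutputTimeChunk
        # intervals and run one MIDAS subset extraction per interval (cf. the
        # legacy single-shot MIDASSubsetter call below), returning the list of
        # per-chunk output file paths.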
#        startTime12Chars = StartDateTime[:-2]
#        endTime12Chars = EndDateTime[:-2]

#        midasSubsetter.MIDASSubsetter([ObsTableName], df_path, startTime=startTime12Chars,
#                       endTime=endTime12Chars, src_ids=StationIDs,
#                       tempDir=process_tmp_dir)

        for df_path in output_file_paths:
            self._addFileToFileSet(df_path, "Station data file.", FLAG.DATA)

        context.setStatus(STATUS.COMPLETED, 'Job completed.', 100)
        completionTime = time.time()
        ddp_process_support.updateJobDetailsAfterCompletion(context, startTime, completionTime)


    def _addFileToFileSet(self, path, info, flag=FLAG.DATA):
        "Adds a file to the output file set."
        f_size = os.path.getsize(path)
        output_basename = os.path.basename(path)
        self.fileSet.contents.append(FileSet(flag, output_basename, f_size, info))
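        # NOTE: FileSet doubles as both the container (self.fileSet above) and
        # the per-file record appended to its .contents, via the
        # FileSet(flag, name, size, info) form used here.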


    def dryRun(self, context):

        # Not implemented for sync jobs
        pass
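
# Illustrative usage (editorial sketch; in practice the COWS WPS framework
# instantiates the class and supplies the process context):
#
#     process = ExtractUKStationData()
#     process(context)   # context provides .inputs, .outputs, .processDir
#                        # and .setStatus()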