source: cows_wps/trunk/process_modules/extract_uk_station_data.py @ 6124

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/process_modules/extract_uk_station_data.py@6124
Revision 6124, 4.1 KB checked in by astephen, 11 years ago (diff)

adding more good stuff for the midas extraction bits.

Line 
"""
extract_uk_station_data.py
==========================

Process module extract_uk_station_data, which holds the
ExtractUKStationData class.

"""
8
9import os, stat, time, sys
10
11from cows_wps.process_handler.fileset import FileSet, FLAG
12import cows_wps.process_handler.ddp_process_support as ddp_process_support
13from cows_wps.process_handler.context.process_status import STATUS
14
15# Import local modules
16sys.path.append("/home/badc/software/datasets/ukmo-midas/scripts/extract")
17# MIDAS Station search code
18import getStations
19import midasSubsetter
20
21
class ExtractUKStationData(object):
    """
    Process that extracts UK (MIDAS) station data into an output file.

    Calling an instance with a process context:

    1. resolves the station IDs (taken from the inputs, or searched for by
       county / bounding box via ``getStations.StationIDGetter``);
    2. records the station list in ``outputs/uk_stations.txt``;
    3. runs ``midasSubsetter.MIDASSubsetter`` to write the data to
       ``outputs/station_data.csv`` (comma delimiter) or
       ``outputs/station_data.txt`` (any other delimiter).

    Both files are registered in the ``FileSet`` stored in
    ``context.outputs['FileSet']``.
    """

    def __call__(self, context):
        """
        Run the extraction for one job.

        ``context`` must provide ``inputs`` and ``outputs`` (dict-like),
        ``processDir`` and ``setStatus()``.

        Required inputs: ``Username``, ``Delimiter``, ``StartDateTime``,
        ``EndDateTime``, ``ObsTableName``.
        Optional inputs: ``StationIDs``, ``Counties``, ``BBox``,
        ``DataTypes``.

        Raises ``Exception`` when no station IDs, counties or bounding box
        are supplied.  A missing required input fails at lookup time
        (``KeyError`` if ``context.inputs`` is a plain dict).
        """
        start_time = time.time()

        # Parse the inputs
        ci = context.inputs
        # Username is not used below; the lookup is kept so that a request
        # without it is still rejected, exactly as before.
        Username = ci['Username']
        station_ids = ci.get("StationIDs", [])
        delimiter = ci["Delimiter"]
        counties = ci.get('Counties', [])
        bbox = ci.get("BBox", None)
        data_types = ci.get("DataTypes", [])

        # Start/end times are passed on in the compact string form that
        # _revertDateTimeToLongString produces.
        start_dt = self._revertDateTimeToLongString(ci['StartDateTime'])
        end_dt = self._revertDateTimeToLongString(ci['EndDateTime'])

        obs_table_name = ci['ObsTableName']

        context.setStatus(STATUS.STARTED, 'Job is now running', 0)

        # Always need a FileSet, even if empty
        self.fileSet = context.outputs['FileSet'] = FileSet()

        outputs_dir = os.path.join(context.processDir, 'outputs')
        sf_path = os.path.join(outputs_dir, "uk_stations.txt")

        station_ids = self._resolveStationIDs(
            station_ids, counties, bbox, data_types, start_dt, end_dt, sf_path)

        self._addFileToFileSet(
            sf_path, "Station IDs file used for extraction of station data.",
            FLAG.DATA)

        # IDs are not guaranteed to be strings, so convert before joining.
        context.outputs['ProcessSpecificContent'] = {
            "StationIDs": " ".join([str(st_id) for st_id in station_ids])}

        # Pretend that took 10% of time.  The job is still running here, so
        # it must not be reported as completed yet.
        context.setStatus(
            STATUS.STARTED, 'Station list ready, extracting data', 10)

        # Now extract the data itself
        if delimiter == "comma":
            data_file = "station_data.csv"
        else:
            data_file = "station_data.txt"
        df_path = os.path.join(outputs_dir, data_file)

        # Need temp dir for big file extractions
        process_tmp_dir = os.path.join(context.processDir, 'tmp')

        midasSubsetter.MIDASSubsetter(
            [obs_table_name], df_path, startTime=start_dt, endTime=end_dt,
            src_ids=station_ids, tempDir=process_tmp_dir)

        self._addFileToFileSet(df_path, "Station data file.", FLAG.DATA)

        context.setStatus(STATUS.COMPLETED, 'The End', 100)
        completion_time = time.time()
        ddp_process_support.updateJobDetailsAfterCompletion(
            context, start_time, completion_time)

    def _resolveStationIDs(self, station_ids, counties, bbox, data_types,
                           start_dt, end_dt, sf_path):
        """
        Return the sorted station IDs to extract; the list ends up in
        ``sf_path``.

        If ``station_ids`` is non-empty it is used as given and written to
        ``sf_path`` one ID per line.  Otherwise the stations are looked up
        from ``counties`` / ``bbox`` with ``getStations.StationIDGetter``,
        which is handed ``sf_path`` as its output file.

        Raises ``Exception`` if there are no station IDs, no counties and no
        bounding box.
        """
        if station_ids:
            # sorted() copies, so the list held in the job inputs is not
            # reordered behind the caller's back.
            ordered_ids = sorted(station_ids)
            with open(sf_path, "w") as fout:
                fout.write("\r\n".join([str(st_id) for st_id in ordered_ids]))
            return ordered_ids

        # Check we have either counties or bbox to search domain on
        if not counties and bbox is None:
            raise Exception("Invalid arguments provided. Must provide either a list of station IDs, a list of counties or a valid geographical bounding box.")

        # Call code to get Weather Stations and write file
        st_getter = getStations.StationIDGetter(
            counties, bbox=bbox, dataTypes=data_types, startTime=start_dt,
            endTime=end_dt, outputFile=sf_path, noprint=1)
        return sorted(st_getter.stList)

    def _addFileToFileSet(self, path, info, type=FLAG.DATA):
        """
        Adds file to output file set.

        The entry records the given ``type`` flag, the base name of
        ``path``, its size in bytes (the file must already exist) and the
        ``info`` description.
        """
        f_size = os.path.getsize(path)
        output_basename = os.path.basename(path)
        self.fileSet.contents.append(
            FileSet(type, output_basename, f_size, info))

    def _revertDateTimeToLongString(self, dt):
        """
        Turns a date/time into a long string as needed by midas code.

        ``str(dt)`` is taken and every "-", " ", "T" and ":" is removed,
        e.g. "2009-01-02T03:04:05" becomes "20090102030405".
        """
        long_string = str(dt)
        for separator in ("-", " ", "T", ":"):
            long_string = long_string.replace(separator, "")
        return long_string

    def dryRun(self, context):
        """
        Dry run hook; does nothing.
        """
        # Not implemented for sync jobs
        pass
119
Note: See TracBrowser for help on using the repository browser.