source: cows_wps/trunk/process_modules/extract_uk_station_data.py @ 6126

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/process_modules/extract_uk_station_data.py@6126
Revision 6126, 4.3 KB checked in by astephen, 11 years ago (diff)

adnother

Line 
1"""
2extract_uk_station_data.py
3===================
4
5Process extract_uk_station_data that holds the ExtractUKStationData class.
6
7"""
8
9import os, stat, time, sys
10
11from cows_wps.process_handler.fileset import FileSet, FLAG
12import cows_wps.process_handler.ddp_process_support as ddp_process_support
13from cows_wps.process_handler.context.process_status import STATUS
14
15# Import local modules
16sys.path.append("/home/badc/software/datasets/ukmo-midas/scripts/extract")
17# MIDAS Station search code
18import getStations
19import midasSubsetter
20
21
class ExtractUKStationData(object):
    """
    WPS process that extracts MIDAS weather-station data for UK stations.

    The job either uses the station IDs supplied by the caller or searches
    for stations by county list and/or bounding box. It writes the station
    list to an output file, then extracts observations for those stations
    into a second output file. Both files are added to the job's FileSet.
    """

    def __call__(self, context):
        """
        Run the extraction job described by ``context.inputs``.

        Required inputs: Username, Delimiter, StartDateTime, EndDateTime,
        ObsTableName (a missing one raises KeyError).
        Optional inputs: StationIDs, Counties, BBox, DataTypes.

        Raises ValueError if no station IDs, counties or bounding box are
        provided.
        """
        startTime = time.time()

        # Parse the inputs
        ci = context.inputs
        # Required input: looked up so a missing value fails early, but not
        # otherwise used by this process.
        Username = ci['Username']
        StationIDs = ci.get("StationIDs", [])
        Delimiter = ci["Delimiter"]
        Counties = ci.get('Counties', [])
        BBox = ci.get("BBox", None)
        DataTypes = ci.get("DataTypes", [])

        # Get required start/end times as compact digit strings for the MIDAS code
        StartDateTime = self._revertDateTimeToLongString(ci['StartDateTime'])
        EndDateTime = self._revertDateTimeToLongString(ci['EndDateTime'])

        ObsTableName = ci['ObsTableName']

        context.setStatus(STATUS.STARTED, 'Job is now running', 0)

        # Always need a FileSet, even if empty
        self.fileSet = context.outputs['FileSet'] = FileSet()

        outputs_dir = os.path.join(context.processDir, 'outputs')
        sf_path = os.path.join(outputs_dir, "uk_stations.txt")

        StationIDs = self._getStationIDs(StationIDs, Counties, BBox, DataTypes,
                                         StartDateTime, EndDateTime, sf_path)

        self._addFileToFileSet(sf_path, "Station IDs file used for extraction of station data.", FLAG.DATA)

        # IDs are not guaranteed to be strings (the file writer applies str()
        # to each), so convert before joining to avoid a TypeError.
        context.outputs['ProcessSpecificContent'] = {
            "StationIDs": " ".join([str(st_id) for st_id in StationIDs])}

        # Pretend that took 10% of time. The job is still running, so the
        # status must remain STARTED; reporting COMPLETED here would tell
        # clients the job had finished before the extraction below runs.
        context.setStatus(STATUS.STARTED, 'Station list generated; extracting station data', 10)

        # Now extract the data itself
        if Delimiter == "comma":
            data_file = "station_data.csv"
        else:
            data_file = "station_data.txt"
        # NOTE(review): Delimiter only selects the file extension here; it is
        # not passed to MIDASSubsetter -- confirm the subsetter's output format
        # matches the chosen extension.

        df_path = os.path.join(outputs_dir, data_file)

        # Need temp dir for big file extractions
        process_tmp_dir = os.path.join(context.processDir, 'tmp')

        # The subsetter is given 12-character times: the last two characters
        # (presumably the seconds) are dropped.
        midasSubsetter.MIDASSubsetter([ObsTableName], df_path,
                                      startTime=StartDateTime[:-2],
                                      endTime=EndDateTime[:-2],
                                      src_ids=StationIDs,
                                      tempDir=process_tmp_dir)

        self._addFileToFileSet(df_path, "Station data file.", FLAG.DATA)

        context.setStatus(STATUS.COMPLETED, 'The End', 100)
        completionTime = time.time()
        ddp_process_support.updateJobDetailsAfterCompletion(context, startTime, completionTime)

    def _getStationIDs(self, StationIDs, Counties, BBox, DataTypes,
                       StartDateTime, EndDateTime, sf_path):
        """
        Return a new sorted list of station IDs and ensure ``sf_path`` lists them.

        If ``StationIDs`` is empty (or None), search for stations with
        getStations.StationIDGetter using ``Counties`` and/or ``BBox``; the
        getter is handed ``sf_path`` as its outputFile and presumably writes
        the stations file itself. Otherwise the supplied IDs are written to
        ``sf_path``, one per line, separated by CRLF.

        The caller's list is never modified in place.

        Raises ValueError if no IDs, counties or bounding box are given.
        """
        if not StationIDs:
            # Check we have either counties or bbox to search domain on
            if not Counties and BBox is None:
                raise ValueError("Invalid arguments provided. Must provide either a list of station IDs, a list of counties or a valid geographical bounding box.")

            # Call code to get Weather Stations and write file
            st_getter = getStations.StationIDGetter(Counties, bbox=BBox, dataTypes=DataTypes,
                                                    startTime=StartDateTime, endTime=EndDateTime,
                                                    outputFile=sf_path, noprint=1)
            return sorted(st_getter.stList)

        # else write the file one station id per line
        sorted_ids = sorted(StationIDs)
        with open(sf_path, "w") as fout:
            fout.write("\r\n".join([str(st_id) for st_id in sorted_ids]))
        return sorted_ids

    def _addFileToFileSet(self, path, info, type=FLAG.DATA):
        """
        Append an entry describing the file at ``path`` to ``self.fileSet``.

        The entry is a FileSet built from ``type`` (a FLAG value), the file's
        basename, its size in bytes, and the description ``info``.
        """
        # ``type`` shadows the builtin, but the parameter name is kept so any
        # keyword callers keep working.
        f_size = os.path.getsize(path)
        output_basename = os.path.basename(path)
        self.fileSet.contents.append(FileSet(type, output_basename, f_size, info))

    def _revertDateTimeToLongString(self, dt):
        """
        Turn a date/time into a compact digit string as needed by the MIDAS code.

        Datetime-like objects (anything with year, month, day, hour, minute
        and second attributes) are formatted as a zero-padded 14-character
        YYYYMMDDhhmmss string. Microseconds and timezone information are
        ignored, so the result always has the same length.

        Anything else is converted with str(), and the "-", " ", "T" and ":"
        separators are removed, e.g. "2000-01-02T03:04:05" -> "20000102030405".
        """
        fields = ("year", "month", "day", "hour", "minute", "second")
        if all(hasattr(dt, name) for name in fields):
            # Explicit formatting rather than strftime(): Python 2's strftime
            # rejects years before 1900.
            return "%04d%02d%02d%02d%02d%02d" % (dt.year, dt.month, dt.day,
                                                 dt.hour, dt.minute, dt.second)
        return str(dt).replace("-", "").replace(" ", "").replace("T", "").replace(":", "")

    def dryRun(self, context):
        """Dry runs are not implemented for sync jobs; this is a no-op."""
        pass
123
Note: See TracBrowser for help on using the repository browser.