1 | """ |
---|
2 | extract_uk_station_data.py |
---|
3 | =================== |
---|
4 | |
---|
5 | Process extract_uk_station_data that holds the ExtractUKStationData class. |
---|
6 | |
---|
7 | """ |
---|
8 | |
---|
9 | import os, stat, time, sys |
---|
10 | |
---|
11 | from cows_wps.process_handler.fileset import FileSet, FLAG |
---|
12 | import cows_wps.process_handler.ddp_process_support as ddp_process_support |
---|
13 | from cows_wps.process_handler.context.process_status import STATUS |
---|
14 | |
---|
15 | # Import local modules |
---|
16 | sys.path.append("/home/badc/software/datasets/ukmo-midas/scripts/extract") |
---|
17 | # MIDAS Station search code |
---|
18 | import getStations |
---|
19 | import midasSubsetter |
---|
20 | |
---|
21 | |
---|
class ExtractUKStationData(object):
    """
    WPS process that extracts UK station data.

    Calling an instance with a process ``context`` works out which station
    IDs to use (taken from the inputs, or looked up from counties / a
    bounding box via ``getStations``), writes them to a station file, runs
    ``midasSubsetter`` for the requested table and time window, and records
    both output files in the ``FileSet`` stored in ``context.outputs``.
    """

    def __call__(self, context):
        """
        Run the extraction for one job.

        :param context: process context. This method reads ``context.inputs``
            and ``context.processDir``, fills ``context.outputs`` and reports
            progress through ``context.setStatus``.
        :raises Exception: if none of station IDs, counties or a bounding box
            were supplied.

        A missing mandatory input (``Username``, ``Delimiter``,
        ``StartDateTime``, ``EndDateTime``, ``ObsTableName``) surfaces as
        whatever lookup error ``context.inputs`` raises (``KeyError`` if it
        is a plain dict).
        """
        startTime = time.time()

        # Parse the inputs
        ci = context.inputs
        # Username is not used by the extraction itself; the lookup is kept
        # so that a request without it is still rejected before any work.
        _ = ci['Username']
        StationIDs = ci.get("StationIDs", [])
        Delimiter = ci["Delimiter"]
        Counties = ci.get('Counties', [])
        BBox = ci.get("BBox", None)
        DataTypes = ci.get("DataTypes", [])

        # Get required start/end times in the compact form the MIDAS code uses
        StartDateTime = self._revertDateTimeToLongString(ci['StartDateTime'])
        EndDateTime = self._revertDateTimeToLongString(ci['EndDateTime'])

        ObsTableName = ci['ObsTableName']

        context.setStatus(STATUS.STARTED, 'Job is now running', 0)

        # Always need a FileSet, even if empty
        self.fileSet = context.outputs['FileSet'] = FileSet()

        outputs_dir = os.path.join(context.processDir, 'outputs')
        sf_path = os.path.join(outputs_dir, "uk_stations.txt")

        if not StationIDs:
            # No station IDs given: need counties or a bbox to search on
            if not Counties and BBox is None:
                raise Exception("Invalid arguments provided. Must provide either a list of station IDs, a list of counties or a valid geographical bounding box.")

            # The getter is handed sf_path as its output file, so this
            # branch does not write the station file itself.
            st_getter = getStations.StationIDGetter(
                Counties, bbox=BBox, dataTypes=DataTypes,
                startTime=StartDateTime, endTime=EndDateTime,
                outputFile=sf_path, noprint=1)
            # sorted() returns a new list, leaving the getter's list untouched
            StationIDs = sorted(st_getter.stList)
        else:
            # sorted() returns a new list, so the caller's input is not
            # reordered behind its back.
            StationIDs = sorted(StationIDs)
            # Write the file, one station ID per line; "with" guarantees the
            # handle is closed even if the write fails.
            with open(sf_path, "w") as fout:
                fout.write("\r\n".join([str(st_id) for st_id in StationIDs]))

        self._addFileToFileSet(sf_path, "Station IDs file used for extraction of station data.", FLAG.DATA)

        # IDs may not be strings (the file writer above converts them too),
        # so convert before joining.
        context.outputs['ProcessSpecificContent'] = {
            "StationIDs": " ".join([str(st_id) for st_id in StationIDs])}

        # Pretend that took 10% of time. The job is still running at this
        # point, so it must not be reported as completed yet.
        context.setStatus(STATUS.STARTED, 'Station IDs resolved, extracting data', 10)

        # Now extract the data itself
        # NOTE(review): Delimiter only selects the file extension here; it is
        # not passed on to MIDASSubsetter - confirm that is intended.
        extension = ".csv" if Delimiter == "comma" else ".txt"
        df_path = os.path.join(outputs_dir, "station_data" + extension)

        # Need temp dir for big file extractions
        # NOTE(review): nothing here creates this directory - presumably the
        # framework or the subsetter does; verify.
        process_tmp_dir = os.path.join(context.processDir, 'tmp')

        midasSubsetter.MIDASSubsetter(
            [ObsTableName], df_path, startTime=StartDateTime,
            endTime=EndDateTime, src_ids=StationIDs, tempDir=process_tmp_dir)

        self._addFileToFileSet(df_path, "Station data file.", FLAG.DATA)

        context.setStatus(STATUS.COMPLETED, 'The End', 100)
        completionTime = time.time()
        ddp_process_support.updateJobDetailsAfterCompletion(context, startTime, completionTime)

    def _addFileToFileSet(self, path, info, type=FLAG.DATA):
        """
        Adds file to output file set.

        :param path: path of an existing file; its size and base name are
            recorded.
        :param info: description stored alongside the file entry.
        :param type: flag stored with the entry (defaults to ``FLAG.DATA``).
            The name shadows the builtin but is kept so existing keyword
            calls keep working.
        :raises OSError: from ``os.path.getsize`` if ``path`` does not exist.
        """
        f_size = os.path.getsize(path)
        output_basename = os.path.basename(path)
        self.fileSet.contents.append(FileSet(type, output_basename, f_size, info))

    def _revertDateTimeToLongString(self, dt):
        """
        Turns a date/time into a long string as needed by midas code.

        ``str(dt)`` is taken and every "-", " ", "T" and ":" is removed,
        e.g. "2010-01-02T03:04:05" becomes "20100102030405".
        """
        return str(dt).replace("-", "").replace(" ", "").replace("T", "").replace(":", "")

    def dryRun(self, context):
        """
        Dry-run hook; does nothing and returns None.

        Not implemented for sync jobs.
        """
        pass
---|
119 | |
---|