Changeset 7069


Timestamp:
24/06/10 10:02:16
Author:
astephen
Message:

updated using process_base

Location:
cows_wps/trunk
Files:
2 edited

  • cows_wps/trunk/cows_wps/controllers/submitter.py

    r7000 → r7069

              # Now we are only dealing with async requests
    +         return wps_request_url
              cost_only_request_url = wps_request_url + "&Costonly=true"
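
    Note the effect of this hunk: the added return sits above the statement that builds the cost-only URL, so within this function that statement becomes unreachable and callers always get the plain asynchronous request URL back. A minimal sketch of the resulting control flow (the enclosing function name is hypothetical; only the three statements in the hunk appear in the changeset):

        # Hypothetical enclosing function -- only the three statements from
        # the hunk above are taken from the changeset itself.
        def build_request_url(wps_request_url):
            # Now we are only dealing with async requests
            return wps_request_url
            # Unreachable after r7069: the cost-only URL is never built.
            cost_only_request_url = wps_request_url + "&Costonly=true"
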
  • cows_wps/trunk/process_modules/extract_uk_station_data.py

    r7068 → r7069

      import os, stat, time, sys
    + import logging

      from cows_wps.process_handler.fileset import FileSet, FLAG
    …
      import process_modules.lib_get_weather_stations.utils as gws_utils
      import process_modules.lib_extract_uk_station_data.utils as exuk_utils
    -
    - # Import local modules
    - sys.path.append("/home/badc/software/datasets/ukmo-midas/scripts/extract")
    - # MIDAS Station search code
    - #import getStations
    - import midasSubsetter
    + import process_modules.process_base


    - class ExtractUKStationData(object):

    -     def __call__(self, context):
    -
    -         startTime = time.time()
    -         jobId = os.path.basename(context.processDir)
    + log = logging.getLogger(__name__)
    + log.setLevel(logging.DEBUG)

    -         # Parse the inputs
    -         ci = context.inputs
    -         StationIDs = ci.get("StationIDs", [])
    -         Delimiter = ci["Delimiter"]
    -         Counties = ci.get('Counties', [])
    -         BBox = ci.get("BBox", None)
    -         DataTypes = ci.get("DataTypes", [])

    -         # Get required start/end times
    -         StartDateTime = ci['StartDateTime']
    -         StartDateTime = gws_utils.revertDateTimeToLongString(StartDateTime)
    -         EndDateTime = ci['EndDateTime']
    -         EndDateTime = gws_utils.revertDateTimeToLongString(EndDateTime)
    -         TimeChunk = ci['OutputTimeChunk']
    + class ExtractUKStationData(process_modules.process_base.ProcessBase):

    -         ObsTableName = ci['ObsTableName']
    -         StationsFile = ci['StationsFile']
    +     # Define arguments that we need to set from inputs
    +     args_to_set = ["StationIDs", "Counties", "BBox", "DataTypes", "StartDateTime", "EndDateTime",
    +                    "Delimiter", "OutputTimeChunk", "ObsTableName", "StationsFile"]

    -         context.setStatus(STATUS.STARTED, 'Job is now running', 0)
    +     # Define defaults for arguments that might not be set
    +     input_arg_defaults = {"StationIDs": [],
    +                           "Counties": [],
    +                           "BBox": None,
    +                           "DataTypes": [],
    +                           "StationsFile": None,
    +                          }

    -         # Always need a FileSet, even if empty
    -         self.fileSet = context.outputs['FileSet'] = FileSet()
    +     # Define a dictionary for arguments that need to be processed
    +     # before they are set (with values as the function doing the processing).
    +     arg_processers = {"StartDateTime": gws_utils.revertDateTimeToLongString,
    +                       "EndDateTime": gws_utils.revertDateTimeToLongString}

    -         stations_file = "uk_stations.txt"
    -         sf_path = context.processDir + '/outputs/' + stations_file

    -         # Download stations file and read it if possible
    -         if StationsFile:
    -             stationIDs = exuk_utils.extractStationsFromFile(StationsFile)
    +     def _executeProc(self, context, dry_run):
    +         """
    +         This is called to step through the various parts of the process
    +         executing the actual process if ``dry_run`` is False and just
    +         returning information on the volume and duration of the outputs
    +         if ``dry_run`` is True.
    +         """
    +         # Call standard _setup
    +         self._setup(context)
    +         a = self.args

    -         # Get station IDs if not provided
    -         if StationIDs == []:
    -             # Check we have either counties or bbox to search domain on
    -             if Counties == [] and BBox == None:
    -                 raise Exception("Invalid arguments provided. Must provide either a list of station IDs, a list of counties or a valid geographical bounding box.")
    +         if not dry_run:
    +             # Now set status to started
    +             context.setStatus(STATUS.STARTED, 'Job is now running', 0)

    -             # Call code to get Weather Stations
    -             StationIDs = gws_utils.getStationList(Counties, BBox, DataTypes, StartDateTime,
    -                            EndDateTime, sf_path)
    -
    -             # Call code to get Weather Stations and write file
    -             StationIDs.sort()
    -
    -         # else write the file one per station id per line
    -         else:
    -             StationIDs.sort()
    -             fout = open(sf_path, "w")
    -             fout.write("\r\n".join([str(st_id) for st_id in StationIDs]))
    -             fout.close()
    +         # Resolve the list of stations
    +         stationList = self._resolveStationList(context, dry_run)

              # Now check limit on number of station IDs
    -         nStations = len(StationsIDs)
    -         if nStations > 100 and TimeChunk == "decadal":
    +         nStations = len(stationList)
    +
    +         if nStations > 100 and a["OutputTimeChunk"] == "decadal":
                  raise Exception("The number of selected station IDs has been calculated to be greater than 100. Please select a chunk size other than 'decadal' for such a large volume of data.")

    -         nYears = int(EndDateTime[:4]) - int(StartDateTime[:4])
    +         nYears = int(a["EndDateTime"][:4]) - int(a["StartDateTime"][:4])

              if nYears > 1 and nStations > 100:
                  raise Exception("The number of selected station IDs has been calculated to be greater than 100. Please select a time window no longer than 1 year.")

    -         self._addFileToFileSet(sf_path, "Station IDs file used for extraction of station data.", FLAG.DATA)
    -
    -         context.outputs['ProcessSpecificContent'] = {"StationIDs": " ".join(StationIDs)}
    -
    -         # Pretend that took 10% of time
    -         context.setStatus(STATUS.STARTED, 'Extracted station ID list.', 10)
    -
    -         # Now extract the data itself
    -         data_file_prefix = "station_data"
    -         if Delimiter == "comma":
    +         # Define data file base
    +         prefix = "station_data"
    +         if a["Delimiter"] == "comma":
                  ext = "csv"
              else:
                  ext = "txt"

    -         df_path_base = context.processDir + '/outputs/' + data_file_prefix
    +         dataFileBase = os.path.join(context.processDir, 'outputs', prefix)

    -         # Need temp dir for big file extractions
    -         process_tmp_dir = context.processDir + '/tmp'
    +         if not dry_run:
    +             # Estimate we are 5% of the way through
    +             context.setStatus(STATUS.STARTED, 'Extracted station ID list.', 5)

    -         output_file_paths = exuk_utils.extractStationDataByTimeChunk([ObsTableName], StartDateTime,
    -                        EndDateTime, StationIDs, TimeChunk, df_path_base, Delimiter,
    -                        ext, process_tmp_dir)
    +             # Need temp dir for big file extractions
    +             procTmpDir = os.path.join(context.processDir, 'tmp')
    +
    +             outputPaths = exuk_utils.extractStationDataByTimeChunk([a["ObsTableName"]], a["StartDateTime"],
    +                        a["EndDateTime"], stationList, a["OutputTimeChunk"], dataFileBase, a["Delimiter"],
    +                        ext, procTmpDir)

    -         for df_path in output_file_paths:
    -             self._addFileToFileSet(df_path, "Station data file.", FLAG.DATA)
    +             for outputPath in outputPaths:
    +                 self._addFileToOutputs(outputPath, "Station data file.")

    - #        context.setStatus(STATUS.COMPLETED, 'The End', 100)
    - #        completionTime = time.time()
    - #        process_support.updateJobDetailsAfterCompletion(context, startTime, completionTime)
    -         process_support.finishProcess(context, self.fileSet, startTime)
    +             # Finish up by calling function to set status to complete and zip up files etc
    +             # In this case we set keep = True so that weather station file is accessible to downstream process
    +             # without unzipping. This is fine as files are small.
    +             process_support.finishProcess(context, self.fileSet, self.startTime, keep = True)
    +
    +         else:
    +             outputPaths = ["%s-%s.%s" % (dataFileBase, i, ext) for i in range(3)]
    +             for outputPath in outputPaths:
    +                 self._addFileToOutputs(outputPath, "Station data file.", size = 100000)
    +
    +             estimated_duration = 300 # seconds
    +             process_support.finishDryRun(context, [], self.fileSet,
    +                             estimated_duration, acceptedMessage = 'Dry run complete')


    -     def _addFileToFileSet(self, path, info, type=FLAG.DATA):
    -         "Adds file to output file set."
    -         f_size = os.path.getsize(path)
    -         output_basename = os.path.basename(path)
    -         self.fileSet.contents.append(FileSet(type, output_basename, f_size, info))
    +     def _resolveStationList(self, context, dry_run):
    +         """
    +         Works out whether we need to generate a station list or use those
    +         sent as inputs.
    +         """
    +         a = self.args
    +         stationList = None
    +
    +         # Use input station list if provided
    +         if a["StationIDs"] != []:
    +             stationList = a["StationIDs"]
    +
    +         # Use StationsFile if we have one
    +         elif a["StationsFile"]:
    +             stationList = exuk_utils.extractStationsFromFile(a["StationsFile"])
    +
    +         # Add output file
    +         stationsFile = 'weather_stations.txt'
    +         stationsFilePath = os.path.join(context.processDir, "outputs", stationsFile)
    +
    +         if not dry_run:
    +
    +             if stationList == None:
    +                 # Call code to get Weather Stations
    +                 stationList = gws_utils.getStationList(a["Counties"], a["BBox"], a["DataTypes"],
    +                            a["StartDateTime"], a["EndDateTime"], stationsFilePath)
    +
    +             # Write the file, one station id per line
    +             stationList.sort()
    +
    +             fout = open(stationsFilePath, "w")
    +             fout.write("\r\n".join([str(station) for station in stationList]))
    +             fout.close()
    +
    +             # Add the stations list to the XML output section: ProcessSpecificContent
    +             context.outputs['ProcessSpecificContent'] = {"WeatherStations": " ".join(stationList)}
    +
    +             # Add file to outputs
    +             self._addFileToOutputs(stationsFilePath, 'Weather Stations File')
    +
    +         else:
    +
    +             # Estimate size of outputs by estimating the number of stations
    +             if len(a["Counties"]) > 0:
    +                 nEstimatedStations = len(a["Counties"]) * 15
    +             else:
    +                 (w, s, e, n) = a["BBox"]
    +                 lonExtent = abs(e - w)
    +                 latExtent = n - s
    +                 nEstimatedStations = int(lonExtent * latExtent * 50)
    +
    +             estimatedVolume = nEstimatedStations * 5
    +             self._addFileToOutputs(stationsFilePath, 'Weather Stations File', size = estimatedVolume)
    +
    +             # Make up a station list
    +             stationList = [-1] * 100
    +
    +         return stationList


    -     def dryRun(self, context):
    -
    -         self.fileSet = context.outputs['FileSet'] = FileSet()
    -         self.fileSet.contents.append(FileSet(FLAG.DATA, "testfile.txt", 30000, "Some file"))
    -
    -         process_support.finishDryRun(context, [("meta", "data")], self.fileSet,
    -                                          60, acceptedMessage='Dry run complete')
    -
    +     def _validateInputs(self):
    +         """
    +         Runs specific checking of arguments and their compatibility.
    +         """
    +         a = self.args
    +         if a["Counties"] == [] and a["BBox"] == None and a["StationIDs"] == [] and a["StationsFile"] == None:
    +             raise Exception("Invalid arguments provided. Must provide one of (i) a geographical bounding box, (ii) a list of counties, (iii) a set of station IDs or (iv) a file containing a set of selected station IDs.")
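
As the commit message says, the hand-rolled __call__ is replaced by the declarative process_modules.process_base.ProcessBase pattern: a subclass lists its expected inputs in args_to_set, supplies fallbacks in input_arg_defaults, and names pre-processing functions in arg_processers. The base class itself is not part of this changeset, so the following is only a minimal sketch of the _setup contract implied by the subclass above (all method bodies here are assumptions, not the real cows_wps implementation):

    import time
    from cows_wps.process_handler.fileset import FileSet

    class ProcessBase(object):
        # Hypothetical sketch of the base class implied by this changeset;
        # the real process_modules.process_base is not shown in the diff.
        args_to_set = []          # input names the subclass expects
        input_arg_defaults = {}   # fallbacks for optional inputs
        arg_processers = {}       # name -> callable applied before storing

        def _setup(self, context):
            # Record the start time so finishProcess() can report duration.
            self.startTime = time.time()
            # Always need a FileSet, even if empty (as the old code did).
            self.fileSet = context.outputs['FileSet'] = FileSet()
            self.args = {}
            for name in self.args_to_set:
                value = context.inputs.get(name, self.input_arg_defaults.get(name))
                if name in self.arg_processers:
                    value = self.arg_processers[name](value)
                self.args[name] = value
            # Let the subclass veto incompatible argument combinations.
            self._validateInputs()

        def _validateInputs(self):
            pass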
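
The dry-run branch of _resolveStationList is also worth unpacking: it sizes the job without querying MIDAS, assuming roughly 15 stations per county, 50 stations per square degree of bounding box, and about 5 bytes per station in the stations file. A standalone copy of that arithmetic with a worked example (the function name is invented; the constants come straight from the diff):

    def estimate_stations_output(counties, bbox):
        # Standalone copy of the dry-run estimate in _resolveStationList.
        # counties: list of county names; bbox: (west, south, east, north).
        if len(counties) > 0:
            nEstimatedStations = len(counties) * 15
        else:
            (w, s, e, n) = bbox
            lonExtent = abs(e - w)
            latExtent = n - s
            nEstimatedStations = int(lonExtent * latExtent * 50)
        estimatedVolume = nEstimatedStations * 5  # bytes
        return nEstimatedStations, estimatedVolume

    # A 3-degree by 2-degree box over the UK: 300 stations, ~1500 bytes.
    print(estimate_stations_output([], (-2.0, 51.0, 1.0, 53.0)))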