Ignore:
Timestamp:
14/12/09 16:03:12 (12 years ago)
Author:
astephen
Message:

Adding more functionality to the MIDAS extraction process: station lookup by county or bounding box, and the station data extraction itself.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • cows_wps/trunk/process_modules/extract_uk_station_data.py

    r6116 r6124  
    77""" 
    88 
    9 import os, stat, time 
     9import os, stat, time, sys 
    1010 
    1111from cows_wps.process_handler.fileset import FileSet, FLAG 
     
    1616sys.path.append("/home/badc/software/datasets/ukmo-midas/scripts/extract") 
    1717# MIDAS Station search code 
     18import getStations 
    1819import midasSubsetter 
    19  
    20 # Set up logger 
    21 log = logging.getLogger(__name__) 
    22 log.setLevel(logging.DEBUG) 
    23  
    2420 
    2521 
    2622class ExtractUKStationData(object): 
     23 
    2724    def __call__(self, context): 
    2825         
     
    3431        Username = ci['Username'] 
    3532        StationIDs = ci.get("StationIDs", [])  
    36         Counties 
    37          
     33        Delimiter = ci["Delimiter"] 
     34        Counties = ci.get('Counties', []) 
     35        BBox = ci.get("BBox", None) 
     36        DataTypes = ci.get("DataTypes", []) 
     37 
     38        # Get required start/end times 
     39        StartDateTime = ci['StartDateTime'] 
     40        StartDateTime = self._revertDateTimeToLongString(StartDateTime) 
     41        EndDateTime = ci['EndDateTime'] 
     42        EndDateTime = self._revertDateTimeToLongString(EndDateTime) 
     43 
     44        ObsTableName = ci['ObsTableName']  
     45 
    3846        context.setStatus(STATUS.STARTED, 'Job is now running', 0) 
    3947 
    4048        # Always need a FileSet, even if empty 
    41         fileSet = context.outputs['FileSet'] = FileSet() 
     49        self.fileSet = context.outputs['FileSet'] = FileSet() 
    4250 
    43         context.outputs['ProcessSpecificContent'] = {}  
    44         MyFile = "uk_station_data.txt" 
    45         MyFilePath = context.processDir + '/outputs/' + MyFile 
     51        stations_file = "uk_stations.txt" 
     52        sf_path = context.processDir + '/outputs/' + stations_file 
    4653 
    4754        # Get station IDs if not provided 
    4855        if StationIDs == []: 
    49             # Try counties next 
    50             
     56            # Check we have either counties or bbox to search domain on 
     57            if Counties == [] and BBox == None: 
     58                raise Exception("Invalid arguments provided. Must provide either a list of station IDs, a list of counties or a valid geographical bounding box.") 
    5159 
    52         context.log.info('Written output file: %s' % MyFile) 
    53         filesize = os.stat(MyFilePath)[stat.ST_SIZE] 
     60            # Call code to get Weather Stations and write file 
     61            st_getter = getStations.StationIDGetter(Counties, bbox=BBox, dataTypes=DataTypes, 
     62                       startTime=StartDateTime, endTime=EndDateTime, outputFile=sf_path, noprint=1) 
     63            StationIDs = st_getter.stList 
     64            StationIDs.sort() 
    5465 
    55         fileSet = context.outputs['FileSet'] = FileSet() 
    56         fileSet.contents.append(FileSet(FLAG.DATA, MyFile, filesize, 'The only output')) 
     66        # else write the file one per station id per line 
     67        else: 
     68            StationIDs.sort() 
     69            fout = open(sf_path, "w") 
     70            fout.write("\r\n".join([str(st_id) for st_id in StationIDs]))  
     71            fout.close() 
     72 
     73        self._addFileToFileSet(sf_path, "Station IDs file used for extraction of station data.", FLAG.DATA) 
     74 
     75        context.outputs['ProcessSpecificContent'] = {"StationIDs": " ".join(StationIDs)} 
     76 
     77        # Pretend that took 10% of time 
     78        context.setStatus(STATUS.COMPLETED, 'The End', 10) 
     79 
     80        # Now extract the data itself 
     81        data_file_root = "station_data" 
     82        if Delimiter == "comma": 
     83            data_file = data_file_root + ".csv" 
     84        else: 
     85            data_file = data_file_root + ".txt" 
     86 
     87        df_path = context.processDir + '/outputs/' + data_file 
     88 
     89        # Need temp dir for big file extractions 
     90        process_tmp_dir = context.processDir + '/tmp' 
     91 
     92        midasSubsetter.MIDASSubsetter([ObsTableName], df_path, startTime=StartDateTime,  
     93                       endTime=EndDateTime, src_ids=StationIDs, tempDir=process_tmp_dir) 
     94 
     95        self._addFileToFileSet(df_path, "Station data file.", FLAG.DATA) 
    5796 
    5897        context.setStatus(STATUS.COMPLETED, 'The End', 100) 
    59          
    6098        completionTime = time.time() 
    6199        ddp_process_support.updateJobDetailsAfterCompletion(context, startTime, completionTime) 
    62100 
    63      
     101 
     102    def _addFileToFileSet(self, path, info, type=FLAG.DATA): 
     103        "Adds file to output file set." 
     104        f_size = os.path.getsize(path) 
     105        output_basename = os.path.basename(path) 
     106        self.fileSet.contents.append(FileSet(type, output_basename, f_size, info)) 
     107 
     108 
     109    def _revertDateTimeToLongString(self, dt): 
     110        """ 
     111        Turns a date/time into a long string as needed by midas code. 
     112        """ 
     113        return str(dt).replace("-", "").replace(" ", "").replace("T", "").replace(":", "") 
    64114 
    65115    def dryRun(self, context): 
     
    67117        # Not implemented for sync jobs 
    68118        pass 
     119 
Note: See TracChangeset for help on using the changeset viewer.