source: cows_wps/trunk/process_modules/subset_pp_file.py @ 7328

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/process_modules/subset_pp_file.py@7328
Revision 7328, 4.1 KB checked in by astephen, 10 years ago (diff)

added new subset PP test proc

Line 
1"""
2subset_pp_file.py
3===================
4
5Process subset_pp_file that holds the SubsetPPFile class.
6
7"""
8
9# Standard library imports
10import os, stat, time, sys, logging
11
12# WPS imports
13from cows_wps.process_handler.fileset import FileSet, FLAG
14import cows_wps.process_handler.process_support as process_support
15from cows_wps.process_handler.context.process_status import STATUS
16import process_modules.process_base
17
18# Import process-specific modules
19sys.path.append("/usr/local/cwps/swarv/lib/python")
20import pypp.pp_file
21
22
23# NOTE ABOUT LOGGING:
24# You can log with the context.log object
25
26log = logging.getLogger(__name__)
27log.setLevel(logging.DEBUG)
28
29
class SubsetPPFile(process_modules.process_base.ProcessBase):
    """
    Test process that scans a PP file for fields whose STASH code is in a
    requested list.

    The process opens the file given by the ``FilePath`` argument with
    ``pypp.pp_file.PP_File``, looks at the ``stashcode`` of each field header
    and records the ones that appear in the ``StashCodes`` argument.

    Outputs of a real (non dry-run) execution:

    * ``context.outputs['ProcessSpecificContent']["PPFields"]``: the matched
      STASH codes joined by single spaces.
    * ``<processDir>/outputs/output.pp``: a text file with one STASH code per
      line (the requested codes followed by the matched codes). Note that no
      PP field data is written to this file by the code below.
    """

    # Define arguments that we need to set from inputs
    # Based on args listed in process config file
    args_to_set = ["FilePath", "StashCodes"]

    # Define defaults for arguments that might not be set
    # A dictionary of arguments that we can over-write default values for
    # Some args might be mutually-exclusive or inclusive so useful to set
    # here as well as in the config file.
    input_arg_defaults = {}

    # Define a dictionary for arguments that need to be processed
    # before they are set (with values as the function doing the processing).
    arg_processers = {}

    def _executeProc(self, context, dry_run):
        """
        This is called to step through the various parts of the process
        executing the actual process if ``dry_run`` is False and just
        returning information on the volume and duration of the outputs
        if ``dry_run`` is True.

        ``context`` is the process context object; this method uses its
        ``processDir``, ``outputs``, ``log`` and ``setStatus`` members.

        Nothing is returned. Errors raised while opening the PP file or
        writing the output file are not caught here.
        """
        # Call standard _setup
        self._setup(context)
        a = self.args

        # Location of the output file (also reported for a dry run)
        outputFile = 'output.pp'
        outputFilePath = os.path.join(context.processDir, "outputs", outputFile)

        if not dry_run:
            # Now set status to started
            context.setStatus(STATUS.STARTED, 'Job is now running', 0)

            # Really generate output
            requestedStashCodes = a["StashCodes"]

            # Work on a copy so that the argument held in self.args is not
            # modified while it is also being used for membership tests.
            # NOTE(review): assumes "StashCodes" arrives as a list - confirm
            # against the process config.
            outputStashCodes = list(requestedStashCodes)
            fieldsToKeep = []

            ppf = pypp.pp_file.PP_File(a["FilePath"])

            for i in range(ppf.nfields):
                h = ppf.headers[i]

                if h.stashcode in requestedStashCodes:
                    outputStashCodes.append(h.stashcode)
                    fieldsToKeep.append(h.stashcode)

            # Add the output list to the XML output section: ProcessSpecificContent
            context.outputs['ProcessSpecificContent'] = {
                "PPFields": " ".join([str(f) for f in fieldsToKeep])}

            # The context manager closes the file even if the write fails
            with open(outputFilePath, "w") as fout:
                fout.write("\n".join([str(code) for code in outputStashCodes]))

            # The size can only be measured once the file has been written
            size = os.path.getsize(outputFilePath)

            # We can log information at any time to the main log file
            context.log.info('Written output file: %s' % outputFilePath)

        else:
            # Make it up for dry run
            size = 2345
            context.log.debug("Running dry run.")

        # Add the output file to the outputs
        self._addFileToOutputs(outputFilePath, 'Output File', size = size)

        if not dry_run:
            # Finish up by calling function to set status to complete and zip up files etc
            # In this case we set keep = True so that output file is accessible to downstream process
            # outside of the main output zip file (so downstream process does not need to unzip it).
            # This is fine as files are small.
            process_support.finishProcess(context, self.fileSet, self.startTime, keep = True)
        else:
            estimated_duration = 10 # seconds
            process_support.finishDryRun(context, [], self.fileSet,
                            estimated_duration, acceptedMessage = 'Dry run complete')

    def _validateInputs(self):
        """
        Runs specific checking of arguments and their compatibility.

        No process-specific checks are performed at present.
        """
        pass
Note: See TracBrowser for help on using the repository browser.