source: cows_wps/trunk/process_modules/process_base.py @ 7063

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/process_modules/process_base.py@7063
Revision 7063, 5.7 KB checked in by astephen, 10 years ago (diff)

Added process_base.py to use as optional base module/class.

Line 
"""
process_base.py
===============

Holds the "ProcessBase" class, which can optionally be used as a base
class for processes.

"""
9
import copy
import logging
import os
import stat
import sys
import time

from cows_wps.process_handler.fileset import FileSet, FLAG
import cows_wps.process_handler.process_support as process_support
from cows_wps.process_handler.context.process_status import STATUS
17
# Module-level logger named after this module; the level is forced to DEBUG
# here, exactly as the original did, so all records from this logger are
# passed on to whatever handlers the application has configured.
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
20
21
class ProcessBase(object):
    """
    Optional base class for processes.

    A subclass typically overrides the three class-level tables below
    (``args_to_set``, ``input_arg_defaults``, ``arg_processers``) and
    ``_validateInputs``. The ``_executeProc`` method provided here is only
    an example and is meant to be copied and adapted.
    """

    # Names of the arguments that must be present in ``self.args`` after
    # the inputs have been parsed.
    args_to_set = ["arg1", "arg2"]

    # Defaults for arguments that the caller is allowed to omit.
    input_arg_defaults = {"arg1": [],
                         }

    # Arguments that need converting before they are stored; each value is
    # the callable that performs the conversion.
    arg_processers = {"arg2": int}

    def _executeProc(self, context, dry_run):
        """
        Step through the various parts of the process.

        If ``dry_run`` is False the process is really executed: the status
        is set to started, the output file is written, registered in the
        file set and the process is finished via ``process_support``.

        If ``dry_run`` is True nothing is written; the output file is
        registered with an estimated size and the dry run is finished with
        an estimated duration.

        PLEASE NOTE: This is not a real process. Just an example to copy!
        """
        # Call standard _setup
        self._setup(context)

        if not dry_run:
            # Now set status to started
            context.setStatus(STATUS.STARTED, 'Job is now running', 0)

        # Location of the single output file
        aFile = 'f.txt'
        aFilePath = os.path.join(context.processDir, "outputs", aFile)

        if dry_run:
            # Only report estimates: nothing is written to disk
            estimated_size = 100
            self._addFileToOutputs(aFilePath, 'An output file', size=estimated_size)

            estimated_duration = 10  # seconds
            process_support.finishDryRun(context, [], self.fileSet,
                            estimated_duration, acceptedMessage='Dry run complete')
            return

        # Real run: render the parsed (name, value) argument pairs once and
        # reuse the same text for the file and for the XML output section.
        content = " ".join([str(item) for item in self.args.items()])

        # ``with`` guarantees the handle is closed even if the write fails
        with open(aFilePath, "w") as fout:
            fout.write(content)

        context.log.info("Wrote: %s" % aFilePath)

        # Add the content to the XML output section: ProcessSpecificContent
        context.outputs['ProcessSpecificContent'] = {"ProcessBaseOutputs": content}

        # Add the output file to the outputs (real size is read from disk)
        self._addFileToOutputs(aFilePath, 'An output file')

        # Finish up by calling function to set status to complete and zip up files etc
        process_support.finishProcess(context, self.fileSet, self.startTime)

    def __call__(self, context):
        """
        This is where the WPS controller calls the process.

        Both ``__call__`` and ``dryRun`` go through the same
        ``_executeProc`` method; here it is invoked with
        ``dry_run = False`` so the outputs are really produced.
        """
        return self._executeProc(context, dry_run=False)

    def dryRun(self, context):
        """
        This is where the WPS controller calls the process for
        an estimate of the volume and duration of the outputs.

        Both ``__call__`` and ``dryRun`` go through the same
        ``_executeProc`` method; here it is invoked with
        ``dry_run = True`` so no outputs are produced.
        """
        return self._executeProc(context, dry_run=True)

    def _setup(self, context):
        """
        Common setup actions.

        Records the start time, creates an empty ``FileSet`` (stored both
        as ``self.fileSet`` and as ``context.outputs['FileSet']``), then
        parses and validates ``context.inputs``.
        """
        # Always record start time for duration in outputs
        self.startTime = time.time()

        # Always need a FileSet, even if empty
        self.fileSet = context.outputs['FileSet'] = FileSet()

        # Parse the inputs and set dictionary: self.args
        self._parseInputs(context.inputs)

        # Check inputs are compatible
        self._validateInputs()

    def _parseInputs(self, inputs):
        """
        Parse the inputs into the instance dictionary ``self.args``.

        For every name in ``args_to_set`` the value is taken from
        ``inputs`` if present, otherwise from ``input_arg_defaults``.
        If a converter is registered for the name in ``arg_processers``
        it is applied to the value (including a default value).

        Raises ``KeyError`` if an argument is neither supplied nor has a
        default.
        """
        self.args = {}

        for key in self.args_to_set:
            if key in inputs:
                value = inputs[key]
            elif key in self.input_arg_defaults:
                # Copy the default so that a mutable default (e.g. a list)
                # stored at class level can never be modified through
                # ``self.args`` and leak into later calls.
                value = copy.deepcopy(self.input_arg_defaults[key])
            else:
                raise KeyError("Must provide argument '%s' when calling '%s'." % (key, self.__class__.__name__))

            # Process arg if required (direct call; ``apply`` is deprecated
            # in Python 2 and does not exist in Python 3)
            if key in self.arg_processers:
                value = self.arg_processers[key](value)

            self.args[key] = value

    def _validateInputs(self):
        """
        Runs specific checking of arguments and their compatibility.

        The only check performed is that a process which uses the
        "Counties" and/or "BBox" arguments has been given at least one of
        them. When neither argument has been parsed into ``self.args``
        (as is the case for the example ``args_to_set`` of this class)
        there is nothing to check; the original unconditional lookup
        raised ``KeyError`` in that situation.

        Raises ``Exception`` if "Counties" is an empty list and "BBox" is
        None.
        """
        if "Counties" not in self.args and "BBox" not in self.args:
            return

        counties = self.args.get("Counties", [])
        bbox = self.args.get("BBox")

        if counties == [] and bbox is None:
            raise Exception("Invalid arguments provided. Must provide either a geographical bounding box or a list of counties.")

    def _addFileToOutputs(self, fpath, info='An output file.', file_flag=FLAG.DATA, size=None):
        """
        Adds the file to the fileSet outputs. If ``size`` is set (as in
        a dry-run) then use it, otherwise get real size of file.

        The entry is appended to ``self.fileSet.contents`` as a
        ``FileSet`` built from the flag, the file name (without its
        directory), the size and the ``info`` text.
        """
        if size is None:
            # No estimate supplied: the file must exist so it can be measured
            size = os.path.getsize(fpath)

        fname = os.path.basename(fpath)
        self.fileSet.contents.append(FileSet(file_flag, fname, size, info))
Note: See TracBrowser for help on using the repository browser.