source: cows_wps/trunk/cows_wps/process_handler/ddp_process_support.py @ 5615

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/cows_wps/process_handler/ddp_process_support.py@5615
Revision 5615, 8.4 KB checked in by spascoe, 11 years ago

COWS WPS package copied from
 http://proj.badc.rl.ac.uk/svn/dcip/cows-wps/trunk.

This is a stripped-down version of the DDP WPS. Some features have been
removed and others have been deactivated until we reimplement them in a
more generic way.

import time
import os
import logging
from datetime import datetime

from cows_wps.utils import metadata_utils
from cows_wps.process_handler.fileset import FileSet, zipFileSet, dryRunZipFileSet, FLAG
from cows_wps.process_handler.context.process_status import STATUS

import cows_wps.utils.ukcp09_copyright_file_utils as ukcp09_copyright_file_utils

log = logging.getLogger(__name__)

def finishProcess(context, requestMetadata, fileSet, startTime,
                  finalMessage='The End', keep=False):
    """
    Performs the common finishing-up tasks for a process's __call__ method.
    """

    writeMetadataFile(context, requestMetadata)
    processOutputs(context, fileSet, dryRun=False, keep=keep)
    context.setStatus(STATUS.COMPLETED, finalMessage, 100)
    completionTime = time.time()
    updateJobDetailsAfterCompletion(context, startTime, completionTime)

def finishDryRun(context, requestMetadata, fileSet, duration,
                 acceptedMessage="Process execution request has been accepted"):
    """
    Performs the common finishing-up tasks for a process's dryRun method.
    """

    _addCopyrightFileToDryrunFileset(fileSet)
    writeMetadataFile(context, requestMetadata, dryRun=True)
    processOutputs(context, fileSet, dryRun=True)
    context.setStatus(STATUS.ACCEPTED, acceptedMessage, 0)
    updateJobDetailsAfterDryRun(context, duration)

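# Illustrative usage (a sketch only): a process handler would typically call
# the two helpers above at the end of its __call__ and dryRun methods. The
# ProcessBase class name and the input values below are hypothetical, not
# part of this package.
#
#     class MyDDPProcess(ProcessBase):
#
#         def __call__(self, context):
#             startTime = time.time()
#             # ... write output files into context.outputDir ...
#             requestMetadata = makeMetadataFileItems(context, ['FileSet'], True)
#             fileSet = context.outputs['FileSet']
#             finishProcess(context, requestMetadata, fileSet, startTime)
#
#         def dryRun(self, context):
#             estimatedDuration = 60  # seconds; estimated, not measured
#             requestMetadata = makeMetadataFileItems(context, ['FileSet'], True)
#             fileSet = context.outputs['FileSet']
#             finishDryRun(context, requestMetadata, fileSet, estimatedDuration)
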
def makeMetadataFileItems(context, excludedMetadataInputs, cachable):
    """
    Creates the metadata items for a request. Appending the 'ReUseURL' item
    (whose value depended on the cachable flag) has been deactivated in this
    stripped-down version, so cachable is currently unused.
    """

    metadataItems = makeMetadataItemsFromInputs(context, excludedMetadataInputs)

    return metadataItems

def makeMetadataItemsFromInputs(context, excludedMetadataInputs):
    """
    Creates a list of (name, value) metadata items from the request's inputs.
    If an input item appears in the excludedMetadataInputs list then it will
    not be used to create a metadata item.
    """
    inputs = context.getCopyOfInputs()

    metadataList = metadata_utils.constructMetadata(context,
                                                    excludedMetadataInputs,
                                                    rt_type='list',
                                                    no_commas=False,
                                                    truncate=True,
                                                    **inputs)
    return metadataList

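# For illustration (values hypothetical; the actual items depend on the
# request inputs and on metadata_utils.constructMetadata): the returned
# metadata list is a sequence of (name, value) pairs, e.g.
#
#     [('TimeSlices', '2020s'), ('MeaningPeriod', 'ann'),
#      ('LocationType', 'box')]
#
# which is the shape that writeMetadataFile() below expects.
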

#----------------------------------------------------------------------------
# This code would move to a common DDP module as it is used for multiple DDP
# processes.

def writeMetadataFile(context, metadataItems, fileprefix='metadata', format='xml', dryRun=False):
    """
    Writes the given metadata items to a metadata file, either as 'xml' or as
    plain text. Also appends a FileSet object corresponding to the metadata
    file to the context.outputs['FileSet'] object.

    If dryRun is True then no file is written and an estimated file size is
    used to create the FileSet object.

    @param metadataItems: A sequence of (key, value) pairs.

    """
    context.log.info("Writing metadata items: %s" % metadataItems)

    metadata_file = '%s.%s' % (fileprefix, format)
    metadata_path = os.path.join(context.outputDir, metadata_file)

    lines = []

    if format == "xml":
        lines.append("<UKCP09RequestDetails>\r\n")
    else:
        lines.append("UKCP09 Request Details:\r\n")

    # These mappings should not be here, but factoring them out would take
    # time, so at least until launch they can stay here. They are not
    # identical to the ddp.utils.terminology_mapper mappings.
    key_map = {"TimeSlices": "TimePeriods",
               "MeaningPeriod": "TemporalAverages",
               "LocationType": "SpatialAverage"}

    if metadataItems is not None:
        for (key, value) in metadataItems:

            # Test if the key should be mapped
            if key in key_map:
                key = key_map[key]

            if not isinstance(value, str):
                value = str(value)
            value = value.replace(",", "")

            if format == "xml":
                lines.append("    <%s>%s</%s>\r\n" % (key, value, key))
            else:
                lines.append("%s = %s\r\n" % (key, value))

    if format == "xml":
        lines.append("</UKCP09RequestDetails>\r\n")

    if not dryRun:
        fout = open(metadata_path, 'w')
        fout.writelines(lines)
        fout.close()

    fileSet = context.outputs['FileSet']

    if dryRun:
        filesize = sum(len(s) for s in lines)
    else:
        filesize = os.stat(metadata_path).st_size

    fileSet.contents.append(FileSet(FLAG.DATA, metadata_file, filesize, 'metadata file'))

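# For illustration (values hypothetical): with the default arguments and the
# metadata items [('TimeSlices', '2020s'), ('LocationType', 'box')], the
# metadata.xml file written above would contain:
#
#     <UKCP09RequestDetails>
#         <TimePeriods>2020s</TimePeriods>
#         <SpatialAverage>box</SpatialAverage>
#     </UKCP09RequestDetails>
#
# Note that key_map renames TimeSlices and LocationType on the way through.
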
def processOutputs(context, fileSet, dryRun=False, keep=False):
    """
    Decides whether to zip the contents of I{context.outputDir} and creates
    a FileSet object representing the final output.

    """

    maxRequestVolume = float(context.config['max_request_volume'])
    maxFileVolume = float(context.config['max_file_volume'])

#    log.debug("Fileset size before zipping = %s" % (sum([e.size for e in fileSet.contents]),))
#    log.debug("maxRequestVolume = %s" % (maxRequestVolume,))

    if dryRun:
        fileSet = dryRunZipFileSet(fileSet, context, maxFileVolume)

        # Check that the fileset isn't too big.
        totalSize = sum(e.size for e in fileSet.contents)
#        log.debug("Fileset size after zipping = %s" % (totalSize,))

        if totalSize > maxRequestVolume:
            context.log.error("totalSize = %s, maxRequestVolume = %s", totalSize, maxRequestVolume)
            raise Exception("Your request exceeds the maximum volume limit. Please try submitting smaller requests.")

    else:
        fileSet = zipFileSet(fileSet, context, maxFileVolume, keep=keep)

    return fileSet

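# The volume limits above come from the WPS configuration. A hypothetical
# sketch of the relevant entries (the real file format, section names and
# values may differ):
#
#     max_file_volume = 500000000        # bytes allowed per zip file
#     max_request_volume = 2000000000    # bytes allowed per whole request
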
def updateJobDetailsAfterCompletion(context, startTime, completionTime):
    "Updates the job details to reflect the state of the job after execution"

    _updateJobStatus(context)
    _updateJobVolume(context)

    duration = completionTime - startTime
    _setJobDuration(context, duration)

    _setJobCompletionTime(context, completionTime)

    _setDownloadCapability(context)

def updateJobDetailsAfterDryRun(context, duration):
    "Updates the job details with the results of the dry run"

    _updateJobStatus(context)
    _updateJobVolume(context)
    _setJobDuration(context, duration)
    _setDownloadCapability(context)

def _setDownloadCapability(context):
    """
    Adds the 'download' capability to the job details when the fileset
    contains exactly one file.
    """
    jobDetails = context.outputs['job_details']
    fileSet = context.outputs['FileSet']

    if len(fileSet.contents) == 1:
        if jobDetails.get('job_capabilities') in [None, ""]:
            jobDetails['job_capabilities'] = 'download'
        else:
            jobDetails['job_capabilities'] += ' download'

def _updateJobStatus(context):
    "Updates the jobDetails 'status' entry using the context object"

    jobDetails = context.outputs['job_details']
    jobDetails['status'] = [context.status, context.statusMessage, context.percentComplete]

def _updateJobVolume(context):
    "Updates the 'job_volume' entry by examining the current fileset in context.outputs"

    jobDetails = context.outputs['job_details']
    fileSet = context.outputs['FileSet']

    # Total size of all entries in the fileset.
    total_size = sum(entry.size for entry in fileSet.contents)

    jobDetails["job_volume"] = total_size

def _setJobDuration(context, duration):
    "Sets the 'duration' value in the jobDetails dictionary to the value given"

    jobDetails = context.outputs['job_details']
    jobDetails["duration"] = duration

def _setJobCompletionTime(context, completionTime):
    """
    Sets the jobDetails 'completion_time' and 'completion_date_time' based on
    the timestamp given.
    """
    jobDetails = context.outputs['job_details']
    jobDetails['completion_time'] = completionTime

    completionDateTime = datetime.fromtimestamp(completionTime)
    jobDetails['completion_date_time'] = completionDateTime.isoformat(' ')

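# For illustration (a hypothetical snapshot; every value below is invented):
# after updateJobDetailsAfterCompletion() the job_details dictionary holds
# entries along these lines:
#
#     {'status': [STATUS.COMPLETED, 'The End', 100],
#      'job_volume': 1048576,
#      'duration': 42.5,
#      'completion_time': 1249053300.0,
#      'completion_date_time': '2009-07-31 15:35:00',
#      'job_capabilities': 'download'}
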
def _addCopyrightFileToDryrunFileset(fileSet):
    """
    Appends a FileSet entry for the UKCP09 copyright file so that the dry-run
    volume estimate accounts for it.
    """
    (copyr_fname, copyr_fsize, copyr_finfo) = ukcp09_copyright_file_utils.getCopyrightFileDetailsForDryRun()
    fileSet.contents.append(FileSet(FLAG.DATA, copyr_fname, copyr_fsize, copyr_finfo))