source: TI03-DataExtractor/trunk/pydxs/OutputManager.py @ 794

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI03-DataExtractor/trunk/pydxs/OutputManager.py@794
Revision 794, 17.9 KB checked in by astephen, 13 years ago (diff)

Unstable but latest version with multi-variable support and split hooks
for CDML and CSML.

Line 
1#   Copyright (C) 2004 CCLRC & NERC( Natural Environment Research Council ).
2#   This software may be distributed under the terms of the
3#   Q Public License, version 1.0 or later. http://ndg.nerc.ac.uk/public_docs/QPublic_license.txt
4
5"""
6OutputManager.py
7================
8
9This module holds the OutputManager class used to generate products
10in the Data Extractor.
11
12"""
13
14# Import required modules
15import os
16import re
17import cdms
18import vcs
19import sys
20import time
21
22# Import package modules including global variables
23from common import *
24from serverConfig import *
25from localRules import *
26from NumericalOperations import *
27from DXDMLHandler import *
28from CDMSDataHandler import *
29from CDMSOutputHandler import *
30from DatasetFormatDecider import *
31from FileNames import *
32from DXErrors import *
33
34# Make sure CDMS automatic bounds generation is set to OFF
35#cdms.setAutoBounds("off")
36
37global ONE_FILE_PER_TIMESTEP   
38ONE_FILE_PER_TIMESTEP=1
39
40
41class OutputManager:
42    """
43    Class to control generation of output variables as selected.
44    Externally called as:
45   
46    x=OutputManager(request)
47    x.getOutputFilePaths()
48    x.createOutputs()
49    """
50
51    def __init__(self, sessionObject):
52        """
53        Takes in request object and sets up processing.
54        """
55        self.bag=sessionObject
56        self.outputDir=checkSubDirectory(self.bag["username"]) 
57        self.DXDML=DXDMLHandler()
58               
59
60    def _getDatasetDetails(self, varCodes):
61        """
62        Returns the datasetURI for a given variable code.
63        """
64        match=re.match("variable_(\d+)\.(\d+)\.(\d+)", varCodes)
65        if not match:
66            raise DXProcessingError, "Cannot match variable code: "+varCodes
67
68        (dsgCode, dsCode, varCode)=intAll(match.groups())
69        dsURICodes="%s.%s" % (dsgCode, dsCode)
70       
71        dsg=self.bag["datasetGroup_%s" % dsgCode]
72        ds=self.bag["dataset_%s.%s" % (dsgCode, dsCode)]       
73       
74        if self.bag.has_key("datasetURI_%s" % dsURICodes):
75            datasetURI=self.bag["datasetURI_%s" % dsURICodes]
76        else:
77            datasetURI=self.DXDML.getDatasetURI(dsg, ds)
78         
79        return (dsg, ds, datasetURI)
80
81
82    def getOutputInfoDict(self):
83        """
84        Returns output file dictionary including output paths, sizes
85        and estimated duration for producing the output.
86        """
87        if self.bag.has_key("numericalOperation"):
88            self._handleNumericalOperation(mode="file names")
89        else:
90            self._loopThroughVariables(mode="file names")
91        return self.varDict
92       
93
94    def getOutputFilePathDict(self):
95        """
96        Returns a dictionary of output files with variable IDs as keys.
97        """
98        dct=self.getOutputInfoDict()
99        pathdict={}
100        for key in dct.keys():
101            varID=dct[key][1]
102            paths=dct[key][4]
103            pathdict[varID]=paths
104        return pathdict
105
106
107    def getOutputSizes(self):
108        """
109        Returns a dictionary of the sizes of output variables with IDs as keys.
110        """
111        dct=self.getOutputInfoDict()
112        sizedict={}
113        for key in dct.keys():
114            varID=dct[key][1]
115            size=dct[key][2]
116            sizedict[varID]=size
117        return sizedict
118       
119
120    def getOutputDurationEstimates(self):
121        """
122        Returns a dictionary of the estimated durations to produce the requested
123        outputs, with variable IDs as keys.
124        """
125        dct=self.getOutputInfoDict()
126        durationdict={}
127        for key in dct.keys():
128            varID=dct[key][1]
129            size=dct[key][2]
130            format=dct[key][3]
131            multiplier=1
132            if format=="NASA Ames":
133                multiplier=3
134            durationdict[varID]=size*TIMING_SCALE_FACTOR*multiplier
135        return durationdict
136
137
138    def createOutputs(self):
139        """
140        Calls the appropriate methods to create output files.
141        """
142        if self.bag.has_key("numericalOperation"):
143            self._handleNumericalOperation(mode="create outputs")
144        else:
145            self._loopThroughVariables(mode="create outputs")
146
147
148    def _adjustFileSizeByFormat(self, size, outputFormat):
149        """
150        Returns the size of an output file adjusted according to the format used.
151        """
152        if outputFormat=="NASA Ames":   
153            size=size*5 
154        return size
155
156           
157    def _loopThroughVariables(self, mode):
158        """
159        Loop through the selected variables to generate the output
160        file paths or data.
161        """
162        varKeys=getSortedKeysLike(self.bag, "variable_")
163        self.varDict={}
164
165        varCount=0
166        for varKey in varKeys:
167            varCodes=varKey.split("_")[-1]
168            varID=self.bag[varKey]
169           
170            (datasetGroup, dataset, datasetURI)=self._getDatasetDetails(varKey)
171
172            dataFileHandler=DatasetFormatDecider(datasetGroup, dataset, datasetURI).datasetFormat
173            outputFormat=self.bag["outputFormat_%s" % varCodes]
174            print dataFileHandler
175           
176            axisSelectionDict=getDictSubsetMatching(self.bag, "axis_%s" % varCodes)
177
178            timeStepStringList=dataFileHandler.getSelectedTimeSteps(datasetURI, varID, axisSelectionDict)
179                   
180            sizeOfRequest=dataFileHandler.getSelectedVariableSubsetSize(datasetURI, varID, axisSelectionDict)
181            sizeOfRequest=self._adjustFileSizeByFormat(sizeOfRequest, outputFormat)
182           
183                   
184            if sizeOfRequest>(MAX_FILE_SIZE*(2**20)) or ONE_FILE_PER_TIMESTEP==1:
185                   
186                print "\n\n\n", (MAX_FILE_SIZE*(2**20)), sizeOfRequest, ONE_FILE_PER_TIMESTEP
187                if mode=="file names":
188                    fileNamer=FileNames(datasetGroup=datasetGroup, dataset=dataset, timeSteps=timeStepStringList, 
189                                    fileFormat=outputFormat, variables=[varID],
190                                    basedir=self.outputDir)
191                    outputFilePathList=fileNamer.createFileNameList()
192               
193                elif mode=="create outputs":
194                    # Now get the real data
195                    outputHandler=self._getOutputHandler(outputFormat, dataFileHandler)
196                   
197                    outputFilePathList=[]
198                    for timeStep in timeStepStringList:
199                        fileNamer=FileNames(datasetGroup=datasetGroup, dataset=dataset, timeSteps=[timeStep], 
200                                    fileFormat=outputFormat, variables=[varID],
201                                    basedir=self.outputDir)
202                        outputFilePath=fileNamer.createFileNameList()[0]
203                        outputFilePathList.append(outputFilePath)
204                                               
205                        data=dataFileHandler.readVariableSubsetIntoMemory(datasetURI, varID, axisSelectionDict, timeStep)
206                       
207                        outputFileHandler=outputHandler(outputFilePath)
208                        globalAttributes=dataFileHandler.getCFGlobalAttributes(datasetURI)
209                        outputFileHandler.writeVariableAndGlobalAttributes(data, globalAttributes)
210                        outputFileHandler.closeFile()
211                        print "\nWrote variable '%s' to output file: %s" % (varID, outputFilePath)             
212                       
213            else:
214                if len(timeStepStringList)==0:
215                    timeStepStringList=[]
216                elif len(timeStepStringList)==1:
217                    timeStepStringList=timeStepStringList
218                else:
219                    timeStepStringList=["%s-%s" % (timeStepStringList[0], timeStepStringList[-1])]
220                           
221                print "Work out file name as only one file..."
222                fileNamer=FileNames(datasetGroup=datasetGroup, dataset=dataset, timeSteps=timeStepStringList, 
223                                    fileFormat=outputFormat, variables=[varID],
224                                    basedir=self.outputDir)
225                outputFilePathList=fileNamer.createFileNameList()       
226               
227                if mode=="create outputs":
228                    # Now get the real data
229                    outputHandler=self._getOutputHandler(outputFormat, dataFileHandler)         
230                    outputFilePath=outputFilePathList[0]
231                    data=dataFileHandler.readVariableSubsetIntoMemory(datasetURI, varID, axisSelectionDict)
232                       
233                    outputFileHandler=outputHandler(outputFilePath)
234                    globalAttributes=dataFileHandler.getCFGlobalAttributes(datasetURI)
235                    outputFileHandler.writeVariableAndGlobalAttributes(data, globalAttributes)
236                    outputFileHandler.closeFile()
237                    print "\nWrote variable '%s' to output file: %s" % (varID, outputFilePath)             
238                   
239   
240            self.varDict[varCount]=(datasetURI, varID, sizeOfRequest, outputFormat, outputFilePathList[:])
241            varCount=varCount+1
242
243
244    def _getOutputHandler(self, outputFormat, dataHandler):
245        """
246        Get output handler.
247        """
248        if outputFormat not in OUTPUT_FORMATS:
249            raise DXOptionHandlingError, "Output format '%s' is not supported." % outputFormat
250           
251        if isinstance(dataHandler, CDMSDataHandler):
252            if outputFormat=="NetCDF":     
253                outputHandler=CDMSOutputHandler
254            elif outputFormat=="NASA Ames":
255                outputHandler==NASAAmesOutputHandler
256        elif isinstance(dataHandler, CSMLDataHandler):
257            raise "Not implemented yet."
258           
259        return outputHandler
260       
261
262    def _handleNumericalOperation(self, mode):
263        """
264        If a numerical operation is defined then don't loop through
265        variables, only output one calculated variable to keep simple.
266        """
267        print "Performing a numerical operation, only one output variable is supported when this option is selected...\n\n"
268        opController=NumericalOperations(self.bag["numericalOperation"])
269        usedVariableIndices=opController.variableIndices
270        opMethod=opController.opMethod.__name__
271       
272        varKeys=getSortedKeysLike(self.bag, "variable_")
273       
274        usedVars=[]
275        indx=0
276        for var in varKeys:
277            if indx in usedVariableIndices:
278                usedVars.append(var)
279               
280        self.varDict={}
281        varGrids=[]
282        arrayShapes=[]
283        arraySizes=[]
284        fileVarSelectorList=[]
285
286        outputFilePathList=None
287        for varKey in usedVars:
288            varCodes=varKey.split("_")[-1]
289            varID=self.bag[varKey]
290           
291            (datasetGroup, dataset, datasetURI)=self._getDatasetDetails(varKey)
292            dataFileHandler=DatasetFormatDecider(datasetGroup, dataset, datasetURI).datasetFormat
293            outputFormat=self.bag["outputFormat_%s" % varCodes]
294           
295            if outputFilePathList==None: 
296                outputFileName="%s_%s.%s" % (opMethod, os.getpid(), outputFormat)           
297                outputFilePathList=[os.path.join(self.outputDir, outputFileName)]
298           
299            if not isinstance(dataFileHandler, CDMSDataHandler):
300                raise DXOptionHandlingError, """The DX can currently only perform numerical operations on CDMS-type variables.
301                    You have selected a non-CDMS variable: '%s'.""" % (varID)
302           
303            axisSelectionDict=getDictSubsetMatching(self.bag, "axis_%s" % varCodes)
304            fileVarSelectorList.append([datasetURI, varID, axisSelectionDict])
305
306            (arrayShape, gridShape, size)=dataFileHandler.getSelectedVariableArrayDetails(datasetURI, varID, axisSelectionDict)
307            print gridShape, arrayShape
308            varGrids.append(gridShape)
309            arrayShapes.append(arrayShape)
310            arraySizes.append(size)
311
312        print "Now compare grid shapes of variables..."
313        lenGrids=len(varGrids)
314        noneList=lenGrids*[None]
315
316        if noneList==varGrids:
317            gridsPresent="no"
318        elif None in varGrids:
319            raise DXOptionHandlingError, "Variable axes are different and cannot therefore be used in a mathematical operation."
320        else:
321            gridsPresent="yes"
322
323        refshape=arrayShapes[0]
324        for shape in arrayShapes[1:]:
325   
326            if len(shape)!=len(refshape):
327                raise DXOptionHandlingError, "Shapes of chosen variables are incompatible for mathematical operation."
328            if gridsPresent=="no":
329                if shape!=refshape:
330                    raise DXOptionHandlingError, "Axis lengths of chosen variables are different and cannot therefore be used in a mathematical operation."
331                elif gridsPresent=="yes":
332                    if shape[:-2]!=refshape[:-2]:
333                        raise DXOptionHandlingError, "Axis lengths of chosen variables are different and cannot therefore be used in a mathematical operation."
334
335        outputSize=arraySizes[0]
336        for sz in arraySizes[1:]:
337            if sz<outputSize:
338                outputSize=sz
339
340        if mode=="create outputs":
341            varsToUse=[]
342            print "Fetch each var from file..."
343            for (datasetURI, varID, axisSelectionDict) in fileVarSelectorList:
344                varsToUse.append(dataFileHandler.readVariableSubsetIntoMemory(datasetURI, varID, axisSelectionDict))
345               
346            opController.performOperation(fileVarSelectorList)
347
348        self.varDict[0]=(datasetURI, opMethod, outputSize, outputFormat, outputFilePathList[:])
349
350
351"""
352            timeStepStringList=dataFileHandler.getSelectedTimeSteps(datasetURI, varID, axisSelectionDict)
353                   
354            sizeOfRequest=dataFileHandler.getSelectedVariableSubsetSize(datasetURI, varID, axisSelectionDict)
355            sizeOfRequest=self._adjustFileSizeByFormat(sizeOfRequest, outputFormat)
356           
357           
358                   
359            if sizeOfRequest>(MAX_FILE_SIZE*(2**20)) or ONE_FILE_PER_TIMESTEP==1:
360                   
361                print "\n\n\n", (MAX_FILE_SIZE*(2**20)), sizeOfRequest, ONE_FILE_PER_TIMESTEP
362                if mode=="file names":
363                    fileNamer=FileNames(datasetGroup=datasetGroup, dataset=dataset, timeSteps=timeStepStringList,
364                                    fileFormat=outputFormat, variables=[varID],
365                                    basedir=self.outputDir)
366                    outputFilePathList=fileNamer.createFileNameList()
367               
368                elif mode=="create outputs":
369                    # Now get the real data
370                    outputHandler=self._getOutputHandler(outputFormat, dataFileHandler)
371                   
372                    outputFilePathList=[]
373                    for timeStep in timeStepStringList:
374                        fileNamer=FileNames(datasetGroup=datasetGroup, dataset=dataset, timeSteps=[timeStep],
375                                    fileFormat=outputFormat, variables=[varID],
376                                    basedir=self.outputDir)
377                        outputFilePath=fileNamer.createFileNameList()[0]
378                        outputFilePathList.append(outputFilePath)
379                                               
380                        data=dataFileHandler.readVariableSubsetIntoMemory(datasetURI, varID, axisSelectionDict, timeStep)
381                       
382                        outputFileHandler=outputHandler(outputFilePath)
383                        globalAttributes=dataFileHandler.getCFGlobalAttributes(datasetURI)
384                        outputFileHandler.writeVariableAndGlobalAttributes(data, globalAttributes)
385                        outputFileHandler.closeFile()
386                        print "\nWrote variable '%s' to output file: %s" % (varID, outputFilePath)             
387                       
388            else:
389                if len(timeStepStringList)==0:
390                    timeStepStringList=[]
391                elif len(timeStepStringList)==1:
392                    timeStepStringList=timeStepStringList
393                else:
394                    timeStepStringList=["%s-%s" % (timeStepStringList[0], timeStepStringList[-1])]
395                           
396                print "Work out file name as only one file..."
397                fileNamer=FileNames(datasetGroup=datasetGroup, dataset=dataset, timeSteps=timeStepStringList,
398                                    fileFormat=outputFormat, variables=[varID],
399                                    basedir=self.outputDir)
400                outputFilePathList=fileNamer.createFileNameList()       
401               
402                if mode=="create outputs":
403                    # Now get the real data
404                    outputHandler=self._getOutputHandler(outputFormat, dataFileHandler)         
405                    outputFilePath=outputFilePathList[0]
406                    data=dataFileHandler.readVariableSubsetIntoMemory(datasetURI, varID, axisSelectionDict)
407                       
408                    outputFileHandler=outputHandler(outputFilePath)
409                    globalAttributes=dataFileHandler.getCFGlobalAttributes(datasetURI)
410                    outputFileHandler.writeVariableAndGlobalAttributes(data, globalAttributes)
411                    outputFileHandler.closeFile()
412                    print "\nWrote variable '%s' to output file: %s" % (varID, outputFilePath)             
413                   
414   
415            self.varDict[varCount]=(datasetURI, varID, sizeOfRequest, outputFormat, outputFilePathList[:])
416       
417"""
418
419
420if __name__=="__main__":
421    print "Setting ONE_FILE_PER_TIMESTEP=1"
422 
423    ONE_FILE_PER_TIMESTEP=1
424    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
425                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
426                     "outputFormat_1.3.1":"NetCDF"})
427    print x.getOutputFilePathDict()   
428                 
429    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
430                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
431                     "axis_1.3.1.0":("1999-01-01T00:00:00", "1999-01-01T06:00:00"),
432                     "outputFormat_1.3.1":"NetCDF",
433                     "datasetGroup_2":"Test Data Group 2",
434                     "dataset_2.1":"Test Dataset 2", "variable_2.1.1":"var2",
435                     "axis_2.1.1.0":("2004-01-01T12:00:00", "2004-01-01T12:00:00"),
436                     "outputFormat_2.1.1":"NetCDF"})
437
438    print x.getOutputFilePathDict()   
439   
440    print "\n\n\n"
441    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
442                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
443                     "axis_1.3.1.0":("1999-01-01T00:00:00", "1999-01-01T06:00:00"),
444                     "outputFormat_1.3.1":"NetCDF",
445                     "datasetGroup_2":"Test Data Group 2",
446                     "dataset_2.1":"Test Dataset 2", "variable_2.1.1":"var2",
447                     "axis_2.1.1.0":("2004-01-01T12:00:00", "2004-01-01T12:00:00"),
448                     "axis_2.1.1.1":(30,-30),
449                     "outputFormat_2.1.1":"NetCDF"})
450
451    x.createOutputs()       
452   
453    print "Setting ONE_FILE_PER_TIMESTEP=0"
454    ONE_FILE_PER_TIMESTEP=0
455    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
456                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
457                     "axis_1.3.1.0":("1999-01-01T00:00:00", "1999-01-01T06:00:00"),
458                     "outputFormat_1.3.1":"NetCDF",
459                     "datasetGroup_2":"Test Data Group 2",
460                     "dataset_2.1":"Test Dataset 2", "variable_2.1.1":"var2",
461                     "axis_2.1.1.0":("2004-01-01T12:00:00", "2004-01-01T12:00:00"),
462                     "axis_2.1.1.1":(30,-30),
463                     "outputFormat_2.1.1":"NetCDF"})
464    print x.getOutputFilePathDict()       
465   
466    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
467                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
468                     "axis_1.3.1.0":("1999-01-01T00:00:00", "1999-01-01T06:00:00"),
469                     "outputFormat_1.3.1":"NetCDF",
470                     "datasetGroup_2":"Test Data Group 2",
471                     "dataset_2.1":"Test Dataset 2", "variable_2.1.1":"var2",
472                     "axis_2.1.1.0":("2004-01-01T12:00:00", "2004-01-01T12:00:00"),
473                     "axis_2.1.1.1":(30,-30),
474                     "outputFormat_2.1.1":"NetCDF"})
475    x.createOutputs()   
476    print x.getOutputFilePathDict() 
477    print x.getOutputInfoDict()
478    print x.getOutputDurationEstimates()
479    print x.getOutputSizes()
480   
481    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
482                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
483                     "axis_1.3.1.0":("1999-01-01T00:00:00", "1999-01-01T06:00:00"),
484                     "outputFormat_1.3.1":"NetCDF",
485                     "datasetGroup_2":"Test Data Group 2",
486                     "dataset_2.1":"Test Dataset 2", "variable_2.1.1":"var2",
487                     "axis_2.1.1.0":("2004-01-01T12:00:00", "2004-01-01T12:00:00"),
488                     "axis_2.1.1.1":(30,-30),
489                     "outputFormat_2.1.1":"NetCDF",
490                     "numericalOperation":"(variable2)-(variable1)"})   
491   
Note: See TracBrowser for help on using the repository browser.