source: TI03-DataExtractor/trunk/pydxs/OutputManager.py @ 1109

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI03-DataExtractor/trunk/pydxs/OutputManager.py@1109
Revision 1109, 15.8 KB checked in by astephen, 13 years ago (diff)

Stable-ish version with fully-ish working dxc client.

Line 
1#   Copyright (C) 2004 CCLRC & NERC( Natural Environment Research Council ).
2#   This software may be distributed under the terms of the
3#   Q Public License, version 1.0 or later. http://ndg.nerc.ac.uk/public_docs/QPublic_license.txt
4
5"""
6OutputManager.py
7================
8
9This module holds the OutputManager class used to generate products
10in the Data Extractor.
11
12"""
13
14# Import required modules
15import os
16import re
17import cdms
18import vcs
19import sys
20import time
21
22# Import package modules including global variables
23from common import *
24from serverConfig import *
25from localRules import *
26from NumericalOperations import *
27from DXDMLHandler import *
28from CDMSDataHandler import *
29from CDMSOutputHandler import *
30from DatasetFormatDecider import *
31from FileNames import *
32from DXErrors import *
33
34# Make sure CDMS automatic bounds generation is set to OFF
35#cdms.setAutoBounds("off")
36
37global ONE_FILE_PER_TIMESTEP   
38ONE_FILE_PER_TIMESTEP=1
39
40
41class OutputManager:
42    """
43    Class to control generation of output variables as selected.
44    Externally called as:
45   
46    x=OutputManager(request)
47    x.getOutputFilePaths()
48    x.createOutputs()
49    """
50
51    def __init__(self, sessionObject):
52        """
53        Takes in request object and sets up processing.
54        """
55        self.bag=sessionObject
56        self.outputDir=checkSubDirectory(self.bag["username"]) 
57        self.DXDML=DXDMLHandler()
58        self.varDict={}
59                       
60
61    def _getDatasetDetails(self, varCodes):
62        """
63        Returns the datasetURI for a given variable code.
64        """
65        match=re.match("variable_(\d+)\.(\d+)\.(\d+)", varCodes)
66        if not match:
67            raise DXProcessingError, "Cannot match variable code: "+varCodes
68
69        (dsgCode, dsCode, varCode)=intAll(match.groups())
70        dsURICodes="%s.%s" % (dsgCode, dsCode)
71       
72        dsg=self.bag["datasetGroup_%s" % dsgCode]
73        ds=self.bag["dataset_%s.%s" % (dsgCode, dsCode)]       
74       
75        if self.bag.has_key("datasetURI_%s" % dsURICodes):
76            datasetURI=self.bag["datasetURI_%s" % dsURICodes]
77        else:
78            datasetURI=self.DXDML.getDatasetURI(dsg, ds)
79         
80        return (dsg, ds, datasetURI)
81
82
83    def getOutputInfoDict(self):
84        """
85        Returns output file dictionary including output paths, sizes
86        and estimated duration for producing the output.
87        """
88        if self.varDict!={}:
89            return self.varDict
90        if self.bag.has_key("numericalOperation"):
91            self._handleNumericalOperation(mode="file names")
92        else:
93            self._loopThroughVariables(mode="file names")
94        return self.varDict
95       
96
97    def getOutputFilePathDict(self):
98        """
99        Returns a dictionary of output files with variable IDs as keys.
100        """
101        dct=self.getOutputInfoDict()
102        pathdict={}
103        for key in dct.keys():
104            varID=dct[key][1]
105            paths=dct[key][4]
106            pathdict[varID]=paths
107        return pathdict
108
109
110    def getOutputSizes(self):
111        """
112        Returns a dictionary of the sizes of output variables with IDs as keys.
113        """
114        dct=self.getOutputInfoDict()
115        sizedict={}
116        for key in dct.keys():
117            varID=dct[key][1]
118            size=dct[key][2]
119            sizedict[varID]=size
120        return sizedict
121       
122
123    def getOutputDurationEstimates(self):
124        """
125        Returns a dictionary of the estimated durations to produce the requested
126        outputs, with variable IDs as keys.
127        """
128        dct=self.getOutputInfoDict()
129        durationdict={}
130        for key in dct.keys():
131            varID=dct[key][1]
132            size=dct[key][2]
133            format=dct[key][3]
134            multiplier=1
135            if format=="NASA Ames":
136                multiplier=3
137            durationdict[varID]=size*TIMING_SCALE_FACTOR*multiplier
138        return durationdict
139
140
141    def createOutputs(self):
142        """
143        Calls the appropriate methods to create output files.
144        """
145        if self.bag.has_key("numericalOperation"):
146            self._handleNumericalOperation(mode="create outputs")
147        else:
148            self._loopThroughVariables(mode="create outputs")
149
150
151    def _adjustFileSizeByFormat(self, size, outputFormat):
152        """
153        Returns the size of an output file adjusted according to the format used.
154        """
155        if outputFormat=="NASA Ames":   
156            size=size*5 
157        return size
158
159           
160    def _loopThroughVariables(self, mode):
161        """
162        Loop through the selected variables to generate the output
163        file paths or data.
164        """
165        varKeys=getSortedKeysLike(self.bag, "variable_")
166        self.varDict={}
167
168        varCount=0
169        for varKey in varKeys:
170            varCodes=varKey.split("_")[-1]
171            varID=self.bag[varKey]
172           
173            (datasetGroup, dataset, datasetURI)=self._getDatasetDetails(varKey)
174
175            dataFileHandler=DatasetFormatDecider(datasetGroup, dataset, datasetURI).datasetFormat
176            outputFormat=self.bag["outputFormat_%s" % varCodes]
177            #print dataFileHandler
178           
179            axisSelectionDict=getDictSubsetMatching(self.bag, "axis_%s" % varCodes)
180
181            timeStepStringList=dataFileHandler.getSelectedTimeSteps(datasetURI, varID, axisSelectionDict)
182           
183            sizeOfRequest=dataFileHandler.getSelectedVariableSubsetSize(datasetURI, varID, axisSelectionDict)
184            sizeOfRequest=self._adjustFileSizeByFormat(sizeOfRequest, outputFormat)
185           
186                   
187            if sizeOfRequest>(MAX_FILE_SIZE*(2**20)) or ONE_FILE_PER_TIMESTEP==1:
188                   
189                print "\n\n\n", (MAX_FILE_SIZE*(2**20)), sizeOfRequest, ONE_FILE_PER_TIMESTEP
190                if mode=="file names":
191                    fileNamer=FileNames(datasetGroup=datasetGroup, dataset=dataset, timeSteps=timeStepStringList, 
192                                    fileFormat=outputFormat, variables=[varID],
193                                    basedir=self.outputDir)
194                    outputFilePathList=fileNamer.createFileNameList()
195               
196                elif mode=="create outputs":
197                    # Now get the real data
198                    outputHandler=self._getOutputHandler(outputFormat, dataFileHandler)
199                   
200                    outputFilePathList=[]
201                    for timeStep in timeStepStringList:
202                        fileNamer=FileNames(datasetGroup=datasetGroup, dataset=dataset, timeSteps=[timeStep], 
203                                    fileFormat=outputFormat, variables=[varID],
204                                    basedir=self.outputDir)
205                        outputFilePath=fileNamer.createFileNameList()[0]
206                        outputFilePathList.append(outputFilePath)
207                                               
208                        data=dataFileHandler.readVariableSubsetIntoMemory(datasetURI, varID, axisSelectionDict, timeStep)
209                       
210                        outputFileHandler=outputHandler(outputFilePath)
211                        globalAttributes=dataFileHandler.getCFGlobalAttributes(datasetURI)
212                        outputFileHandler.writeVariableAndGlobalAttributes(data, globalAttributes)
213                        outputFileHandler.closeFile()
214                        print "\nWrote variable '%s' to output file: %s" % (varID, outputFilePath)             
215                       
216            else:
217                if len(timeStepStringList)==0:
218                    timeStepStringList=[]
219                elif len(timeStepStringList)==1:
220                    timeStepStringList=timeStepStringList
221                else:
222                    timeStepStringList=["%s-%s" % (timeStepStringList[0], timeStepStringList[-1])]
223                           
224                print "Work out file name as only one file..."
225                fileNamer=FileNames(datasetGroup=datasetGroup, dataset=dataset, timeSteps=timeStepStringList, 
226                                    fileFormat=outputFormat, variables=[varID],
227                                    basedir=self.outputDir)
228                outputFilePathList=fileNamer.createFileNameList()       
229               
230                if mode=="create outputs":
231                    # Now get the real data
232                    outputHandler=self._getOutputHandler(outputFormat, dataFileHandler)         
233                    outputFilePath=outputFilePathList[0]
234                    data=dataFileHandler.readVariableSubsetIntoMemory(datasetURI, varID, axisSelectionDict)
235                       
236                    outputFileHandler=outputHandler(outputFilePath)
237                    globalAttributes=dataFileHandler.getCFGlobalAttributes(datasetURI)
238                    outputFileHandler.writeVariableAndGlobalAttributes(data, globalAttributes)
239                    outputFileHandler.closeFile()
240                    print "\nWrote variable '%s' to output file: %s" % (varID, outputFilePath)             
241                   
242   
243            self.varDict[varCount]=(datasetURI, varID, sizeOfRequest, outputFormat, outputFilePathList[:])
244            varCount=varCount+1
245
246
247    def _getOutputHandler(self, outputFormat, dataHandler):
248        """
249        Get output handler.
250        """
251        if outputFormat not in OUTPUT_FORMATS:
252            raise DXOptionHandlingError, "Output format '%s' is not supported." % outputFormat
253           
254        if isinstance(dataHandler, CDMSDataHandler):
255            if outputFormat=="NetCDF":     
256                outputHandler=CDMSOutputHandler
257            elif outputFormat=="NASA Ames":
258                outputHandler==NASAAmesOutputHandler
259        elif isinstance(dataHandler, CSMLDataHandler):
260            raise "Not implemented yet."
261           
262        return outputHandler
263       
264
265    def _handleNumericalOperation(self, mode):
266        """
267        If a numerical operation is defined then don't loop through
268        variables, only output one calculated variable to keep simple.
269        """
270        print "Performing a numerical operation, only one output variable is supported when this option is selected...\n\n"
271        opController=NumericalOperations(self.bag["numericalOperation"])
272        usedVariableIndices=opController.variableIndices
273        opMethod=opController.opMethod.__name__
274       
275        varKeys=getSortedKeysLike(self.bag, "variable_")
276       
277        usedVars=[]
278        indx=0
279        for var in varKeys:
280            if indx in usedVariableIndices:
281                usedVars.append(var)
282               
283        self.varDict={}
284        varGrids=[]
285        arrayShapes=[]
286        arraySizes=[]
287        fileVarSelectorList=[]
288
289        outputFilePathList=None
290        for varKey in usedVars:
291            varCodes=varKey.split("_")[-1]
292            varID=self.bag[varKey]
293           
294            (datasetGroup, dataset, datasetURI)=self._getDatasetDetails(varKey)
295            dataFileHandler=DatasetFormatDecider(datasetGroup, dataset, datasetURI).datasetFormat
296            outputFormat=self.bag["outputFormat_%s" % varCodes]
297           
298            if outputFilePathList==None: 
299                outputFileName="%s_%s.%s" % (opMethod, os.getpid(), mapFileFormatToExtension(outputFormat))         
300                outputFilePathList=[os.path.join(self.outputDir, outputFileName)]
301           
302            if not isinstance(dataFileHandler, CDMSDataHandler):
303                raise DXOptionHandlingError, """The DX can currently only perform numerical operations on CDMS-type variables.
304                    You have selected a non-CDMS variable: '%s'.""" % (varID)
305           
306            axisSelectionDict=getDictSubsetMatching(self.bag, "axis_%s" % varCodes)
307            fileVarSelectorList.append([datasetURI, varID, axisSelectionDict])
308
309            (arrayShape, gridShape, size)=dataFileHandler.getSelectedVariableArrayDetails(datasetURI, varID, axisSelectionDict)
310            #print gridShape, arrayShape
311            varGrids.append(gridShape)
312            arrayShapes.append(arrayShape)
313            arraySizes.append(size)
314
315        print "Now compare grid shapes of variables..."
316        lenGrids=len(varGrids)
317        noneList=lenGrids*[None]
318
319        if noneList==varGrids:
320            gridsPresent="no"
321        elif None in varGrids:
322            raise DXOptionHandlingError, "Variable axes are different and cannot therefore be used in a mathematical operation."
323        else:
324            gridsPresent="yes"
325
326        refshape=arrayShapes[0]
327        for shape in arrayShapes[1:]:
328   
329            if len(shape)!=len(refshape):
330                raise DXOptionHandlingError, "Shapes of chosen variables are incompatible for mathematical operation."
331            if gridsPresent=="no":
332                if shape!=refshape:
333                    raise DXOptionHandlingError, "Axis lengths of chosen variables are different and cannot therefore be used in a mathematical operation."
334                elif gridsPresent=="yes":
335                    if shape[:-2]!=refshape[:-2]:
336                        raise DXOptionHandlingError, "Axis lengths of chosen variables are different and cannot therefore be used in a mathematical operation."
337
338        outputSize=arraySizes[0]
339        for sz in arraySizes[1:]:
340            if sz<outputSize:
341                outputSize=sz
342
343        if mode=="create outputs":
344            varsToUse=[]
345            print "Fetch each var from file..."
346            for (datasetURI, varID, axisSelectionDict) in fileVarSelectorList:
347                varsToUse.append(dataFileHandler.readVariableSubsetIntoMemory(datasetURI, varID, axisSelectionDict))
348               
349            data=opController.performOperation(varsToUse)
350           
351            # Now write the data
352            outputHandler=self._getOutputHandler(outputFormat, dataFileHandler) 
353            outputFilePath=outputFilePathList[0]               
354            outputFileHandler=outputHandler(outputFilePath)
355            outputFileHandler.writeVariableAndGlobalAttributes(data, {})
356            outputFileHandler.closeFile()
357           
358            print "\nWrote variable '%s' to output file: %s" % (data.id, outputFilePath)                   
359           
360
361        outputVarID=opMethod
362        self.varDict[0]=(datasetURI, outputVarID, outputSize, outputFormat, outputFilePathList[:])
363
364
365           
366                   
367   
368
369if __name__=="__main__":
370    print "Setting ONE_FILE_PER_TIMESTEP=1"
371 
372    ONE_FILE_PER_TIMESTEP=1
373    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
374                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
375                     "outputFormat_1.3.1":"NetCDF"})
376    print x.getOutputFilePathDict()   
377                 
378    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
379                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
380                     "axis_1.3.1.1":("1999-01-01T00:00:00", "1999-01-01T06:00:00"),
381                     "outputFormat_1.3.1":"NetCDF",
382                     "datasetGroup_2":"Test Data Group 2",
383                     "dataset_2.1":"Test Dataset 2", "variable_2.1.1":"var2",
384                     "axis_2.1.1.1":("2004-01-01T12:00:00", "2004-01-01T12:00:00"),
385                     "outputFormat_2.1.1":"NetCDF"})
386
387    print x.getOutputFilePathDict()   
388   
389    print "\n\n\n"
390    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
391                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
392                     "axis_1.3.1.1":("1999-01-01T00:00:00", "1999-01-01T06:00:00"),
393                     "outputFormat_1.3.1":"NetCDF",
394                     "datasetGroup_2":"Test Data Group 2",
395                     "dataset_2.1":"Test Dataset 2", "variable_2.1.1":"var2",
396                     "axis_2.1.1.1":("2004-01-01T12:00:00", "2004-01-01T12:00:00"),
397                     "axis_2.1.1.2":(30,-30),
398                     "outputFormat_2.1.1":"NetCDF"})
399
400    x.createOutputs()       
401   
402    print "Setting ONE_FILE_PER_TIMESTEP=0"
403    ONE_FILE_PER_TIMESTEP=0
404    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
405                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
406                     "axis_1.3.1.1":("1999-01-01T00:00:00", "1999-01-01T06:00:00"),
407                     "outputFormat_1.3.1":"NetCDF",
408                     "datasetGroup_2":"Test Data Group 2",
409                     "dataset_2.1":"Test Dataset 2", "variable_2.1.1":"var2",
410                     "axis_2.1.1.1":("2004-01-01T12:00:00", "2004-01-01T12:00:00"),
411                     "axis_2.1.1.2":(30,-30),
412                     "outputFormat_2.1.1":"NetCDF"})
413    print x.getOutputFilePathDict()       
414   
415    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
416                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
417                     "axis_1.3.1.1":("1999-01-01T00:00:00", "1999-01-01T06:00:00"),
418                     "outputFormat_1.3.1":"NetCDF",
419                     "datasetGroup_2":"Test Data Group 2",
420                     "dataset_2.1":"Test Dataset 2", "variable_2.1.1":"var2",
421                     "axis_2.1.1.1":("2004-01-01T12:00:00", "2004-01-01T12:00:00"),
422                     "axis_2.1.1.2":(30,-30),
423                     "outputFormat_2.1.1":"NetCDF"})
424    x.createOutputs()   
425    print x.getOutputFilePathDict() 
426    print x.getOutputInfoDict()
427    print x.getOutputDurationEstimates()
428    print x.getOutputSizes()
429   
430    print "\n\n\n"
431    print "DIFFERENCING VARS..."
432    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
433                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
434                     "axis_1.3.1.1":("1999-01-01T00:00:00", "1999-01-01T06:00:00"),
435                     "outputFormat_1.3.1":"NetCDF",
436                     "datasetGroup_2":"Test Data Group 2",
437                     "dataset_2.1":"Test Dataset 2", "variable_2.1.1":"var2",
438                     "axis_2.1.1.1":("2004-01-01T12:00:00", "2004-01-01T12:00:00"),
439                     "axis_2.1.1.2":(30,-30),
440                     "outputFormat_2.1.1":"NetCDF",
441                     "numericalOperation":"(var2)-(var1)"})     
442   
443    x.getOutputFilePathDict()
444    x.createOutputs()
445
446    print "TRYING REAL TEST.................."
447    x=OutputManager({'username':None, 'variable_1.1.1':'pqn', 
448                     'outputFormat_1.1.1':'NetCDF', 'userRoles':[], 
449                     'datasetGroup_1':'Test Data Group 1', 
450                     'axis_1.1.1.3':(-30, 30), 'dataset_1.1':'Test Dataset 1'})
451    print x.getOutputFilePathDict()
Note: See TracBrowser for help on using the repository browser.