source: TI03-DataExtractor/trunk/pydxs/OutputManager.py @ 1160

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI03-DataExtractor/trunk/pydxs/OutputManager.py@1160
Revision 1160, 17.3 KB checked in by astephen, 13 years ago (diff)

A number of minor bug-fixes including full time validation in client.

Line 
1#   Copyright (C) 2004 CCLRC & NERC( Natural Environment Research Council ).
2#   This software may be distributed under the terms of the
3#   Q Public License, version 1.0 or later. http://ndg.nerc.ac.uk/public_docs/QPublic_license.txt
4
5"""
6OutputManager.py
7================
8
9This module holds the OutputManager class used to generate products
10in the Data Extractor.
11
12"""
13
14# Import required modules
15import os
16import re
17import cdms
18import vcs
19import sys
20import time
21
22# Import package modules including global variables
23from common import *
24from serverConfig import *
25from localRules import *
26from NumericalOperations import *
27from DXDMLHandler import *
28from CDMSDataHandler import *
29from CDMSOutputHandler import *
30from CSMLDataHandler import *
31from CSMLOutputHandler import *
32from DatasetFormatDecider import *
33from FileNames import *
34from DXErrors import *
35
36# Make sure CDMS automatic bounds generation is set to OFF
37#cdms.setAutoBounds("off")
38
39#global ONE_FILE_PER_TIMESTEP   
40#ONE_FILE_PER_TIMESTEP=0
41
42
43class OutputManager:
44    """
45    Class to control generation of output variables as selected.
46    Externally called as:
47   
48    x=OutputManager(request)
49    x.getOutputFilePaths()
50    x.createOutputs()
51    """
52
53    def __init__(self, sessionObject):
54        """
55        Takes in request object and sets up processing.
56        """
57        self.bag=sessionObject
58        self.outputDir=checkSubDirectory(self.bag["username"]) 
59        self.DXDML=DXDMLHandler()
60        self.varDict={}
61                       
62
63    def _getDatasetDetails(self, varCodes):
64        """
65        Returns the datasetURI for a given variable code.
66        """
67        match=re.match("variable_(\d+)\.(\d+)\.(\d+)", varCodes)
68        if not match:
69            raise DXProcessingError, "Cannot match variable code: "+varCodes
70
71        (dsgCode, dsCode, varCode)=intAll(match.groups())
72        dsURICodes="%s.%s" % (dsgCode, dsCode)
73       
74        dsg=self.bag["datasetGroup_%s" % dsgCode]
75        ds=self.bag["dataset_%s.%s" % (dsgCode, dsCode)]       
76       
77        if self.bag.has_key("datasetURI_%s" % dsURICodes):
78            datasetURI=self.bag["datasetURI_%s" % dsURICodes]
79        else:
80            datasetURI=self.DXDML.getDatasetURI(dsg, ds)
81         
82        return (dsg, ds, datasetURI)
83
84
85    def getOutputInfoDict(self):
86        """
87        Returns output file dictionary including output paths, sizes
88        and estimated duration for producing the output.
89        """
90        if self.varDict!={}:
91            return self.varDict
92        if self.bag.has_key("numericalOperation"):
93            self._handleNumericalOperation(mode="file names")
94        else:
95            self._loopThroughVariables(mode="file names")
96        return self.varDict
97       
98
99    def getOutputFilePathDict(self):
100        """
101        Returns a dictionary of output files with variable IDs as keys.
102        """
103        dct=self.getOutputInfoDict()
104        pathdict={}
105        for key in dct.keys():
106            varID=dct[key][1]
107            paths=dct[key][4]
108            pathdict[varID]=paths
109        return pathdict
110
111
112    def getOutputSizes(self):
113        """
114        Returns a dictionary of the sizes of output variables with IDs as keys.
115        """
116        dct=self.getOutputInfoDict()
117        sizedict={}
118        for key in dct.keys():
119            varID=dct[key][1]
120            size=dct[key][2]
121            sizedict[varID]=size
122        return sizedict
123       
124
125    def getOutputDurationEstimates(self):
126        """
127        Returns a dictionary of the estimated durations to produce the requested
128        outputs, with variable IDs as keys.
129        """
130        dct=self.getOutputInfoDict()
131        durationdict={}
132        for key in dct.keys():
133            varID=dct[key][1]
134            size=dct[key][2]
135            format=dct[key][3]
136            multiplier=1
137            if format=="NASA Ames":
138                multiplier=3
139            durationdict[varID]=size*TIMING_SCALE_FACTOR*multiplier
140        return durationdict
141
142
143    def createOutputs(self):
144        """
145        Calls the appropriate methods to create output files.
146        """
147        if self.bag.has_key("numericalOperation"):
148            self._handleNumericalOperation(mode="create outputs")
149        else:
150            self._loopThroughVariables(mode="create outputs")
151
152
153    def _adjustFileSizeByFormat(self, size, outputFormat):
154        """
155        Returns the size of an output file adjusted according to the format used.
156        """
157        if outputFormat=="NASA Ames":   
158            size=size*5 
159        return size
160
161           
162    def _loopThroughVariables(self, mode):
163        """
164        Loop through the selected variables to generate the output
165        file paths or data.
166        """
167        varKeys=getSortedKeysLike(self.bag, "variable_")
168        self.varDict={}
169
170        varCount=0
171        for varKey in varKeys:
172            varCodes=varKey.split("_")[-1]
173            varID=self.bag[varKey]
174           
175            (datasetGroup, dataset, datasetURI)=self._getDatasetDetails(varKey)
176
177            dataFileHandler=DatasetFormatDecider(datasetGroup, dataset, datasetURI).datasetFormat
178            outputFormat=self.bag["outputFormat_%s" % varCodes]
179            #print dataFileHandler
180           
181            axisSelectionDict=getDictSubsetMatching(self.bag, "axis_%s" % varCodes)
182
183            timeStepStringList=dataFileHandler.getSelectedTimeSteps(datasetURI, varID, axisSelectionDict)
184           
185            sizeOfRequest=dataFileHandler.getSelectedVariableSubsetSize(datasetURI, varID, axisSelectionDict)
186            sizeOfRequest=self._adjustFileSizeByFormat(sizeOfRequest, outputFormat)
187           
188                   
189            if sizeOfRequest>(MAX_FILE_SIZE*(2**20)) or ONE_FILE_PER_TIMESTEP==1:
190                   
191                print "\n\n\n", (MAX_FILE_SIZE*(2**20)), sizeOfRequest, ONE_FILE_PER_TIMESTEP
192               
193                if mode=="file names":
194                    fileNamer=FileNames(datasetGroup=datasetGroup, dataset=dataset, timeSteps=timeStepStringList, 
195                                    fileFormat=outputFormat, variables=[varID],
196                                    basedir=self.outputDir)
197                    outputFilePathList=fileNamer.createFileNameList()
198                   
199                    if isinstance(dataFileHandler, CSMLDataHandler):
200                        # CSML special case, just make first file the xml with copy of first .nc name
201                        ncPath=outputFilePathList[0]
202                        csmlPath=os.path.splitext(ncPath)[0]+".xml"
203                        outputFilePathList.insert(0, csmlPath)
204               
205                elif mode=="create outputs":
206                    # Now get the real data
207                   
208                    outputHandler=self._getOutputHandler(outputFormat, dataFileHandler)
209                    outputFilePathList=[]
210                   
211                    for timeStep in timeStepStringList:
212                        fileNamer=FileNames(datasetGroup=datasetGroup, dataset=dataset, timeSteps=[timeStep], 
213                                    fileFormat=outputFormat, variables=[varID],
214                                    basedir=self.outputDir)
215                        outputFilePath=fileNamer.createFileNameList()[0]
216                        outputFilePathList.append(outputFilePath)
217                                               
218                        data=dataFileHandler.readVariableSubsetIntoMemory(datasetURI, varID, axisSelectionDict, timeStep)
219                       
220                        outputFileHandler=outputHandler(outputFilePath)
221                        globalAttributes=dataFileHandler.getCFGlobalAttributes(datasetURI)
222                        outputFileHandler.writeVariableAndGlobalAttributes(data, globalAttributes)
223                        outputFileHandler.closeFile()
224                        print "\nWrote variable '%s' to output file: %s" % (varID, outputFilePath)             
225                       
226            else:
227                if len(timeStepStringList)==0:
228                    timeStepStringList=[]
229                elif len(timeStepStringList)==1:
230                    timeStepStringList=timeStepStringList
231                else:
232                    timeStepStringList=["%s-%s" % (timeStepStringList[0], timeStepStringList[-1])]
233                           
234                print "Work out file name as only one file..."
235                fileNamer=FileNames(datasetGroup=datasetGroup, dataset=dataset, timeSteps=timeStepStringList, 
236                                    fileFormat=outputFormat, variables=[varID],
237                                    basedir=self.outputDir)
238                outputFilePathList=fileNamer.createFileNameList()
239                       
240                if isinstance(dataFileHandler, CSMLDataHandler):
241                    ncPath=outputFilePathList[0]
242                    csmlPath=os.path.splitext(ncPath)[0]+".xml"
243                    outputFilePathList.insert(0, csmlPath)
244                               
245                if mode=="create outputs":
246                    # Now get the real data
247                    outputHandler=self._getOutputHandler(outputFormat, dataFileHandler)         
248                    outputFilePath=outputFilePathList[0]
249                   
250                    if isinstance(dataFileHandler, CSMLDataHandler):
251                        # CSML needs subsetting in one go
252                        #ncPath=outputFilePath
253                        #csmlPath=os.path.splitext(ncPath)[0]+".xml"
254                        #outputFilePathList.insert(0, csmlPath)
255                        dataFileHandler.subsetVariableToCSMLNC(datasetURI, varID, axisSelectionDict, csmlPath, ncPath)
256                        print "\nWrote variable '%s' to output files: %s, %s" % (varID, csmlPath, ncPath)
257                    else:
258                        data=dataFileHandler.readVariableSubsetIntoMemory(datasetURI, varID, axisSelectionDict)
259                       
260                        outputFileHandler=outputHandler(outputFilePath)
261                        globalAttributes=dataFileHandler.getCFGlobalAttributes(datasetURI)
262                        outputFileHandler.writeVariableAndGlobalAttributes(data, globalAttributes)
263                        outputFileHandler.closeFile()
264                        print "\nWrote variable '%s' to output file: %s" % (varID, outputFilePath)                 
265                   
266   
267            self.varDict[varCount]=(datasetURI, varID, sizeOfRequest, outputFormat, outputFilePathList[:])
268            print "\n\n", outputFilePathList, "\n\n"
269            varCount=varCount+1
270
271
272    def _getOutputHandler(self, outputFormat, dataHandler):
273        """
274        Get output handler.
275        """
276        if outputFormat not in OUTPUT_FORMATS:
277            raise DXOptionHandlingError, "Output format '%s' is not supported." % outputFormat
278           
279        if isinstance(dataHandler, CDMSDataHandler):
280            if outputFormat=="NetCDF":     
281                outputHandler=CDMSOutputHandler
282            elif outputFormat=="NASA Ames":
283                outputHandler==NASAAmesOutputHandler
284        elif isinstance(dataHandler, CSMLDataHandler):
285            outputHandler=CSMLOutputHandler
286           
287        return outputHandler
288       
289
290    def _handleNumericalOperation(self, mode):
291        """
292        If a numerical operation is defined then don't loop through
293        variables, only output one calculated variable to keep simple.
294        """
295        print "Performing a numerical operation, only one output variable is supported when this option is selected...\n\n"
296        opController=NumericalOperations(self.bag["numericalOperation"])
297        usedVariableIndices=opController.variableIndices
298        opMethod=opController.opMethod.__name__
299       
300        varKeys=getSortedKeysLike(self.bag, "variable_")
301       
302        usedVars=[]
303        indx=0
304        for var in varKeys:
305            if indx in usedVariableIndices:
306                usedVars.append(var)
307               
308        self.varDict={}
309        varGrids=[]
310        arrayShapes=[]
311        arraySizes=[]
312        fileVarSelectorList=[]
313
314        outputFilePathList=None
315        for varKey in usedVars:
316            varCodes=varKey.split("_")[-1]
317            varID=self.bag[varKey]
318           
319            (datasetGroup, dataset, datasetURI)=self._getDatasetDetails(varKey)
320            dataFileHandler=DatasetFormatDecider(datasetGroup, dataset, datasetURI).datasetFormat
321            outputFormat=self.bag["outputFormat_%s" % varCodes]
322           
323            if outputFilePathList==None: 
324                outputFileName="%s_%s.%s" % (opMethod, os.getpid(), mapFileFormatToExtension(outputFormat))         
325                outputFilePathList=[os.path.join(self.outputDir, outputFileName)]
326           
327            if not isinstance(dataFileHandler, CDMSDataHandler):
328                raise DXOptionHandlingError, """The DX can currently only perform numerical operations on CDMS-type variables.
329                    You have selected a non-CDMS variable: '%s'.""" % (varID)
330           
331            axisSelectionDict=getDictSubsetMatching(self.bag, "axis_%s" % varCodes)
332            fileVarSelectorList.append([datasetURI, varID, axisSelectionDict])
333
334            (arrayShape, gridShape, size)=dataFileHandler.getSelectedVariableArrayDetails(datasetURI, varID, axisSelectionDict)
335            #print gridShape, arrayShape
336            varGrids.append(gridShape)
337            arrayShapes.append(arrayShape)
338            arraySizes.append(size)
339
340        print "Now compare grid shapes of variables..."
341        lenGrids=len(varGrids)
342        noneList=lenGrids*[None]
343
344        if noneList==varGrids:
345            gridsPresent="no"
346        elif None in varGrids:
347            raise DXOptionHandlingError, "Variable axes are different and cannot therefore be used in a mathematical operation."
348        else:
349            gridsPresent="yes"
350
351        refshape=arrayShapes[0]
352        for shape in arrayShapes[1:]:
353   
354            if len(shape)!=len(refshape):
355                raise DXOptionHandlingError, "Shapes of chosen variables are incompatible for mathematical operation."
356            if gridsPresent=="no":
357                if shape!=refshape:
358                    raise DXOptionHandlingError, "Axis lengths of chosen variables are different and cannot therefore be used in a mathematical operation."
359                elif gridsPresent=="yes":
360                    if shape[:-2]!=refshape[:-2]:
361                        raise DXOptionHandlingError, "Axis lengths of chosen variables are different and cannot therefore be used in a mathematical operation."
362
363        outputSize=arraySizes[0]
364        for sz in arraySizes[1:]:
365            if sz<outputSize:
366                outputSize=sz
367
368        if mode=="create outputs":
369            varsToUse=[]
370            print "Fetch each var from file..."
371            for (datasetURI, varID, axisSelectionDict) in fileVarSelectorList:
372                varsToUse.append(dataFileHandler.readVariableSubsetIntoMemory(datasetURI, varID, axisSelectionDict))
373               
374            data=opController.performOperation(varsToUse)
375           
376            # Now write the data
377            outputHandler=self._getOutputHandler(outputFormat, dataFileHandler) 
378            outputFilePath=outputFilePathList[0]               
379            outputFileHandler=outputHandler(outputFilePath)
380            outputFileHandler.writeVariableAndGlobalAttributes(data, {})
381            outputFileHandler.closeFile()
382           
383            print "\nWrote variable '%s' to output file: %s" % (data.id, outputFilePath)                   
384           
385
386        outputVarID=opMethod
387        self.varDict[0]=(datasetURI, outputVarID, outputSize, outputFormat, outputFilePathList[:])
388
389
390           
391                   
392   
393
394if __name__=="__main__":
395    #print "Setting ONE_FILE_PER_TIMESTEP=1"
396 
397    #ONE_FILE_PER_TIMESTEP=1
398    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
399                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
400                     "outputFormat_1.3.1":"NetCDF"})
401    print x.getOutputFilePathDict()   
402                 
403    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
404                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
405                     "axis_1.3.1.1":("1999-01-01T00:00:00", "1999-01-01T06:00:00"),
406                     "outputFormat_1.3.1":"NetCDF",
407                     "datasetGroup_2":"Test Data Group 2",
408                     "dataset_2.1":"Test Dataset 2", "variable_2.1.1":"var2",
409                     "axis_2.1.1.1":("2004-01-01T12:00:00", "2004-01-01T12:00:00"),
410                     "outputFormat_2.1.1":"NetCDF"})
411
412    print x.getOutputFilePathDict()   
413   
414    print "\n\n\n"
415    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
416                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
417                     "axis_1.3.1.1":("1999-01-01T00:00:00", "1999-01-01T06:00:00"),
418                     "outputFormat_1.3.1":"NetCDF",
419                     "datasetGroup_2":"Test Data Group 2",
420                     "dataset_2.1":"Test Dataset 2", "variable_2.1.1":"var2",
421                     "axis_2.1.1.1":("2004-01-01T12:00:00", "2004-01-01T12:00:00"),
422                     "axis_2.1.1.2":(30,-30),
423                     "outputFormat_2.1.1":"NetCDF"})
424
425    x.createOutputs()       
426   
427    print "Setting ONE_FILE_PER_TIMESTEP=0"
428    ONE_FILE_PER_TIMESTEP=0
429    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
430                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
431                     "axis_1.3.1.1":("1999-01-01T00:00:00", "1999-01-01T06:00:00"),
432                     "outputFormat_1.3.1":"NetCDF",
433                     "datasetGroup_2":"Test Data Group 2",
434                     "dataset_2.1":"Test Dataset 2", "variable_2.1.1":"var2",
435                     "axis_2.1.1.1":("2004-01-01T12:00:00", "2004-01-01T12:00:00"),
436                     "axis_2.1.1.2":(30,-30),
437                     "outputFormat_2.1.1":"NetCDF"})
438    print x.getOutputFilePathDict()       
439   
440    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
441                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
442                     "axis_1.3.1.1":("1999-01-01T00:00:00", "1999-01-01T06:00:00"),
443                     "outputFormat_1.3.1":"NetCDF",
444                     "datasetGroup_2":"Test Data Group 2",
445                     "dataset_2.1":"Test Dataset 2", "variable_2.1.1":"var2",
446                     "axis_2.1.1.1":("2004-01-01T12:00:00", "2004-01-01T12:00:00"),
447                     "axis_2.1.1.2":(30,-30),
448                     "outputFormat_2.1.1":"NetCDF"})
449    x.createOutputs()   
450    print x.getOutputFilePathDict() 
451    print x.getOutputInfoDict()
452    print x.getOutputDurationEstimates()
453    print x.getOutputSizes()
454   
455    print "\n\n\n"
456    print "DIFFERENCING VARS..."
457    x=OutputManager({"username":"jane", "datasetGroup_1":"Test Data Group 1",
458                     "dataset_1.3":"Test Dataset 1", "variable_1.3.1":"pqn",
459                     "axis_1.3.1.1":("1999-01-01T00:00:00", "1999-01-01T06:00:00"),
460                     "outputFormat_1.3.1":"NetCDF",
461                     "datasetGroup_2":"Test Data Group 2",
462                     "dataset_2.1":"Test Dataset 2", "variable_2.1.1":"var2",
463                     "axis_2.1.1.1":("2004-01-01T12:00:00", "2004-01-01T12:00:00"),
464                     "axis_2.1.1.2":(30,-30),
465                     "outputFormat_2.1.1":"NetCDF",
466                     "numericalOperation":"(var2)-(var1)"})     
467   
468    x.getOutputFilePathDict()
469    x.createOutputs()
470
471    print "TRYING REAL TEST.................."
472    x=OutputManager({'username':None, 'variable_1.1.1':'pqn', 
473                     'outputFormat_1.1.1':'NetCDF', 'userRoles':[], 
474                     'datasetGroup_1':'Test Data Group 1', 
475                     'axis_1.1.1.3':(-30, 30), 'dataset_1.1':'Test Dataset 1'})
476    print x.getOutputFilePathDict()
477   
478    ONE_FILE_PER_TIMESTEP=0   
479    print "\n\nCSML test!\n"
480    x=OutputManager({'username':None, 'variable_1.1.1':'var2', 
481                     'outputFormat_1.1.1':'CSML/NetCDF', 'userRoles':[], 
482                     'datasetGroup_1':'CSML test dataset group', 
483                     'axis_1.1.1.3':(0, 35), 'dataset_1.1':'CSML test dataset great test'})
484    print "\nGet output file path dict...", x.getOutputFilePathDict()
485    print "\nCREATE CSML Outputs...", x.createOutputs()
Note: See TracBrowser for help on using the repository browser.