source: CCCC/trunk/ceda_cc/c4_run.py @ 456

Subversion URL: http://proj.badc.rl.ac.uk/svn/exarch/CCCC/trunk/ceda_cc/c4_run.py
Revision 456, 14.8 KB, checked in by mjuckes, 4 years ago

additions for CCI

1"""ceda_cc
2##########
3Entry point for API.
4
5USAGE
6#####
7c4_run.main( <argument list> )
8"""
import sys
from ccinit import c4_init

testmain=False
## Call out to summary.py: when the --sum option is selected, the library imports below are not needed.
if not testmain:
  if __name__ == '__main__':
   if len(sys.argv) > 1:
     if sys.argv[1] == '--sum':
        import summary
        summary.summariseLogs()
        raise SystemExit(0)
     elif sys.argv[1] == '-v':
        from versionConfig import version, versionComment
        print 'ceda-cc version %s [%s]' % (version,versionComment)
        raise SystemExit(0)
     elif sys.argv[1] == '--unitTest':
        print "Starting test suite 1"
        import unitTestsS1
        print "Starting test suite 2"
        import unitTestsS2
        print "Tests completed"
        raise SystemExit(0)
   else:
     print __doc__
     raise SystemExit(0)

# Standard library imports
import os, string, time, glob, pkgutil
import shutil
import shelve   ## used by the recorder class when the record type is not 'map'
## pkgutil is used in file_utils
# Third party imports

## Local imports with 3rd party dependencies
#### netcdf --- currently only cdms2 is supported -- re-arranged to facilitate support for alternative modules

import file_utils

from file_utils import fileMetadata, ncLib

# Local imports
import utils_c4 as utils
import config_c4 as config

reload( utils )

from xceptions import baseException

from fcc_utils2 import tupsort


#driving_model_ensemble_member = <CMIP5Ensemble_member>
#rcm_version_id = <RCMVersionID>

class dummy(object):
  def __init__(self):
     self.experimental = None
     self.parent = None

pathTmplDict = { 'CORDEX':'%(project)s/%(product)s/%(domain)s/%(institute)s/%(driving_model)s/%(experiment)s/%(ensemble)s/%(model)s/%(model_version)s/%(frequency)s/%(variable)s/files/%%(version)s/',   \
                 'SPECS':'%(project)s/%(product)s/%(institute)s/%(model)s/%(experiment)s/%(start_date)s/%(frequency)s/%(realm)s/%(table)s/%(variable)s/%(ensemble)s/files/%%(version)s/', \
                 'CMIP5':'%(project)s/%(product)s/%(institute)s/%(model)s/%(experiment)s/%(frequency)s/%(realm)s/%(table)s/%(ensemble)s/files/%%(version)s/%(variable)s/', \
                 'CCMI':'%(project)s/%(product)s/%(institute)s/%(model)s/%(experiment)s/%(frequency)s/%(realm)s/%(table)s/%(ensemble)s/files/%%(version)s/%(variable)s/', \
                 'ESA-CCI':'%(level)s/%(platform)s/%(sensor)s/%(variable)s/', \
                 '__def__':'%(project)s/%(product)s/%(institute)s/%(model)s/%(experiment)s/%(frequency)s/%(realm)s/%(variable)s/%(ensemble)s/files/%%(version)s/', \
               }

## Core DRS: list of vocab names
## Path template: -- current version puts upper case in "project"
## Dataset template:
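## Illustrative expansion of one of the templates above (the values are made-up examples):
##   pathTmplDict['CMIP5'] % {'project':'CMIP5', 'product':'output1', 'institute':'MOHC',
##                            'model':'HadGEM2-ES', 'experiment':'historical', 'frequency':'mon',
##                            'realm':'atmos', 'table':'Amon', 'ensemble':'r1i1p1', 'variable':'tas'}
##   gives 'CMIP5/output1/MOHC/HadGEM2-ES/historical/mon/atmos/Amon/r1i1p1/files/%(version)s/tas/'
##   -- the doubled %% in the template leaves %(version)s to be filled in by a later substitution.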

class recorder(object):

  def __init__(self,project,fileName,type='map',dummy=False):
    self.dummy = dummy
    self.file = fileName
    self.type = type
    self.pathTmpl = pathTmplDict.get(project,pathTmplDict['__def__'])
    self.records = {}
    self.tidtupl = []

  def open(self):
    if self.type == 'map':
      self.fh = open( self.file, 'a' )
    else:
      self.sh = shelve.open( self.file )

  def close(self):
    if self.type == 'map':
      self.fh.close()
    else:
      self.sh.close()

  def add(self,fpath,drs,safe=True):
    assert self.type == 'map','Can only do map files at present'
    assert type(drs) == type( {} ), '2nd user argument to method add should be a dictionary [%s]' % type(drs)
    tpath = self.pathTmpl % drs
    if not self.dummy:
      assert os.path.isfile( fpath ), 'File %s not found' % fpath
      fdate = time.ctime(os.path.getmtime(fpath))
      sz = os.stat(fpath).st_size
    else:
      fdate = "na"
      sz = 0
    record = '%s | OK | %s | modTime = %s | target = %s ' % (fpath,sz,fdate,tpath)
    fn = string.split( fpath, '/' )[-1]
    for k in ['creation_date','tracking_id']:
      if k in drs.keys():
        record += ' | %s = %s' % (k,drs[k])
        if k == 'tracking_id':
          self.tidtupl.append( (fn,drs[k]) )

    self.records[fn] = record

  def modify(self,fn,msg):
    assert fn in self.records.keys(),'Attempt to modify non-existent record %s, %s' % (fn,str(self.records.keys()[0:10]))
    if string.find( self.records[fn], '| OK |') == -1:
      ##print 'File %s already flagged with errors' % fn
      return
    s = string.replace( self.records[fn], '| OK |', '| %s |' % msg )
    ##print '--> ',s
    self.records[fn] = s

  def checktids(self):
## sort by tracking id
    if len( self.tidtupl ) == 1:
      return
    self.tidtupl.sort( cmp=tupsort(k=1).cmp )
    nd = 0
    fnl = []
    for k in range(len(self.tidtupl)-1):
      if self.tidtupl[k][1] == self.tidtupl[k+1][1]:
        print 'Duplicate tracking_id: %s, %s:: %s' % (self.tidtupl[k][0],self.tidtupl[k+1][0],self.tidtupl[k][1])
        nd += 1
        if len(fnl) == 0 or fnl[-1] != self.tidtupl[k][0]:
          fnl.append( self.tidtupl[k][0])
        fnl.append( self.tidtupl[k+1][0])
    if nd == 0:
      print 'No duplicate tracking ids found in %s files' % len(self.tidtupl)
    else:
      print '%s duplicate tracking ids' % nd
      for f in fnl:
        self.modify( f, 'ERROR: duplicate tid' )

  def dumpAll(self,safe=True):
    keys = self.records.keys()
    keys.sort()
    for k in keys:
      self.dump( self.records[k], safe=safe )

  def dump( self, record, safe=True ):
    if safe:
      self.open()
    self.fh.write( record + '\n' )
    if safe:
      self.close()

  def addErr(self,fpath,reason,safe=True):
    record = '%s | %s' % (fpath, reason)
    fn = string.split( fpath, '/' )[-1]
    self.records[fn] = record

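## Minimal usage sketch for the recorder class (file name and DRS values are illustrative):
##   rec = recorder( 'CMIP5', 'cccc.map', dummy=True )
##   rec.add( 'tas_Amon_HadGEM2-ES_historical_r1i1p1_185912-188411.nc',
##            {'project':'CMIP5', 'product':'output1', 'institute':'MOHC', 'model':'HadGEM2-ES',
##             'experiment':'historical', 'frequency':'mon', 'realm':'atmos', 'table':'Amon',
##             'ensemble':'r1i1p1', 'variable':'tas'} )
##   rec.dumpAll()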
class checker(object):
  def __init__(self, pcfg, cls,reader,abortMessageCount=-1,experimental=False):
    self.info = dummy()
    self.info.pcfg = pcfg
    self.info.fileIsFixed = None
    self.info.abortMessageCount = abortMessageCount
    self.info.experimental = experimental
    self.calendar = 'None'
    self.ncReader = reader
    self.cfn = utils.checkFileName( parent=self.info,cls=cls)
    self.cga = utils.checkGlobalAttributes( parent=self.info,cls=cls)
    self.cgd = utils.checkStandardDims( parent=self.info,cls=cls)
    self.cgg = utils.checkGrids( parent=self.info,cls=cls)
    self.cls = cls

    # Define vocabs based on project
    ##self.vocabs = getVocabs(pcgf)
    self.vocabs = pcfg.vocabs

  def checkFile(self,fpath,log=None,attributeMappings=[], getdrs=True):
    self.calendar = 'None'
    self.info.log = log

    fn = string.split( fpath, '/' )[-1]

    if attributeMappings != []:
      self.ncReader.loadNc( fpath )
      self.ncReader.applyMap( attributeMappings, self.cfn.globalAttributesInFn, log=log )
      ncRed = True
      thisFn = self.ncReader.fn
    else:
      ncRed = False
      thisFn = fn

    self.cfn.check( thisFn )
    if not self.cfn.completed:
      self.completed = False
      return
    if not self.info.pcfg.projectV.id[:2] == '__':
      if not os.path.isfile( fpath ):
        print 'File %s not found [2]' % fpath
        self.completed = False
        return

    if not ncRed:
      ##print fpath
      self.ncReader.loadNc( fpath )
    self.ga = self.ncReader.ga
    self.va = self.ncReader.va
    self.da = self.ncReader.da

    if self.cfn.freq != None:
      vGroup = self.cfn.freq
    else:
      vGroup = self.info.pcfg.mipVocabVgmap.get(self.cfn.group,self.cfn.group)
    self.cga.check( self.ga, self.va, self.cfn.var, vGroup, self.vocabs, self.cfn.fnParts )
    if not self.cga.completed:
      self.completed = False
      return

    ##self.cgd.plevRequired = config.plevRequired
    ##self.cgd.plevValues = config.plevValues
    ##self.cgd.heightRequired = config.heightRequired
    ##self.cgd.heightValues = config.heightValues
    ##self.cgd.heightRange = config.heightRange
    self.cgd.check( self.cfn.var, self.cfn.freq, self.da, self.va, self.cga.isInstantaneous, self.vocabs )
    self.calendar = self.cgd.calendar
    if not self.cgd.completed:
      self.completed = False
      return

    if self.info.pcfg.doCheckGrids:
      ##self.cgg.rotatedPoleGrids = config.rotatedPoleGrids
      ##self.cgg.interpolatedGrids = config.interpolatedGrids
      self.cgg.check( self.cfn.var, self.cfn.domain, self.da, self.va )

      if not self.cgg.completed:
        self.completed = False
        return
    self.completed = True
    if getdrs:
      self.drs = self.cga.getDrs()
      self.drs['project'] = self.info.pcfg.projectV.id
    self.errorCount = self.cfn.errorCount + self.cga.errorCount + self.cgd.errorCount + self.cgg.errorCount

class main(object):
  """Main entry point for execution.

     All compliance tests are run when a "main" object is instantiated; the object created carries attributes recording the test results (e.g. ok, resList).
  """
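  ## Programmatic usage sketch (illustrative; the argument list is whatever ccinit.c4_init accepts,
  ## typically the command-line argument strings):
  ##   import c4_run
  ##   m = c4_run.main( args=sys.argv[1:] )
  ##   if m.ok:
  ##     print 'all files passed'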

  def __init__(self,args=None,abortMessageCount=-1,printInfo=False,monitorFileHandles = False,cmdl=None):
    logDict = {}
    ecount = 0
    c4i = c4_init(args=args)
    c4i.logger.info( 'Starting batch -- number of files: %s' % (len(c4i.flist)) )
    c4i.logger.info( 'Source: %s' % c4i.source )
    if cmdl != None:
      c4i.logger.info( 'Command: %s' % cmdl )

    isDummy  = c4i.project[:2] == '__'
    if (ncLib == None) and (not isDummy):
       raise baseException( 'Cannot proceed with non-dummy [%s] project without a netcdf API' % (c4i.project) )
    pcfg = config.projectConfig( c4i.project )
    assert pcfg.projectV.v == -1, 'Cannot handle anything other than latest version at present'
    ncReader = fileMetadata(dummy=isDummy, attributeMappingsLog=c4i.attributeMappingsLog,forceLib=c4i.forceNetcdfLib)
    c4i.logger.info( 'Python netcdf: %s' % ncReader.ncLib )
    self.cc = checker(pcfg, c4i.project, ncReader,abortMessageCount=abortMessageCount, experimental=c4i.experimental)
    rec = recorder( c4i.project, c4i.recordFile, dummy=isDummy )
    self.ncLib = ncLib

    # This list will record the drs dictionaries of all checked files for export to JSON
    drs_list = []

    if monitorFileHandles:
      self.monitor = utils.sysMonitor()
    else:
      self.monitor = None

    cal = None
    if len( c4i.errs ) > 0:
      for i in range(0,len( c4i.errs ), 2 ):
        c4i.logger.info( c4i.errs[i] )

    self.cc.info.amapListDraft = []
    cbv = utils.checkByVar( parent=self.cc.info,cls=c4i.project,monitor=self.monitor)
    if c4i.project not in ['ESA-CCI']:
      cbv.impt( c4i.flist )
      if printInfo:
        print cbv.info

    fileLogOpen = False
    self.resList =  []
    stdoutsum = 2000
    npass = 0
    kf = 0
    for f in c4i.flist:
      kf += 1
      rv = False
      ec = None
      if monitorFileHandles:
        nofhStart = self.monitor.get_open_fds()
      fn = string.split(f,'/')[-1]
      c4i.logger.info( 'Starting: %s' % fn )
      try:
  ### need to have a unique name, otherwise get mixing of logs despite close statement below.
  ### if duplicate file names are present, this will be recorded in the main log, tag appended to file level log name (not yet tested).
        if c4i.logByFile:
          fLogger = c4i.getFileLog( fn )
          logDict[fn] = c4i.fileLogfile
          c4i.logger.info( 'Log file: %s' % c4i.fileLogfile )
          fileLogOpen = True
        else:
          fLogger = c4i.logger

        fLogger.info( 'Starting file %s' % fn )
## default appending to myapp.log; mode='w' forces a new file (deleting old contents).
        self.cc.checkFile( f, log=fLogger,attributeMappings=c4i.attributeMappings, getdrs=c4i.getdrs )

        if self.cc.completed:
          if cal not in (None, 'None') and self.cc.cgd.varGroup != "fx":
            if cal != self.cc.calendar:
              cal_change_err_msg = 'Error: change in calendar attribute %s --> %s' % (cal, self.cc.calendar)
              c4i.logger.info(cal_change_err_msg)
              fLogger.info(cal_change_err_msg)
              self.cc.errorCount += 1

          cal = self.cc.calendar
          ec = self.cc.errorCount
        rv =  ec == 0
        if rv:
          npass += 1
        self.resList.append( (rv,ec) )

        if c4i.logByFile:
          if self.cc.completed:
            fLogger.info( 'Done -- error count %s' % self.cc.errorCount )
          else:
            fLogger.info( 'Done -- checks not completed' )
          c4i.closeFileLog( )
          fileLogOpen = False

        if self.cc.completed:
          c4i.logger.info( 'Done -- error count %s' % self.cc.errorCount )
          ecount += self.cc.errorCount
          if self.cc.errorCount == 0:
            rec.add( f, self.cc.drs )
            drs_list.append({'path': f, 'drs': self.cc.drs})
          else:
            rec.addErr( f, 'ERRORS FOUND | errorCount = %s' % self.cc.errorCount )
        else:
          ecount += 20
          c4i.logger.info( 'Done -- testing aborted because of severity of errors' )
          rec.addErr( f, 'ERRORS FOUND AND CHECKS ABORTED' )
      except:
        c4i.logger.error("Exception has occurred" ,exc_info=1)
        if fileLogOpen:
          fLogger.error("C4.100.001: [exception]: FAILED:: Exception has occurred" ,exc_info=1)
          c4i.closeFileLog( )
          fileLogOpen = False
        rec.addErr( f, 'ERROR: Exception' )
        if not c4i.holdExceptions:
          raise
      if stdoutsum > 0 and kf%stdoutsum == 0:
         print '%s files checked; %s passed this round' % (kf,npass)
      if monitorFileHandles:
        nofhEnd = self.monitor.get_open_fds()
        if nofhEnd > nofhStart:
           print 'Open file handles: %s --- %s' % (nofhStart, nofhEnd)

    self.cc.info.log = c4i.logger

    if c4i.project not in ['SPECS','CCMI','CMIP5','ESA-CCI']:
       cbv.c4i = c4i
       cbv.setLogDict( logDict )
       cbv.check( recorder=rec, calendar=self.cc.calendar)
       try:
         ecount += cbv.errorCount
       except:
         ecount = None
    ncReader.close()
    if type( self.cc.info.amapListDraft ) == type( [] ) and len(  self.cc.info.amapListDraft ) > 0:
      ll =  self.cc.info.amapListDraft
      ll.sort()
      oo = open( 'amapDraft.txt', 'w' )
      oo.write( ll[0] + '\n' )
      for i in range( 1,len(ll) ):
        if ll[i] != ll[i-1]:
          oo.write( ll[i] + '\n' )
      oo.close()
    if c4i.project in ['SPECS','CCMI','CMIP5']:
      rec.checktids()
    rec.dumpAll()

    #!TODO: the recorder class could export JSON if it recorded the full drs dictionaries.
    #       This lightweight solution re-uses the filename from the rec class and dumps
    #       JSON in a separate function.
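    ## Each line written by dump_drs_list below is a separate JSON object, e.g. (illustrative):
    ##   {"path": "tas_Amon_HadGEM2-ES_historical_r1i1p1_185912-188411.nc", "drs": {"project": "CMIP5", "variable": "tas", ...}}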
    json_file = os.path.splitext(rec.file)[0] + '.json'
    dump_drs_list(drs_list, json_file)

    if printInfo:
      print 'Error count %s' % ecount
    ##c4i.hdlr.close()
    c4i.closeBatchLog()
    self.ok = all( map( lambda x: x[0], self.resList ) )


def dump_drs_list(drs_list, filename):
    ## Append one JSON record per line to the named file.
    import json
    fh = open(filename, 'a+')
    for drs in drs_list:
        fh.write(json.dumps(drs))
        fh.write('\n')
    fh.close()