source: CCCC/trunk/c4.py @ 100

Subversion URL: http://proj.badc.rl.ac.uk/svn/exarch/CCCC/trunk/c4.py@100
Revision 100, 16.1 KB checked in by mjuckes, 7 years ago (diff)

fixed leaking of log file handles

Line 
1
2# Standard library imports
3import os, string, time, logging, sys, glob
4
5# Third party imports
6
7try:
8  import cdms2
9  withCdms = True
10except:
11  print 'Failed to import cdms2: will not be able to read NetCDF'
12  withCdms = False
13
14# Local imports
15import utils_c4 as utils
16import config_c4 as config
17
18reload( utils )
19
20
21#driving_model_ensemble_member = <CMIP5Ensemble_member>
22#rcm_version_id = <RCMVersionID>                     
23
24class fileMetadata:
25
26  def __init__(self,dummy=False):
27     self.dummy = dummy
28
29  def loadNc(self,fpath):
30    self.fn = string.split( fpath, '/' )[-1]
31    self.fparts = string.split( self.fn[:-3], '_' )
32    self.ga = {}
33    self.va = {}
34    self.da = {}
35    if self.dummy:
36      self.makeDummyFileImage()
37      return
38    self.nc = cdms2.open( fpath )
39    for k in self.nc.attributes.keys():
40      self.ga[k] = self.nc.attributes[k]
41    for v in self.nc.variables.keys():
42      self.va[v] = {}
43      for k in self.nc.variables[v].attributes.keys():
44        self.va[v][k] = self.nc.variables[v].attributes[k]
45      self.va[v]['_type'] = str( self.nc.variables[v].dtype )
46      if v in ['plev','plev_bnds','height']:
47        self.va[v]['_data'] = self.nc.variables[v].getValue().tolist()
48
49    for v in self.nc.axes.keys():
50      self.da[v] = {}
51      for k in self.nc.axes[v].attributes.keys():
52        self.da[v][k] = self.nc.axes[v].attributes[k]
53      self.da[v]['_type'] = str( self.nc.axes[v].getValue().dtype )
54      self.da[v]['_data'] = self.nc.axes[v].getValue().tolist()
55     
56    self.nc.close()
57
58  def makeDummyFileImage(self):
59    for k in range(10):
60      self.ga['ga%s' % k] =  str(k)
61    for v in [self.fparts[0],]:
62      self.va[v] = {}
63      self.va[v]['standard_name'] = 's%s' % v
64      self.va[v]['name'] = v
65      self.va[v]['units'] = '1'
66      self.va[v]['_type'] = 'float32'
67
68    for v in ['lat','lon','time']:
69      self.da[v] = {}
70      self.da[v]['_type'] = 'float64'
71      self.da[v]['_data'] = range(5)
72    dlist = ['lat','lon','time']
73    svals = lambda p,q: map( lambda y,z: self.da[y].__setitem__(p, z), dlist, q )
74    svals( 'standard_name', ['latitude', 'longitude','time'] )
75    svals( 'name', ['latitude', 'longitude','time'] )
76    svals( 'units', ['degrees_north', 'degrees_east','days since 19590101'] )
77
78  def applyMap( self, mapList, globalAttributesInFn, log=None ):
79    for m in mapList:
80      if m[0] == 'am001':
81        if m[1][0][0] == "@var":
82          if m[1][0][1] in self.va.keys():
83            this = self.va[m[1][0][1]]
84            apThis = True
85            for c in m[1][1:]:
86              if c[0] not in this.keys():
87                apThis = False
88              elif c[1] != this[c[0]]:
89                apThis = False
90            if m[2][0] != '':
91              targ = m[2][0]
92            else:
93              targ = m[1][-1][0]
94            if apThis:
95              if log != None:
96                log.info( 'Setting %s to %s' % (targ,m[2][1]) )
97              ##print 'Setting %s:%s to %s' % (m[1][0][1],targ,m[2][1])
98              self.va[m[1][0][1]][targ] = m[2][1]
99
100        elif m[1][0][0] == "@ax":
101          ##print 'checking dimension ',m[1][0][1], self.da.keys()
102          if m[1][0][1] in self.da.keys():
103            ##print 'checking dimension [2]',m[1][0][1]
104            this = self.da[m[1][0][1]]
105            apThis = True
106            for c in m[1][1:]:
107              if c[0] not in this.keys():
108                apThis = False
109              elif c[1] != this[c[0]]:
110                apThis = False
111            if m[2][0] != '':
112              targ = m[2][0]
113            else:
114              targ = m[1][-1][0]
115            if apThis:
116              if log != None:
117                log.info( 'Setting %s to %s' % (targ,m[2][1]) )
118              ##print 'Setting %s:%s to %s' % (m[1][0][1],targ,m[2][1])
119              self.da[m[1][0][1]][targ] = m[2][1]
120        elif m[1][0][0][0] != "@":
121            this = self.ga
122            apThis = True
123            for c in m[1]:
124              if c[0] not in this.keys():
125                apThis = False
126              elif c[1] != this[c[0]]:
127                apThis = False
128            if m[2][0] != '':
129              targ = m[2][0]
130            else:
131              targ = m[1][-1][0]
132            if apThis:
133              if log != None:
134                log.info( 'Setting %s to %s' % (targ,m[2][1]) )
135              print 'Setting %s:%s to %s' % (m[1][0][1],targ,m[2][1])
136              self.ga[targ] = m[2][1]
137              if targ in globalAttributesInFn:
138                self.fparts[ globalAttributesInFn.index(targ) ] = m[2][1]
139                self.fn = string.join( self.fparts, '_' ) + '.nc'
140        else:
141          print 'Token %s not recognised' % m[1][0][0]
142
143
144class dummy:
145   pass
146
147class recorder:
148
149  def __init__(self,fileName,type='map',dummy=False):
150    self.dummy = dummy
151    self.file = fileName
152    self.type = type
153    self.pathTmpl = '%(project)s/%(product)s/%(domain)s/%(institute)s/%(driving_model)s/%(experiment)s/%(ensemble)s/%(model)s/%(model_version)s/%(frequency)s/%(variable)s/files/%%(version)s/'
154    self.records = {}
155
156  def open(self):
157    if self.type == 'map':
158      self.fh = open( self.file, 'a' )
159    else:
160      self.sh = shelve.open( self.file )
161
162  def close(self):
163    if self.type == 'map':
164      self.fh.close()
165    else:
166      self.sh.close()
167
168  def add(self,fpath,drs,safe=True):
169    assert self.type == 'map','Can only do map files at present'
170    assert type(drs) == type( {} ), '2nd user argument to method add should be a dictionary [%s]' % type(drs)
171    tpath = self.pathTmpl % drs
172    if not self.dummy:
173      assert os.path.isfile( fpath ), 'File %s not found' % fpath
174      fdate = time.ctime(os.path.getmtime(fpath))
175      sz = os.stat(fpath).st_size
176    else:
177      fdate = "na"
178      sz = 0
179    record = '%s | OK | %s | modTime = %s | target = %s ' % (fpath,sz,fdate,tpath)
180    for k in ['creation_date','tracking_id']:
181      if k in drs.keys():
182        record += ' | %s = %s' % (k,drs[k])
183
184    fn = string.split( fpath, '/' )[-1]
185    self.records[fn] = record
186 
187  def modify(self,fn,msg):
188    assert fn in self.records.keys(),'Attempt to modify non-existent record %s, %s' % [fn,str(self.records.keys()[0:10])]
189    if string.find( self.records[fn], '| OK |') == -1:
190      ##print 'File %s already flagged with errors' % fn
191      return
192    s = string.replace( self.records[fn], '| OK |', '| %s |' % msg )
193    ##print '--> ',s
194    self.records[fn] = s
195
196  def dumpAll(self,safe=True):
197    keys = self.records.keys()
198    keys.sort()
199    for k in keys:
200      self.dump( self.records[k], safe=safe )
201
202  def dump( self, record, safe=True ):
203    if safe:
204      self.open()
205    self.fh.write( record + '\n' )
206    if safe:
207      self.close()
208
209  def addErr(self,fpath,reason,safe=True):
210    record = '%s | %s' % (fpath, reason)
211    fn = string.split( fpath, '/' )[-1]
212    self.records[fn] = record
213
214class checker:
215  def __init__(self, pcfg, cls):
216    self.info = dummy()
217    self.info.pcfg = pcfg
218    self.calendar = 'None'
219    self.cfn = utils.checkFileName( parent=self.info,cls=cls)
220    self.cga = utils.checkGlobalAttributes( parent=self.info,cls=cls)
221    self.cgd = utils.checkStandardDims( parent=self.info,cls=cls)
222    self.cgg = utils.checkGrids( parent=self.info,cls=cls)
223    self.cls = cls
224
225    # Define vocabs based on project
226    ##self.vocabs = getVocabs(pcgf)
227    self.vocabs = pcfg.vocabs
228
229  def checkFile(self,fpath,log=None,attributeMappings=[]):
230    self.calendar = 'None'
231    self.info.log = log
232
233    fn = string.split( fpath, '/' )[-1]
234
235    if attributeMappings != []:
236      ncReader.loadNc( fpath )
237      ncReader.applyMap( attributeMappings, self.cfn.globalAttributesInFn, log=log )
238      ncRed = True
239      thisFn = ncReader.fn
240    else:
241      ncRed = False
242      thisFn = fn
243
244    self.cfn.check( thisFn )
245    if not self.cfn.completed:
246      self.completed = False
247      return
248    if not self.info.pcfg.project[:2] == '__':
249      if not os.path.isfile( fpath ):
250        print 'File %s not found [2]' % fpath
251        self.completed = False
252        return
253
254    if not ncRed:
255      ncReader.loadNc( fpath )
256    self.ga = ncReader.ga
257    self.va = ncReader.va
258    self.da = ncReader.da
259
260    self.cga.check( self.ga, self.va, self.cfn.var, self.cfn.freq, self.vocabs, self.cfn.fnParts )
261    if not self.cga.completed:
262      self.completed = False
263      return
264
265    ##self.cgd.plevRequired = config.plevRequired
266    ##self.cgd.plevValues = config.plevValues
267    ##self.cgd.heightRequired = config.heightRequired
268    ##self.cgd.heightValues = config.heightValues
269    ##self.cgd.heightRange = config.heightRange
270    self.cgd.check( self.cfn.var, self.cfn.freq, self.da, self.va, self.cga.isInstantaneous )
271    self.calendar = self.cgd.calendar
272    if not self.cgd.completed:
273      self.completed = False
274      return
275
276    if self.pcfg.doCheckGrids:
277      ##self.cgg.rotatedPoleGrids = config.rotatedPoleGrids
278      ##self.cgg.interpolatedGrids = config.interpolatedGrids
279      self.cgg.check( self.cfn.var, self.cfn.domain, self.da, self.va )
280   
281      if not self.cgg.completed:
282        self.completed = False
283        return
284    self.completed = True
285    self.drs = self.cga.getDrs()
286    self.errorCount = self.cfn.errorCount + self.cga.errorCount + self.cgd.errorCount + self.cgg.errorCount
287
288class c4_init:
289
290  def __init__(self):
291    self.logByFile = True
292    args = sys.argv[1:]
293    nn = 0
294
295    self.attributeMappingFile = None
296    self.recordFile = 'Rec.txt'
297    self.logDir = 'logs_02'
298   
299    # Set default project to "CORDEX"
300    self.project = "CORDEX"
301
302    while len(args) > 0:
303      next = args.pop(0)
304      if next == '-f':
305        flist = [args.pop(0),]
306        self.logByFile = False
307      elif next == '-d':
308        fdir = args.pop(0)
309        flist = glob.glob( '%s/*.nc' % fdir  )
310      elif next == '-D':
311        flist  = []
312        fdir = args.pop(0)
313        for root, dirs, files in os.walk( fdir ):
314          for f in files:
315            fpath = '%s/%s' % (root,f)
316            if os.path.isfile( fpath ) and f[-3:] == '.nc':
317              flist.append( fpath )
318      elif next == '-R':
319        self.recordFile = args.pop(0)
320      elif next == '--ld':
321        self.logDir = args.pop(0)
322      elif next == '--aMap':
323        self.attributeMappingFile = args.pop(0)
324        assert os.path.isfile( self.attributeMappingFile ), 'The token "--aMap" should be followed by the path or name of a file'
325      elif next == "-p":
326        self.project = args.pop(0)
327      else:
328       print 'Unused argument: %s' % next
329       nn+=1
330    assert nn==0, 'Aborting because of unused arguments'
331
332    if self.project[:2] == '__':
333       flist = []
334       ss = 'abcdefgijk'
335       ss = 'abc'
336       ss = 'abcdefgijklmnopqrstuvwxyz'
337       for i in range(10):
338         v = 'v%s' % i
339         for a in ss:
340           for b in ss:
341             flist.append( '%s_day_%s_%s_1900-1909.nc' % (v,a,b) )
342    flist.sort()
343    fnl = []
344    nd = 0
345    for f in flist:
346      fn = string.split(f, '/')[-1]
347      if fn in fnl:
348        print 'ERROR: file name duplicated %s' % fn
349        nd += 0
350      else:
351        fnl.append(fn)
352    assert nd == 0, 'Duplicate file names encountered'
353    self.flist = flist
354    self.fnl = fnl
355    if not os.path.isdir(   self.logDir ):
356       os.mkdir(   self.logDir )
357
358    tstring1 = '%4.4i%2.2i%2.2i_%2.2i%2.2i%2.2i' % time.gmtime()[0:6]
359    batchLogFile = '%s/qcBatchLog_%s.txt' % (  self.logDir,tstring1)
360## default appending to myapp.log; mode='w' forces a new file (deleting old contents).
361    self.logger = logging.getLogger('c4logger')
362    self.hdlr = logging.FileHandler(batchLogFile,mode='w')
363    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
364    self.hdlr.setFormatter(formatter)
365    self.logger.setLevel(logging.INFO)
366    self.logger.addHandler(self.hdlr)
367
368    self.attributeMappings = []
369    if self.attributeMappingFile != None:
370      for l in open( self.attributeMappingFile ).readlines():
371        if l[0] != '#':
372          bb = string.split( string.strip(l), '|' ) 
373          assert len(bb) ==2, "Error in experimental module attributeMapping -- configuration line not scanned [%s]" % str(l)
374          bits = string.split( bb[0], ';' )
375          cl = []
376          for b in bits:
377            cl.append( string.split(b, '=' ) )
378          self.attributeMappings.append( ('am001',cl, string.split(bb[1],'=') ) )
379    print self.attributeMappings
380
381  def getFileLog( self, fn, flf=None ):
382    if flf == None:
383      tstring2 = '%4.4i%2.2i%2.2i' % time.gmtime()[0:3]
384      self.fileLogFile = '%s/%s__qclog_%s.txt' % (self.logDir,fn[:-3],tstring2)
385      m = 'w'
386    else:
387      m = 'a'
388      self.fileLogFile = flf
389
390    self.fLogger = logging.getLogger('fileLog_%s_%s' % (fn,m))
391    if m == 'a':
392      self.fHdlr = logging.FileHandler(self.fileLogFile)
393    else:
394      self.fHdlr = logging.FileHandler(self.fileLogFile,mode=m)
395    fileFormatter = logging.Formatter('%(message)s')
396    self.fHdlr.setFormatter(fileFormatter)
397    self.fLogger.addHandler(self.fHdlr)
398    self.fLogger.setLevel(logging.INFO)
399    return self.fLogger
400
401  def closeFileLog(self):
402    self.fLogger.removeHandler(self.fHdlr)
403    self.fHdlr.close()
404
405
406if __name__ == '__main__':
407
408  logDict = {}
409  c4i = c4_init()
410  isDummy  = c4i.project[:2] == '__'
411  if (not withCdms) and isDummy:
412     print withCdms, c4i.project
413     print 'Cannot proceed with non-dummy project without cdms'
414     raise
415  pcfg = config.projectConfig( c4i.project )
416  cc = checker(pcfg, cls = c4i.project)
417  rec = recorder( c4i.recordFile, dummy=isDummy )
418  ncReader = fileMetadata(dummy=isDummy)
419  monitorFileHandles = False
420  if monitorFileHandles:
421    monitor = utils.sysMonitor()
422  else:
423    monitor = None
424
425  cal = None
426
427  c4i.logger.info( 'Starting batch -- number of file: %s' % (len(c4i.flist)) )
428
429  cbv = utils.checkByVar( parent=cc.info,cls=c4i.project,monitor=monitor)
430  cbv.impt( c4i.flist )
431
432  for f in c4i.flist:
433    if monitorFileHandles:
434      nofhStart = monitor.get_open_fds()
435    fn = string.split(f,'/')[-1]
436    c4i.logger.info( 'Starting: %s' % fn )
437    try:
438### need to have a unique name, otherwise get mixing of logs despite close statement below.
439      if c4i.logByFile:
440        fLogger = c4i.getFileLog( fn )
441        logDict[fn] = c4i.fileLogFile
442        c4i.logger.info( 'Log file: %s' % c4i.fileLogFile )
443      else:
444        fLogger = c4i.logger
445
446      fLogger.info( 'Starting file %s' % fn )
447## default appending to myapp.log; mode='w' forces a new file (deleting old contents).
448      cc.checkFile( f, log=fLogger,attributeMappings=c4i.attributeMappings )
449
450      if cc.completed:
451        if cal not in (None,'None'):
452          if cal != cc.calendar:
453            c4i.logger.info( 'Error: change in calendar attribute %s --> %s' % (cal, cc.calendar) )
454            fLogger.info( 'Error: change in calendar attribute %s --> %s' % (cal, cc.calendar) )
455            cc.errorCount += 1
456        cal = cc.calendar
457
458      if c4i.logByFile:
459        if cc.completed:
460          fLogger.info( 'Done -- error count %s' % cc.errorCount )
461        else:
462          fLogger.info( 'Done -- checks not completed' )
463        c4i.closeFileLog( )
464
465      if cc.completed:
466        c4i.logger.info( 'Done -- error count %s' % cc.errorCount ) 
467        if cc.errorCount == 0:
468          rec.add( f, cc.drs )
469        else:
470          rec.addErr( f, 'ERRORS FOUND | errorCount = %s' % cc.errorCount )
471      else:
472        c4i.logger.info( 'Done -- testing aborted because of severity of errors' )
473        rec.addErr( f, 'ERRORS FOUND AND CHECKS ABORTED' )
474    except:
475      c4i.logger.error("Exception has occured" ,exc_info=1)
476      rec.addErr( f, 'ERROR: Exception' )
477    if monitorFileHandles:
478      nofhEnd = monitor.get_open_fds()
479      if nofhEnd > nofhStart:
480         print 'Open file handles: %s --- %s' % (nofhStart, nofhEnd)
481
482  cc.info.log = c4i.logger
483 
484  if c4i.project != 'SPECS':
485     cbv.c4i = c4i
486     cbv.setLogDict( logDict )
487     cbv.check( recorder=rec, calendar=cc.calendar)
488  rec.dumpAll()
489  c4i.hdlr.close()
490##else:
491  ##f1 = '/data/u10/cordex/AFR-44/SMHI/ECMWF-ERAINT/evaluation/SMHI-RCA4/v1/day/clh/clh_AFR-44_ECMWF-ERAINT_evaluation_r1i1p1_SMHI-RCA4_v1_day_19810101-19851231.nc'
492  ##f2 = '/data/u10/cordex/AFR-44/SMHI/ECMWF-ERAINT/evaluation/SMHI-RCA4/v1/sem/tas/tas_AFR-44_ECMWF-ERAINT_evaluation_r1i1p1_SMHI-RCA4_v1_sem_200012-201011.nc'
493  ##f3 = '/data/u10/cordex/AFR-44i/SMHI/ECMWF-ERAINT/evaluation/SMHI-RCA4/v1/mon/tas/tas_AFR-44i_ECMWF-ERAINT_evaluation_r1i1p1_SMHI-RCA4_v1_mon_199101-200012.nc'
494  ##cc.checkFile( f3 )
Note: See TracBrowser for help on using the repository browser.