source: CedaCdoScripts/branch/c1/cdo_basic.py @ 203

Subversion URL: http://proj.badc.rl.ac.uk/svn/exarch/CedaCdoScripts/branch/c1/cdo_basic.py@203
Revision 203, 37.0 KB checked in by mjuckes, 7 years ago (diff)

modifications for mclim30; adding option to use rcp45 as extension for mohc historical data

  • Property svn:executable set to *
Line 
1#!/usr/bin/python
2
3import glob, string, os, collections
4import subprocess, time, sys
5##import cdms2 as cdms
6import netCDF4
7nc4 = True
8import random, time, logging, getopt, shelve
9
10debug = True
11cleanUpAsUGo = True
12args = getopt.getopt( sys.argv[1:], 'i:m:e:r:b:x:o:', ['onlyVars=','scratch=','omitVars=','version=','log='] )
13ad = { 'i':'inst', 'm':'model', 'e':'ensem', 'r':'realm', 'b':'batch', 'x':'expt', 'o':'outDir' }
14
15knownops = ['am','sm','bm','mclim5','mclim30','cdd','cwd','tr', 'fd']
16optag = args[-1][0]
17assert optag in knownops, 'First argument [%s] should be operation in: %s' % (optag, str(knmownops))
18processing_batch='aaaa0001'
19qcBaseDir = '/badc/cmip5/metadata/processing/qc/cedaqc/'
20qcBaseDir = None
21if qcBaseDir != None:
22  assert os.path.isdir( qcBaseDir ), 'qcBaseDir:%s not found' % qcBaseDir
23
24fullYears = True
25
26print args
27edict = {}
28for a in args[0]:
29  edict[a[0][1:]] = a[1] 
30
31def writeat( fh, tag, str,log):
32  if len(str) + len(tag) + 2 > 1024 :
33    log.warn( 'TRUNCATING global attribute %s: %s' % (tag,str) )
34    strl = 1022 - len(tag)
35    str = str[:strl]
36  fh.write( '%s %s\n' % (tag,str) )
37
38fullYearNeeded = False
39fullYearWanted = True
40ok = True
41for k in ad.keys():
42  if not edict.has_key(k):
43    print 'required argument %s [%s] missing' % (k,ad[k])
44    ok = False
45assert ok, 'Not all required arguments found'
46## supported operations are box-mean and annual mean/
47##inst = 'MOHC'
48##model = 'HadGEM2-ES'
49##inst = 'NCC'
50##model = 'NorESM1-M'
51##inst = 'CNRM-CERFACS'
52##model = 'CNRM-CM5'
53##inst = 'CSIRO-QCCCE'
54##model = 'CSIRO-Mk3-6-0'
55##inst = 'IPSL'
56##model = 'IPSL-CM5A-LR'
57##realm = 'ocean'
58##realm = 'atmos'
59##batch = 'b011_'
60##onlyVars = ['tas']
61project = 'cmip5'
62ensem = edict['e']
63inst = edict['i']
64model = edict['m']
65realm = edict['r']
66experiment = edict['x']
67batch = edict['b'] + '_'
68processing_batch = edict['b']
69opdir = edict['o']
70onlyVars = None
71omitVars = None
72product = 'output1'
73assert not( edict.has_key( '-onlyVars' ) and edict.has_key( '-omitVars' ) ), 'Specifying both onlyVars and omitVars is an error'
74
75## qcDir = '%s%s/%s/%s/%s/' % (qcBaseDir,product,inst, model, expt)
76
77vstring = '%s%2.2i%2.2i' % time.gmtime()[0:3]
78doctag = optag
79if optag in ['cdd','cwd']:
80  doctag = 'precip_indices'
81if optag in ['tr','fd']:
82  doctag = 'temperature_indices'
83doc_filename = 'ceda_cmip5_processing_%s.pdf' % doctag
84tstring = '%s%2.2i%2.2i-%2.2i%2.2i%2.2i' % time.gmtime()[0:6]
85log_filename = edict.get( '-log', 'ceda_cmip5_processing_log_%s_%s.txt' % (optag,tstring) )
86batching='byFile'
87ofreq = 'yr'
88ifreq = 'mon'
89if optag in ['mclim5']:
90  batching = '5yr'
91  monthsPerBatch = 60
92elif optag in ['mclim30']:
93  batching = '30yr'
94  monthsPerBatch = 360
95elif optag in ['cdd','cwd','tr','fd']:
96  batching = 'yr'
97  ifreq = 'day'
98  monthsPerBatch = 12
99
100procid = '%s_%s_%s_%s_%s_%s' % (optag,inst,model,experiment,realm,ensem)
101sh = shelve.open( 'cdo_basic_sh' )
102if not sh.has_key( procid ):
103  sh[procid] = {}
104
105scriptName = 'cdo_basic.py'
106scriptVersion = 0.9
107log = logging.getLogger( __name__ )
108fh = logging.FileHandler( log_filename )
109log.level = logging.INFO
110log.addHandler( fh )
111creation_date = time.ctime()
112log.info( 'Starting %s wrapper script, version %5.2f' % (scriptName, scriptVersion) )
113log.info( 'Date: %s' % creation_date )
114log.info( 'Operation: %s' % optag )
115log.info( 'Institute: %s' % inst )
116log.info( 'Model: %s' % model )
117log.info( 'Realm: %s' % realm )
118if edict.has_key( '-onlyVars' ):
119   onlyVars = string.split( edict['-onlyVars'], ',' )
120   log.info( 'Only including variables: %s' % edict['-onlyVars'] )
121if edict.has_key( '-omitVars' ):
122   omitVars = string.split( edict['-omitVars'], ',' )
123   log.info( 'Omitting variables: %s' % edict['-omitVars'] )
124if edict.has_key( '-version' ):
125   version = edict['-version']
126   log.info( 'Processing version: %s' % edict['-version'] )
127else:
128   version = None
129## log.info( 'CDO version information:' )
130## os.popen( 'cdo -V > .cdo_version' ).readlines()
131## for l in open( '.cdo_version' ).readlines():
132  ## log.info( string.strip(l) )
133## log.info( 'End of CDO version information.' )
134
135cx = '0123456789qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM'
136puid = ''
137for k in range(6):
138   puid += cx[random.randint(0,len(cx)-1 )]
139
140tmpdir = edict.get( '-scratch', 'tmp/cdo_basic_wrk/%s/' % puid )
141
142if not os.path.isdir(tmpdir):
143  os.mkdir( tmpdir )
144  log.info( 'Creating temporary directory: %s ' % tmpdir )
145
146if tmpdir[-1] != '/':
147    tmpdir += '/'
148
149assert not os.path.isfile( '%scdo_basic_lock' % tmpdir ), 'Lock file exists in work directory %s' % tmpdir
150
151os.popen( 'touch %scdo_basic_lock' % tmpdir ).readlines()
152assert os.path.isfile( '%scdo_basic_lock' % tmpdir ), 'Failed to make lock file in work directory %s' % tmpdir
153
154plurality = 'single'
155mdbase = "http://badc.nerc.ac.uk/browse/badc/cmip5/metadata/processing/" 
156
157rcout = open( 'cdo_stdout.txt', 'w' )
158rcerr = open( 'cdo_stderr.txt', 'w' )
159def run_cmd( cmd, comment='', strict=True, append=None, out=None  ):
160    if type(cmd) == type('x'):
161      args = map( lambda x: string.strip(x, "'"), string.split( cmd ))
162    else:
163      args = cmd
164    log.info( cmd )
165    if append != None:
166      fho = open( append, 'a' )
167      clo = True
168    elif out != None:
169      fho = open( out, 'w' )
170      clo = True
171    else:
172      fho = rcout
173      clo = False
174    proc = subprocess.Popen( args, stdout=fho, stderr=rcerr, env={ 'LD_LIBRARY_PATH':'/usr/local/lib'} )
175    std = proc.communicate()
176    rc = proc.returncode
177    if rc != 0:
178      print args
179      print std[0]
180      print std[1]
181      if strict:
182        assert rc == 0, 'Failed (%s) [rc=%s] executing %s' % (comment,rc,str(cmd))
183      elif rc != 0:
184        print 'Failed (%s) [rc=%s] executing %s' % (comment,rc,str(cmd))
185    if clo:
186      fho.close()
187    return rc
188
189dir_tmpl_s = '/disks/fuse/cmip5/output1/MOHC/HadGEM2-ES/%(expt)s/%(ifreq)s/atmos/Amon/'
190dir_tmpl_s = '/badc/cmip5/data/cmip5/output1/MOHC/HadGEM2-ES/%(expt)s/%(ifreq)s/atmos/Amon/'
191if ifreq == 'mon':
192  mip = {'atmos':'Amon', 'ocean':'Omon', 'land':'Lmon' }[realm]
193elif ifreq == 'day':
194  mip = 'day'
195else:
196  raise 'Should not be here'
197
198dir_tmpl_r = '/badc/cmip5/data/cmip5/output1/%(inst)s/%(model)s/%(expt)s/%(ifreq)s/%(realm)s/%(mip)s/%(ensem)s/'
199dir_tmpl_e = '/badc/cmip5/data/cmip5/output1/%(inst)s/%(model)s/%(expt)s/%(ifreq)s/%(realm)s/%(mip)s/%(ensem)s/latest/'
200dir_tmpl = dir_tmpl_e + '%(var)s/'
201##
202blBaseDir = '/group_workspaces/jasmin/cedaproc/mjuckes/work/qc/cmip5_bl'
203bldir_tmpl =  blBaseDir + '/output1_%(inst)s_%(model)s_%(expt)s/'
204blfile_tmpl = '%(ifreq)s_%(realm)s_%(mip)s_%(ensem)s_%(version)s.txt'
205
206
207def get_var_from_bl_record0( rec ):
208  bits = string.split( rec, ':' )
209  if len(bits) < 2:
210    return None
211  b2 = string.split( bits[1], '/' )
212  if len(b2) == 0:
213    return None
214  var = string.split( b2[-1], '_' )[0]
215  return var
216
217def get_var_from_bl_record( rec ):
218  bits = string.split( rec, ':' )
219  if len(bits) < 2:
220    return None
221  var = bits[0]
222  return var
223
224def get_version_dir(version=None):
225
226  pdict = { 'inst':inst, 'model':model, 'expt':expt, 'ensem':ensem, 'ifreq':ifreq, 'realm':realm, 'mip':mip }
227  rdir = dir_tmpl_r % pdict
228  if version != None:
229    this_v = rdir + version
230    assert os.path.isdir( this_v ), 'Version directory %s not found' % this_v
231    return this_v
232
233  vlist = glob.glob( rdir + 'v*' )
234  assert len(vlist) > 0, 'No version directories found in %s' % rdir
235  vlist.sort()
236  return vlist[-1]
237
238
239fileidrec = collections.namedtuple( 'FileIdRecord', ['stem','starty','endy','startm','endm','isDefDir','dir','ext'] )
240
241def get_expt_fl(expt,var,latest=False):
242  pdict = { 'inst':inst, 'model':model, 'expt':expt, 'var':var, 'ensem':ensem, 'ifreq':ifreq, 'realm':realm, 'mip':mip }
243  bldir = bldir_tmpl % pdict
244  rdir = dir_tmpl_r % pdict
245  if not latest:
246    vlist = glob.glob( rdir + 'v*' )
247    vlist.sort()
248    edir = '%s/%s/' % (vlist[-1],var)
249  else:
250    edir = '%s/latest/%s/' % (rdir,var)
251  ## edir = dir_tmpl % { 'inst':inst, 'model':model, 'expt':expt, 'var':var, 'ensem':ensem, 'ifreq':ifreq, 'realm':realm, 'mip':mip }
252  assert os.path.isdir( edir), 'Error in get_expt_fl: %s is not a directory' % edir
253  log.info( 'edir:: %s' % edir )
254  fl = glob.glob( edir + '*.nc' ) + glob.glob( edir + '*.nc4' )
255  assert len(fl) > 0, 'No netcdf files found in %s' % edir
256  fl.sort()
257  fm = []
258  blvars = []
259  withRcp45Ext = False
260  if os.path.isdir( bldir ):
261    if not latest:
262      this_version = string.split( string.strip(vlist[-1], '/'), '/' )[-1]
263    else:
264      this_version = 'latest'
265    pdict['version'] = this_version
266    blfile = blfile_tmpl % pdict
267    if os.path.isfile( bldir + blfile ):
268      blvars = map( get_var_from_bl_record, open( bldir + blfile ).readlines() )
269      if var in blvars:
270        log.info( 'Skipping black-listed ADS expt=%s, var=%s' % (expt,var) )
271        return fm
272      elif '%s__rcp45' % var in blvars:
273        log.info( 'Using rcp45 data to complete time range expt=%s, var=%s' % (expt,var) )
274        withRcp45Ext = True
275      else:
276        log.info( 'Var %s not in black list [%s] %s' % (var, bldir + blfile, str(blvars)) )
277    else:
278      log.info( 'Black list file %s not present' % (bldir+blfile) )
279  else:
280    log.info( 'Black list directory %s not present' % bldir )
281  if withRcp45Ext:
282    altDir = string.replace( edir, pdict['expt'], 'rcp45' )
283    fl2 = glob.glob( altDir + '*.nc' ) + glob.glob( altDir + '*.nc4' )
284    assert len(fl2) > 0, 'No netcdf files found in %s' % altDir
285    fl2.sort()
286    fl.append(  fl2[0] )
287  for f in fl:
288    fn = string.split(f, '/')[-1]
289    stem = string.split( fn, '.')[0]
290    bits = string.split( stem, '_' )
291    if len(bits) > 5:
292      ts = string.split(bits[5],'-')
293      starty = long(ts[0][0:4])
294      if len(ts[0]) >= 6:
295        startm = long(ts[0][4:6])
296        if len(ts) == 2:
297          endm = long(ts[1][4:6])
298        else:
299          endm = startm
300      else:
301        startm = 1
302        endm = 12
303      if len(ts) == 2:
304        endy = long(ts[1][0:4])
305      else:
306        endy = starty
307      if withRcp45Ext and f == fl[-1]:
308        fm.append( fileidrec(stem,starty,endy,startm,endm,False,altDir,None) )
309      else:
310        fm.append( fileidrec(stem,starty,endy,startm,endm,True,None,None) )
311    else:
312      fm.append( fileidrec(stem,None,None,None,None,True,None,None) )
313      assert batching != '5yr', 'No time bounds in filename: %s (batching = %s)' % (fn,batching)
314  return fm
315
316go = False
317go = True
318ofiles = {}
319ifiles = {}
320eextras = {}
321tids = {}
322varld = {}
323hists = {}
324crds = {}
325
326def listAnd( l1, l2 ):
327  l3 = []
328  for i in l1:
329    if i in l2:
330      l3.append(i)
331  return l3
332
333def listExcl( l1, l2 ):
334  l3 = []
335  for i in l1:
336    if i not in l2:
337      l3.append(i)
338  return l3
339
340expts = [experiment,]
341if optag in ['cdd','cwd']:
342   onlyVars = ['pr']
343   omitVars = None
344if optag in ['tr','fd']:
345   onlyVars = ['tasmin']
346   omitVars = None
347
348for expt in expts:
349  ##dir = dir_tmpl_e % locals()
350  dir = get_version_dir( version=version)
351  vars = glob.glob( '%s/*' % dir )
352  thisvarl = map( lambda x: string.split(x,'/')[-1], vars )
353  thisvarl.sort()
354  if onlyVars == None:
355    if omitVars == None:
356      varld[expt] = thisvarl[:]
357    else:
358      varld[expt] = listExcl( thisvarl, omitVars )
359  else:
360    varld[expt] = listAnd( thisvarl, onlyVars )
361    if len(varld[expt]) == 0:
362       print 'No common elements found:'
363       print thisvarl
364       print onlyVars
365       raise
366 
367  for var in varld[expt]:
368    ofiles['%s_%s' % (expt,var)] = ''
369    ifiles['%s_%s' % (expt,var)] = ''
370    tids['%s_%s' % (expt,var)] = ''
371
372ye = []
373badkeys = []
374for expt in expts:
375  for var in varld[expt]:
376     fm = get_expt_fl( expt, var, latest=True )
377     if len(fm) > 0:
378        ye.append( (expt,var,fm) )
379     else:
380        print 'Bad key: %s_%s' % (expt,var), fm, type(fm).__name__
381        badkeys.append( '%s_%s' % (expt,var) )
382
383
384## sys.exit(0)
385
386lon1,lon2,lat1,lat2 = (60.,120.,30.,60.)
387cdoop, ofreq,opName, proc_description = { 'am':('divdpy -yearsum -muldpm', 'yr','Annual mean','Annual mean of monthly mean data'), \
388                        'sm':('divdpy -seassum -muldpm', 'yr','Seasonal mean','Annual seasonal of monthly mean data'), \
389                        'bm':('fldmean -sellonlatbox,%s,%s,%s,%s' % (lon1,lon2,lat1,lat2),ifreq,'Box mean','Spatial mean over a latitude-longitude box'), \
390                        'mclim5':('ymonavg', '5yr','5-year monthly climatology','Monthly climatology for 5 year period: 5 year mean for each month'),
391                        'mclim30':('ymonavg', '30yr','30-year monthly climatology','Monthly climatology for 30 year period: 30 year mean for each month'),
392                        'cdd':('eca_cdd','yr','Continuous dry days','Maximum of continuous dry days in a year'),
393                        'cwd':('eca_cwd','yr','Consecutive wet days','Maximum of consecutive wet days in a year'),
394                        'tr':('eca_tr','yr','Tropical nights','Count of tropical nights per year'),
395                        'fd':('eca_fd','yr','Frost days','Count of frost days (Tmin below zero) per year') }[optag]
396## opdir = '%s%s' % (batch,optag)
397if go and not os.path.isdir( opdir ):
398  os.mkdir( opdir )
399
400step1 = False
401step1 = True
402go1 = True
403log.info( 'CDO command: %s ' % cdoop )
404eeproc = {}
405k=-1
406print 'INFO0080: ',k, str(k), str( [1,2,3] )
407##for expt,var,fm in ye:
408mclim30batching = [1901,1961,1976]
409def run_evf( expt,var,fm ):
410    kk = 0
411    dir = get_version_dir( version=version)
412    ##edir = dir_tmpl % locals()
413    edir = '%s/%s/' % (dir,var)
414    key = '%s_%s' % (expt,var)
415    rp1 = os.path.realpath( edir )
416    rpath = os.path.realpath( rp1 +'/' + fm[0][0] + '.nc' )
417    rdir = os.path.split( rpath )[0]
418    mtime = time.localtime(os.path.getmtime(rdir))
419    timenow = '%4.4i%2.2i%2.2i' % time.localtime()[0:3]
420    modtime = '%4.4i%2.2i%2.2i' % mtime[0:3]
421    log.info( 'Starting expt=%s, var=%s [# files = %s, dir mod time: %s]' % (expt,var,len(fm),modtime ) )
422    log.info( 'Data directory: %s' % (rdir) )
423     
424    if not eeproc.has_key( var ):
425      print 'Creating element for %s' % var
426      if sh[procid].has_key(var):
427        eeproc[var] = sh[procid][var]
428        eeproc[var].append( (modtime,len(fm),timenow,rdir) )
429      else:
430        eeproc[var] = [(modtime,len(fm),timenow,rdir),]
431    else:
432      eeproc[var].append( (modtime,len(fm),timenow,rdir) )
433    for file in fm:
434        if nc4:
435          if file.isDefDir:
436            fpath =  '%s%s.nc' % (edir,file[0]) 
437          else:
438            print 'using alternative directory: %s ' % file.dir
439            fpath =  '%s%s.nc' % (file.dir,file[0]) 
440          nc = netCDF4.Dataset(fpath, 'r')
441          kl = ['tracking_id','creation_date','table_id','realization','initialization_method','physics_version','history']
442          tid,crd,table_id,realization,initialization_method,physics_version,this_hist = map( lambda x: nc.getncattr(x), kl )
443          calendar = nc.variables['time'].getncattr('calendar')
444          nc.close()
445        else:
446
447          nc = cdms.open( '%s%s.nc' % (edir,file[0]) )
448          tid = nc.tracking_id
449          crd = nc.creation_date
450          table_id = nc.table_id
451          realization = nc.realization[0]
452          initialization_method = nc.initialization_method[0]
453          physics_version = nc.physics_version[0]
454          this_hist = nc.history
455          if 'time' in nc.axes.keys():
456           calendar = nc.getAxis('time').attributes.get( 'calendar', 'none' )
457           if calendar == 'none':
458             print 'calendar attribute not found'
459          else:
460             print 'time axis not found'
461             calendar = 'none'
462          print 'calendar = %s' % calendar
463          nc.close()
464        log.info( 'File: %s; tracking id: %s; creation date: %s' % (file[0],tid,crd) )
465 
466        tids[key] += '%s,' % tid
467        crds[key] = crd
468        if not hists.has_key(key):
469           hists[key] = this_hist
470        ifiles[key] += '%s,' % file[0]
471    if not fm[-1].isDefDir:
472      eextras[key] = 'withExt'
473
474    startstart = None
475    nbfb = 0
476    nbfbb = 0
477    log.info( 'batching: %s, %s, %s' % (batching,expt,var) )
478    if batching == 'byFile':
479      smem = []
480      syey = []
481      for file in fm:
482        fn = string.split( file[0], '/' )[-1]
483        dseg = string.split( string.split( fn, '.' )[0], '_' )[-1]
484        sd, ed = string.split( dseg, '-' )
485## ta True --> file does not start in Jan.
486        ta = len(sd) > 4 and sd[4:6] != '01'
487        tb = len(ed) > 4 and ed[4:6] != '12'
488        if ta or tb:
489          nbfb +=1
490        if ta and tb:
491          nbfbb += 1
492        else:
493          smem.append( ( int( sd[4:6] ), int( ed[4:6] ) ) )
494          syey.append( ( int( sd[0:4] ), int( ed[0:4] ) ) )
495      ##  if len(sd) > 4 and sd[4:6] != '01':
496            ##nbfb += 1
497        ##if len(ed) > 4 and ed[4:6] != '12':
498            ##nbfb += 1
499     
500      done = False
501      nodir = False
502      if nbfb > 0:
503        log.info( 'File date boundaries do not match end of year (%s,%s): merging: %s, %s' % (nbfb, nbfbb, expt,var) )
504        if nbfbb > 0:
505          cfls = ''
506          ofile = ' %sofile_%s_%s_%s.nc' % (tmpdir,expt,var,kk)
507          if len(fm) > 1:
508            if fullYears and fm[0][3] != 1:
509              assert len(fm) > 1, 'Not ready for this case with single input file'
510              yl = range(fm[0][1]+1,fm[0][2]+1)
511              cyl = string.join( map( lambda x:'%s' % x, yl), ',' )
512              cmd = '/usr/bin/cdo -s selyear,%s  %s%s.nc  %s/%s' % (cyl, edir, fm[0][0], tmpdir,'.tmp1.nc')
513              rc = run_cmd( cmd, strict=True )
514              cfls = ' %s/.tmp1.nc' % tmpdir
515              fi0 = 1
516            else:
517              fi0 = 0
518
519            if fullYears and fm[-1][4] != 12:
520              fi9 = -1
521            else:
522              fi9 = len(fm)
523
524            log.info( str( fm[-1]  ))
525            for file in fm[fi0:fi9]:
526              cfls += ' %s%s.nc' % (edir, file[0])
527
528            if fullYears and fm[-1][4] != 12:
529              yl = range(fm[-1][1],fm[-1][2])
530              cyl = string.join( map( lambda x:'%s' % x, yl), ',' )
531              cmd = '/usr/bin/cdo -s selyear,%s  %s%s.nc  %s/%s' % (cyl, edir, fm[-1][0], tmpdir,'.tmp2.nc')
532              rc = run_cmd( cmd, strict=True )
533              cfls += ' %s/.tmp2.nc' % tmpdir
534
535            tfile = '%s/.tmp_ofile_%s_%s.nc' % (tmpdir,expt,var)
536            cmd1 = '/usr/bin/cdo -s mergetime %s %s' % (cfls,tfile)
537            cmd = '/usr/bin/cdo -s %s %s %s' % (cdoop, tfile, ofile)
538            log.info( 'Merging file %s\n' % cmd1 )
539          else:
540            yl = range(fm[0][1]+1,fm[0][2])
541            cyl = string.join( map( lambda x:'%s' % x, yl), ',' )
542            cmd = '/usr/bin/cdo -s selyear,%s  %s%s.nc  %s/%s' % (cyl, edir, fm[0][0], tmpdir,'.tmp1.nc')
543            rc = run_cmd( cmd, strict=True )
544            cmd1 = None
545            cmd = '/usr/bin/cdo -s %s %s/%s %s' % (cdoop, tmpdir, '.tmp1.nc', ofile)
546          log.info( 'cdo command: %s\n' % cmd )
547          ## raise 'bad file boundary'
548           
549          if go and step1 and go1:
550            if cmd1 != None:
551              rc = run_cmd( cmd1, strict=True )
552            else:
553              rc = 0
554
555            if rc != 0:
556              badkeys.append(key)
557            else:
558              rc = run_cmd( cmd, strict=True )
559              if rc != 0:
560                badkeys.append(key)
561
562            if cleanUpAsUGo:
563              ## short pause to make sure cmd has really finished
564              time.sleep( 1 )
565              for f in [tfile, '%s/%s' % (tmpdir, '.tmp1.nc'), '%s/%s' % (tmpdir, '.tmp2.nc')]:
566                if os.path.isfile( f ):
567                  os.unlink( f )
568          else:
569            print 'single: %s, %s' % (cmd1, cmd)
570          ofiles[key] += ' %s' % (ofile)
571          done = True
572        else:
573          i0 = 0
574          nf = len(fm)
575          if smem[0][0] != 1:
576            log.info(  'Omitting first file: %s' % str(fm[0]) )
577            i0 = 1
578            nf = len(fm)-1
579          nf = (nf/2)*2
580## check to see whether we can process in pairs
581          pairedfiles = True
582          for k in range(0,nf,2):
583            pairedfiles = pairedfiles and smem[k+i0][0] == 1 and smem[k+i0+1][1] == 12
584
585          if pairedfiles:
586            oldfm = fm[:]
587            fm = []
588            nodir = True
589            for kkk in range(0,nf,2):
590              ofile = ' %smergefile_%s_%s_%s.nc' % (tmpdir,expt,var,kkk)
591              cmd = '/usr/bin/cdo -s mergetime %s%s.nc %s%s.nc %s' % (edir, oldfm[kkk][0], edir, oldfm[kkk+1][0], ofile)
592              fm.append( (ofile,syey[kkk][0],syey[kkk+1][1]) )
593              if go and step1 and go1:
594                rc = run_cmd( cmd, strict=True )
595                if rc != 0:
596                  badkeys.append(key)
597
598          else:
599            assert inst != 'MIROC', 'Cannot handle this file collection'
600         
601
602      if not done:
603        kkk = 0
604        for file in fm:
605          kkk += 1
606          ofile = ' %sofile_%s_%s_%s.nc' % (tmpdir,expt,var,kkk)
607          if nodir:
608            cmd = '/usr/bin/cdo -s %s %s.nc %s' % (cdoop, file[0], ofile)
609          else:
610            cmd = '/usr/bin/cdo -s %s %s%s.nc %s' % (cdoop, edir, file[0], ofile)
611          if go and step1 and go1:
612            rc = run_cmd( cmd, strict=False )
613            if rc != 0:
614              badkeys.append(key)
615          else:
616            print 'single: %s' % cmd
617          ofiles[key] += ' %s' % (ofile)
618      if nbfb > 0:
619          log.info( 'FILE DATE BOUNDARIES DO NOT MATCH ASSUMPTIONS: %s, %s' % (expt,var) )
620    elif batching in ['30yr','5yr','yr']:
621      startstart = fm[0][1]
622      endend = fm[-1][2]
623      if fullYearNeeded:
624        if fm[0][3] != 1:
625          startstart += 1
626        if fm[-1][4] != 12:
627          endend += -1
628      elif fullYearWanted:
629        if fm[0][3] > 3:
630          startstart += 1
631        if fm[-1][4] < 10:
632          endend += -1
633
634      blen = { 'yr':1, '5yr':5, '30yr':30 }[batching]
635      if batching == '30yr':
636        sll = mclim30batching
637      else:
638        if batching == '5yr':
639          bstart = ((startstart+3)/5)*5 + 1
640          bslast = ((endend)/5 - 1)*5 + 1
641        elif batching == 'yr':
642          bstart = startstart
643          bslast = endend
644        nb = (bslast - bstart)/blen + 1
645        sll = range( bstart, bslast+1, blen )
646
647      for s in sll:
648        tfm = []
649        kk += 1
650        for f in fm:
651          for y in range( s,s+blen):
652            if f not in tfm and (f[1] == None or ( f[1] <= y and f[2] >= y )):
653              tfm.append(f)
654        assert len(tfm) <= 2 or blen > 1, 'Not expecting more than 2 elements: %s' % str(tfm)
655        print tfm
656        if len(tfm) == 0:
657          print 'No files found in segment %s, %s' % (s,s+blen)
658          print expt,var
659          print fm
660          raise 'logic error somewhere'
661        ofile = ' %sofile_%s_%s_%s.nc' % (tmpdir,expt,var,kk)
662        ofiles[key] += ' %s' % (ofile)
663        ylist = string.join( map( str, range( s,s+blen) ), ',' )
664        this_cdoop = cdoop
665        if not ( tfm[0][1] == None or tfm[0][1] < s or ( tfm[0][1] == s and tfm[0][3] == 1) ):
666          error = fullYearNeeded or ( fullYearWanted and not ( tfm[0][1] == s and tfm[0][3] <= 3) )
667          assert not error, 'Data does not start soon enough, %s, %s' % (str(tfm), s )
668          log.warning( 'WARN001: Incomplete climatology: %s, %s, %s' % (s,blen,str(tfm)) )
669
670        if not ( tfm[-1][2] == None or tfm[-1][2] > s + blen -1 or (tfm[-1][2] == s + blen -1 and tfm[-1][4] == 12) ):
671          error = fullYearNeeded or ( fullYearWanted and not ( tfm[-1][2] == s + blen -1 and tfm[-1][4] >= 10) )
672          assert not error, 'Data does not end late enough, %s, %s' % (str(tfm), s )
673          log.warning( 'WARN002: Incomplete climatology: %s, %s, %s' % (s,blen,str(tfm)) )
674
675        for k in range(len(tfm)-1):
676            log.info( string.join(['INFO0100: ',str(k), str( [1,2,3] )] ) )
677            log.info( string.join(['INFO0101: ',str(k), str(tfm[k]), str(tfm[k+1]) ] ) )
678            a = str(tfm[k])
679            b = str(tfm[k+1])
680            testRes = ( tfm[k][2] == tfm[k+1][1] and tfm[k][4] == tfm[k+1][3] -1 ) or ( tfm[k][2] == tfm[k+1][1] - 1 and tfm[k][4] == 12 and tfm[k+1][3] == 1 )
681            if not testRes:
682              assert optag in ['cdd','cwd'], 'Lack of continuity in batch, %s, %s:: %s, %s' % (ofile,k,a,b)
683              log.warning( 'WARN003: Lack of continuity in batch, %s, %s:: %s, %s' % (ofile,k,a,b) )
684       
685        if optag in ['cdd','cwd']:
686          ##this_cdoop = 'settaxis,%s-01-01,12:00,1year -%s -mulc,86400' % (startstart, cdoop )
687          this_cdoop = '%s -mulc,86400' % (cdoop )
688        log.info( '[x1]: len tfm: %s' % (len(tfm)) )
689        if len(tfm) == 0:
690          print 'No files found for year %s ' % y
691        elif len(tfm) == 1:
692          cmd = '/usr/bin/cdo -s %s -selyear,%s %s%s.nc %s' % (this_cdoop,ylist,edir,tfm[0][0],ofile)
693          if go:
694            rc = run_cmd( cmd )
695            if debug:
696              log.info( 'Trying to log file maximum value' )
697              cmdo = 'echo %s%s' % (edir,tfm[0][0])
698              cmd = '/usr/bin/cdo info %s' % ofile
699              rc = run_cmd( cmdo, append='/home/users/prototype/cmip_processing/wrk/cdd1/debug.txt' )
700              rc = run_cmd( cmd, append='/home/users/prototype/cmip_processing/wrk/cdd1/debug.txt' )
701          else:
702            print 'single: %s' % cmd
703        elif len(tfm) >= 2:
704          yl1 = []
705          yl2 = []
706          for y in range( tfm[0][1],tfm[0][2]+1):
707            if y in range( s,s+blen):
708              yl1.append(y)
709          for y in range( tfm[-1][1],tfm[-1][2]+1):
710            if y in range( s,s+blen):
711              yl2.append(y)
712          cyl1 = string.join( map( str, yl1 ), ',' )
713          cyl2 = string.join( map( str, yl2 ), ',' )
714          if len(tfm) == 2:
715            midfiles = ''
716          else:
717            midfiles = string.join( map(lambda x:edir + x[0] + '.nc', tfm[1:-1] ) )
718          tfile = '%s/.tmp_ofile_%s.nc' % (tmpdir,s)
719          cmd1 = '/usr/bin/cdo -s mergetime -selyear,%s %s%s.nc %s -selyear,%s %s%s.nc %s' % (cyl1,edir,tfm[0][0], midfiles, cyl2,edir,tfm[1][0],tfile)
720          cmd2 = '/usr/bin/cdo -s %s %s %s' % (this_cdoop,tfile,ofile)
721          if go:
722            if os.path.isfile( tfile ):
723              os.unlink( tfile )
724            rc = run_cmd( cmd1 )
725            rc = run_cmd( cmd2 )
726            if cleanUpAsUGo:
727              ## short pause to make sure cmd has really finished
728              time.sleep( 1 )
729              os.unlink( tfile )
730          else:
731            print 'double: %s\n     %s' % (cmd1,cmd2)
732        elif tfm[0][3] == 1 and tfm[-1][4] == 12:
733          tfile = '%s/.tmp_ofile_%s.nc' % (tmpdir,s)
734          cmd1 = '/usr/bin/cdo -s mergetime  %s %s' % (string.join( map(lambda x:edir + x[0] + '.nc', tfm ), ' '),tfile)
735          cmd2 = '/usr/bin/cdo -s %s %s %s' % (this_cdoop,tfile,ofile)
736          if go:
737            if os.path.isfile( tfile ):
738              os.unlink( tfile )
739            rc = run_cmd( cmd1 )
740            rc = run_cmd( cmd2 )
741            if cleanUpAsUGo:
742              ## short pause to make sure cmd has really finished
743              time.sleep( 1 )
744              os.unlink( tfile )
745          else:
746            print 'multi1: %s\n     %s' % (cmd1,cmd2)
747        else:
748          print '------------tfm------------',len(tfm)
749          print tfm
750          raise 'Code missing'
751    return tid, crd, table_id, realization, initialization_method, physics_version, startstart,calendar
752
753
754for expt,var,fm in ye:
755    tid, crd, table_id, realization, initialization_method, physics_version, startstart,calendar = run_evf( expt,var,fm )
756
757def proc_expt(expt,version,optag):
758    attfile = '%s/temp_attribute_file.txt' % tmpdir
759    varl = varld[expt]
760    ## for var in varld[expt]:
761    for var in varl:
762      if optag in ['am', 'sm']:
763        if var == 'tasmax':
764          cmval = "time: maximum within days time: mean over days"
765        elif var == 'tasmin':
766          cmval = "time: minimum within days time: mean over days"
767        else:
768          cmval = "time: mean"
769      else:
770        assert True, 'Need to set cell_methods for this operator'
771
772      key = '%s_%s' % (expt,var)
773      if key in badkeys:
774        log.info( 'Skipping %s because of earlier error' % key )
775      elif string.strip( ofiles[key] ) == '':
776        log.info( 'Skipping %s because no output files registered' % key  )
777        log.warning( 'WARN004: unexplained missing output file for key %s' % (key) )
778      else:
779        log.info( 'Trying %s' % key )
780        dir = get_version_dir( version=version)
781    ##    vdir = dir_tmpl % locals()
782        vdir = '%s/%s/' % (dir,var)
783        ovar = var
784        if optag in ['cdd','cwd']:
785          precip_crit = 1.
786          ovar = '%s%i' % (optag, int( precip_crit*10 + 0.5 ) )
787        if optag == 'tr':
788          tasmin_crit = 20.
789          ovar = 'tr%i' % int( tasmin_crit + 0.5 )
790        ofilename = '%s_%s_%s_%s_%s_%s.nc' % (ovar,ofreq,model,expt,ensem,optag)
791        if os.path.isfile( ofilename ):
792           os.unlink( ofilename )
793        tid = ''
794        for k in range(36):
795          tid += cx[random.randint(0,len(cx)-1 )]
796        this_mip  = string.split( table_id )[1]
797        version = string.split( string.strip( dir, '/' ), '/' )[-1]
798        esgf_dsid = '%s.%s.%s.%s.%s.%s.%s.%s.%s.%s' % (project,product,inst,expt,model,ifreq,realm,this_mip,ensem,version)
799        if eextras.get( key, None ) == 'withExt':
800          esgf_dsid += ' (with rcp45 extension)'
801        cmval = None
802##cmip5.output1.MIROC.MIROC5.sstClim.monClim.atmos.Amon.r1i1p1.v20111129.xml
803        oo = open( attfile, 'w' )
804        kkey = '%s_%s' % (expt,var)
805        oo.write( '%s %s\n' % ('proc_institute ', 'British Atmospheric Data Centre' ) )
806        oo.write( '%s %s\n' % ('proc_institute_id ', 'BADC' ) )
807        oo.write( '%s %s\n' % ('proc_machine ', 'jasmin-sci1.ceda.ac.uk' ) )
808        oo.write( '%s %s\n' % ('proc_contact ', 'badc@rl.ac.uk, martin.juckes@stfc.ac.uk' ) )
809        oo.write( '%s %s\n' % ('proc_description ', proc_description ) )
810        oo.write( '%s %s\n' % ('creation_date ', creation_date ) )
811        oo.write( '%s %s\n' % ('proc_version ', 'v%s' % vstring ) )
812        oo.write( '%s %s\n' % ('script_version ', '%5.2f' % scriptVersion ) )
813        oo.write( '%s %s\n' % ('operation ', opName ) )
814        oo.write( '%s %s\n' % ('operation_id ', optag ) )
815        oo.write( '%s %s\n' % ('operation_qualifiers ', ' ' ) )
816        oo.write( '%s %s\n' % ('input_frequency ', ifreq ) )
817        oo.write( '%s %s\n' % ('input_esgf_dsid ', esgf_dsid ) )
818        oo.write( '%s %s\n' % ('model_id ', model ) )
819        oo.write( '%s %s\n' % ('experiment_id ', expt ) )
820        oo.write( '%s %s\n' % ('institute_id ', inst ) )
821        oo.write( '%s %s\n' % ('table_id ', table_id ) )
822        oo.write( '%s %s\n' % ('initialization_method ', initialization_method ) )
823        oo.write( '%s %s\n' % ('physics_version ', physics_version ) )
824        oo.write( '%s %s\n' % ('realization ', realization ) )
825        oo.write( '%s %s\n' % ('frequency ', ofreq ) )
826        oo.write( '%s %s\n' % ('modeling_realm ', realm ) )
827        oo.write( '%s %s\n' % ('input_variable ', var ) )
828        oo.write( '%s %s\n' % ('product ', 'derived' ) )
829        oo.write( '%s %s\n' % ('input_dir ', vdir ) )
830        oo.write( '%s %s\n' % ('plurality ', plurality ) )
831        oo.write( '%s %s\n' % ('proc_code_repository_url ', 'http://proj.badc.rl.ac.uk/svn/exarch/CedaCdoScripts' ) )
832        oo.write( '%s %s\n' % ('proc_code_home_url ', 'http://proj.badc.rl.ac.uk/exarch/wiki/PackageRBPS' ) )
833        oo.write( '%s %s\n' % ('proc_method_url ', '%s/documentation/%s/%s' % (mdbase, plurality, doc_filename)  ) )
834        oo.write( '%s %s\n' % ('proc_log_url ', '%s/logfiles/%s/%s/%s' % (mdbase, plurality, optag, log_filename)  ) )
835        oo.write( '%s %s\n' % ('proc_batch ', processing_batch  ) )
836        if optag in ['cdd','cwd']:
837          oo.write( '%s %s\n' % ('proc_pars','precip_crit' ) )
838          oo.write( '%s %s\n' % ('precip_crit',precip_crit) )
839        if optag == 'tr':
840          oo.write( '%s %s\n' % ('proc_pars','tasmin_crit' ) )
841          oo.write( '%s %s\n' % ('tasmin_crit',tasmin_crit) )
842
843        parts = string.split( ifiles[kkey], ',' )
844        if len( parts) > 1:
845          bits = string.split( parts[0], '_' )
846          stem = string.join( bits[0:-1], '_' )
847          dat = string.split( bits[-1], '.' )[-1]
848          attr = '%s_%%(date)s.nc [%s' % (stem,dat)
849          for p in parts[1:]:
850            if string.strip(p) != '':
851              bits = string.split( p, '_' )
852              dat = string.split( bits[-1], '.' )[-1]
853              attr += ',' + dat
854          if eextras.get( key, None ) == 'withExt':
855            attr += '*] (* -- last in rcp45)'
856          else:
857            attr += ']'
858          writeat(oo, 'input_files ', attr, log )
859        else:
860          writeat(oo, 'input_files ', ifiles[kkey], log )
861
862        if len( tids[kkey] ) > 511:
863           log.warning( 'WARN003: truncating tid list for %s\n%s' % (ofilename, tids[kkey]) )
864           tids[kkey] = tids[kkey][0:500] + '.....'
865
866        oo.write( '%s %s\n' % ('input_tids ', tids[kkey]) )
867        oo.write( '%s %s\n' % ('input_creation_date ', crds[kkey]) )
868        oo.write( '%s %s\n' % ('tracking_id ', tid) )
869        writeat( oo, 'history ', '%s; %s' % (string.replace(hists[kkey],'\n',';;'), '%s: %s: %s' % (creation_date, 'CDO script', proc_description)), log )
870        oo.close()
871        tmpofilename = tmpdir + '.cdo_basic_temp.nc'
872        if os.path.isfile( tmpofilename ):
873           os.unlink( tmpofilename )
874        if len( string.split( ofiles[kkey] ) ) > 1:
875          ##cdoc = 'settaxis,%s-01-01,12:00,365day -mergetime' % startstart
876
877          cdo_extra = ''
878          if optag in ['cdd','cwd']:
879            varTag = { 'cdd':'dry','cwd':'wet' }[optag]
880            cdo_extra = '-chname,consecutive_%s_days_index_per_time_period,%s' % (varTag,ovar)
881
882          if optag not in ['mclim30']:
883            cmd = '/usr/bin/cdo -s mergetime %s %s' % (ofiles[kkey],tmpofilename)
884            rc = run_cmd( cmd )
885          ##if optag in ['cdd', 'tr']:
886            ##cdo_extra = '-settaxis,%s-01-01,12:00,1year' % startstart
887## the desired outcome does not appear to be achievable with cdo
888            assert os.path.isfile( tmpofilename ),'File "%s" not found [1], kkey = %s' % (tmpofilename,kkey)
889            cmd = '/usr/bin/cdo -s setgatts,%s %s %s %s/%s' % (attfile,cdo_extra,tmpofilename,opdir,ofilename)
890            rc = run_cmd( cmd )
891          else:
892            fl = string.split( ofiles[kkey] )
893            assert len(fl) == len(mclim30batching),'Mismatch in length of fl: %s' % str(fl)
894            for i in range(len(fl)):
895               ofilename = '%s_%s_%s_%s_%s_%s_%s-%s.nc' % (ovar,ofreq,model,expt,ensem,optag,mclim30batching[i],mclim30batching[i]+29)
896               cmd = '/usr/bin/cdo -s setgatts,%s %s %s %s/%s' % (attfile,cdo_extra,fl[i],opdir,ofilename)
897               rc = run_cmd( cmd )
898
899          if optag in ['cdd','cwd']:
900            import ncExtras
901            thresholdVname = "prthreshold"
902            precip_critMetre = precip_crit*0.001
903            cmdL = ["--time", '%s,"days since %s-01-01",%s,1,years' % (calendar,startstart,-1), '--cb', 'climBounds', \
904                    "--sc", "%s,%s,standard_name=lwe_thickness_of_precipitation_amount;units=m" % (thresholdVname,precip_critMetre), \
905                    '%s/%s' % (opdir,ofilename), 'ncExtras.nc']
906            log.info( str(cmdL ) )
907            ncExtras.run( cmdL )
908## also need a lwe_thickness_of_precipitation_amount variable (units "m") specifying the threshold.
909## "--sc prthreshold,0.01,standard_name=lwe_thickness_of_precipitation_amount;units=m"
910## -A: forces the new variables to be added to existing file
911## -C: prevents ancilliary variables (e.g. associated with the bounds dimension) from being added in
912##
913            cmd = '/usr/bin/ncks -A -C -v time,climBounds,%s ncExtras.nc %s/%s' % (thresholdVname,opdir,ofilename)
914            log.info( 'xxx: %s' % cmd )
915            if optag == 'cwd':
916              reln = 'above'
917              ancvar = 'number_of_cwd_periods_with_more_than_5days_per_time_period'
918            else:
919              reln = 'below'
920              ancvar = 'number_of_cdd_periods_with_more_than_5days_per_time_period'
921            cmval = 'time: mean within days time: maximum over days'
922            attedExtras = ['-a','standard_name,%s,o,c,spell_length_of_days_with_lwe_thickness_of_precipitation_amount_%s_threshold' % (ovar,reln), \
923                           '-a','units,%s,o,c,1' % (ancvar), \
924                           '-a','units,%s,o,c,days' % (ovar), \
925                           '-a','coordinates,%s,o,c,%s' % (ovar,thresholdVname)]
926### following the example 7.11 of CF Conventions 1.6. Note that "maximum over days" is not very precise -- but "maximum over spells" is not allowed.
927            rc = run_cmd( cmd )
928           
929        else:
930          assert os.path.isfile( string.strip(ofiles[kkey]) ), 'File "%s" not found [2], kkey = %s' % (ofiles[kkey], kkey)
931          cmd = '/usr/bin/cdo -s setgatts,%s %s %s/%s' % (attfile,ofiles[kkey],opdir,ofilename)
932          log.info( 'Command: %s\n' %  cmd )
933          rc = run_cmd( cmd )
934
935        ###cmd = '/usr/local/2/bin/ncatted -a cell_methods,%s,o,c,%s %s' % (var,cmval,ofilename)
936          if cmval != None or len(attedExtras) > 0:
937            cmd = ['/usr/local/2/bin/ncatted',]
938            if cmval != None:
939              cmd += ['-a','cell_methods,%s,o,c,%s' % (ovar,cmval)]
940            for c in attedExtras:
941              cmd.append(c)
942            cmd.append('%s/%s' % (opdir,ofilename))
943            rc = run_cmd( cmd )
944
945def close_all():
946  log.info( 'Script completed' )
947  fh.close()
948  rcout.close()
949  rcerr.close()
950  for k in eeproc.keys():
951    sh[procid][k] = eeproc[k]
952  sh.close()
953
954if go:
955  log.info( 'Merging files' )
956  for expt in expts:
957    proc_expt(expt,version,optag)
958
959close_all()
960
961os.popen( 'rm %scdo_basic_lock' % tmpdir ).readlines()
Note: See TracBrowser for help on using the repository browser.