source: CedaCdoScripts/branch/c1/cdo_basic.py @ 126

Subversion URL: http://proj.badc.rl.ac.uk/svn/exarch/CedaCdoScripts/branch/c1/cdo_basic.py@126
Revision 126, 33.6 KB checked in by mjuckes, 7 years ago (diff)

added files

  • Property svn:executable set to *
Line 
1#!/usr/bin/python
2
3import glob, string, os
4import subprocess, time, sys
5import cdms2 as cdms
6import random, time, logging, getopt, shelve
7
8debug = True
9cleanUpAsUGo = True
10args = getopt.getopt( sys.argv[1:], 'i:m:e:r:b:x:o:', ['onlyVars=','scratch=','omitVars=','version=','log='] )
11ad = { 'i':'inst', 'm':'model', 'e':'ensem', 'r':'realm', 'b':'batch', 'x':'expt', 'o':'outDir' }
12
13knownops = ['am','sm','bm','mclim5','cdd','cwd','tr', 'fd']
14optag = args[-1][0]
15assert optag in knownops, 'First argument [%s] should be operation in: %s' % (optag, str(knmownops))
16processing_batch='aaaa0001'
17qcBaseDir = '/badc/cmip5/metadata/processing/qc/cedaqc/'
18qcBaseDir = None
19if qcBaseDir != None:
20  assert os.path.isdir( qcBaseDir ), 'qcBaseDir:%s not found' % qcBaseDir
21
22fullYears = True
23
24edict = {}
25for a in args[0]:
26  edict[a[0][1:]] = a[1] 
27
28def writeat( fh, tag, str,log):
29  if len(str) + len(tag) + 2 > 1024 :
30    log.warn( 'TRUNCATING global attribute %s: %s' % (tag,str) )
31    strl = 1022 - len(tag)
32    str = str[:strl]
33  fh.write( '%s %s\n' % (tag,str) )
34
35fullYearNeeded = False
36fullYearWanted = True
37ok = True
38for k in ad.keys():
39  if not edict.has_key(k):
40    print 'required argument %s [%s] missing' % (k,ad[k])
41    ok = False
42assert ok, 'Not all required arguments found'
43## supported operations are box-mean and annual mean/
44##inst = 'MOHC'
45##model = 'HadGEM2-ES'
46##inst = 'NCC'
47##model = 'NorESM1-M'
48##inst = 'CNRM-CERFACS'
49##model = 'CNRM-CM5'
50##inst = 'CSIRO-QCCCE'
51##model = 'CSIRO-Mk3-6-0'
52##inst = 'IPSL'
53##model = 'IPSL-CM5A-LR'
54##realm = 'ocean'
55##realm = 'atmos'
56##batch = 'b011_'
57##onlyVars = ['tas']
58project = 'cmip5'
59ensem = edict['e']
60inst = edict['i']
61model = edict['m']
62realm = edict['r']
63experiment = edict['x']
64batch = edict['b'] + '_'
65processing_batch = edict['b']
66opdir = edict['o']
67onlyVars = None
68omitVars = None
69product = 'output1'
70assert not( edict.has_key( '-onlyVars' ) and edict.has_key( '-omitVars' ) ), 'Specifying both onlyVars and omitVars is an error'
71
72## qcDir = '%s%s/%s/%s/%s/' % (qcBaseDir,product,inst, model, expt)
73
74vstring = '%s%2.2i%2.2i' % time.gmtime()[0:3]
75doctag = optag
76if optag in ['cdd','cwd']:
77  doctag = 'precip_indices'
78if optag in ['tr','fd']:
79  doctag = 'temperature_indices'
80doc_filename = 'ceda_cmip5_processing_%s.pdf' % doctag
81tstring = '%s%2.2i%2.2i-%2.2i%2.2i%2.2i' % time.gmtime()[0:6]
82log_filename = edict.get( '-log', 'ceda_cmip5_processing_log_%s_%s.txt' % (optag,tstring) )
83batching='byFile'
84ofreq = 'yr'
85ifreq = 'mon'
86if optag in ['mclim5']:
87  batching = '5yr'
88  monthsPerBatch = 60
89elif optag in ['cdd','cwd','tr','fd']:
90  batching = 'yr'
91  ifreq = 'day'
92  monthsPerBatch = 12
93
94procid = '%s_%s_%s_%s_%s_%s' % (optag,inst,model,experiment,realm,ensem)
95sh = shelve.open( 'cdo_basic_sh' )
96if not sh.has_key( procid ):
97  sh[procid] = {}
98
99scriptName = 'cdo_basic.py'
100scriptVersion = 0.9
101log = logging.getLogger( __name__ )
102fh = logging.FileHandler( log_filename )
103log.level = logging.INFO
104log.addHandler( fh )
105creation_date = time.ctime()
106log.info( 'Starting %s wrapper script, version %5.2f' % (scriptName, scriptVersion) )
107log.info( 'Date: %s' % creation_date )
108log.info( 'Operation: %s' % optag )
109log.info( 'Institute: %s' % inst )
110log.info( 'Model: %s' % model )
111log.info( 'Realm: %s' % realm )
112if edict.has_key( '-onlyVars' ):
113   onlyVars = string.split( edict['-onlyVars'], ',' )
114   log.info( 'Only including variables: %s' % edict['-onlyVars'] )
115if edict.has_key( '-omitVars' ):
116   omitVars = string.split( edict['-omitVars'], ',' )
117   log.info( 'Omitting variables: %s' % edict['-omitVars'] )
118if edict.has_key( '-version' ):
119   version = edict['-version']
120   log.info( 'Processing version: %s' % edict['-version'] )
121else:
122   version = None
123## log.info( 'CDO version information:' )
124## os.popen( 'cdo -V > .cdo_version' ).readlines()
125## for l in open( '.cdo_version' ).readlines():
126  ## log.info( string.strip(l) )
127## log.info( 'End of CDO version information.' )
128
129cx = '0123456789qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM'
130puid = ''
131for k in range(6):
132   puid += cx[random.randint(0,len(cx)-1 )]
133
134tmpdir = edict.get( '-scratch', 'tmp/cdo_basic_wrk/%s/' % puid )
135
136if not os.path.isdir(tmpdir):
137  os.mkdir( tmpdir )
138  log.info( 'Creating temporary directory: %s ' % tmpdir )
139
140if tmpdir[-1] != '/':
141    tmpdir += '/'
142
143assert not os.path.isfile( '%scdo_basic_lock' % tmpdir ), 'Lock file exists in work directory %s' % tmpdir
144
145os.popen( 'touch %scdo_basic_lock' % tmpdir ).readlines()
146assert os.path.isfile( '%scdo_basic_lock' % tmpdir ), 'Failed to make lock file in work directory %s' % tmpdir
147
148plurality = 'single'
149mdbase = "http://badc.nerc.ac.uk/browse/badc/cmip5/metadata/processing/" 
150
151rcout = open( 'cdo_stdout.txt', 'w' )
152rcerr = open( 'cdo_stderr.txt', 'w' )
153def run_cmd( cmd, comment='', strict=True, append=None, out=None  ):
154    if type(cmd) == type('x'):
155      args = map( lambda x: string.strip(x, "'"), string.split( cmd ))
156    else:
157      args = cmd
158    log.info( cmd )
159    if append != None:
160      fho = open( append, 'a' )
161      clo = True
162    elif out != None:
163      fho = open( out, 'w' )
164      clo = True
165    else:
166      fho = rcout
167      clo = False
168    proc = subprocess.Popen( args, stdout=fho, stderr=rcerr, env={ 'LD_LIBRARY_PATH':'/usr/local/lib'} )
169    std = proc.communicate()
170    rc = proc.returncode
171    if rc != 0:
172      print args
173      print std[0]
174      print std[1]
175      if strict:
176        assert rc == 0, 'Failed (%s) [rc=%s] executing %s' % (comment,rc,str(cmd))
177      elif rc != 0:
178        print 'Failed (%s) [rc=%s] executing %s' % (comment,rc,str(cmd))
179    if clo:
180      fho.close()
181    return rc
182
183dir_tmpl_s = '/disks/fuse/cmip5/output1/MOHC/HadGEM2-ES/%(expt)s/%(ifreq)s/atmos/Amon/'
184dir_tmpl_s = '/badc/cmip5/data/cmip5/output1/MOHC/HadGEM2-ES/%(expt)s/%(ifreq)s/atmos/Amon/'
185if ifreq == 'mon':
186  mip = {'atmos':'Amon', 'ocean':'Omon', 'land':'Lmon' }[realm]
187elif ifreq == 'day':
188  mip = 'day'
189else:
190  raise 'Should not be here'
191
192dir_tmpl_r = '/badc/cmip5/data/cmip5/output1/%(inst)s/%(model)s/%(expt)s/%(ifreq)s/%(realm)s/%(mip)s/%(ensem)s/'
193dir_tmpl_e = '/badc/cmip5/data/cmip5/output1/%(inst)s/%(model)s/%(expt)s/%(ifreq)s/%(realm)s/%(mip)s/%(ensem)s/latest/'
194dir_tmpl = dir_tmpl_e + '%(var)s/'
195bldir_tmpl = '/home/users/prototype/cmip_processing/cmip5_bl/output1_%(inst)s_%(model)s_%(expt)s/'
196blfile_tmpl = '%(ifreq)s_%(realm)s_%(mip)s_%(ensem)s_%(version)s.txt'
197
198
199def get_var_from_bl_record( rec ):
200  bits = string.split( rec, ':' )
201  if len(bits) < 2:
202    return None
203  b2 = string.split( bits[1], '/' )
204  if len(b2) == 0:
205    return None
206  var = string.split( b2[-1], '_' )[0]
207  return var
208
209def get_version_dir(version=None):
210
211  pdict = { 'inst':inst, 'model':model, 'expt':expt, 'ensem':ensem, 'ifreq':ifreq, 'realm':realm, 'mip':mip }
212  rdir = dir_tmpl_r % pdict
213  if version != None:
214    this_v = rdir + version
215    assert os.path.isdir( this_v ), 'Version directory %s not found' % this_v
216    return this_v
217
218  vlist = glob.glob( rdir + 'v*' )
219  assert len(vlist) > 0, 'No version directories found in %s' % rdir
220  vlist.sort()
221  return vlist[-1]
222
223def get_expt_fl(expt,var):
224  pdict = { 'inst':inst, 'model':model, 'expt':expt, 'var':var, 'ensem':ensem, 'ifreq':ifreq, 'realm':realm, 'mip':mip }
225  bldir = bldir_tmpl % pdict
226  rdir = dir_tmpl_r % pdict
227  vlist = glob.glob( rdir + 'v*' )
228  vlist.sort()
229  edir = '%s/%s/' % (vlist[-1],var)
230  ## edir = dir_tmpl % { 'inst':inst, 'model':model, 'expt':expt, 'var':var, 'ensem':ensem, 'ifreq':ifreq, 'realm':realm, 'mip':mip }
231  assert os.path.isdir( edir), 'Error in get_expt_fl: %s is not a directory' % edir
232  log.info( 'edir:: %s' % edir )
233  fl = glob.glob( edir + '*.nc' )
234  assert len(fl) > 0, 'No netcdf files found in %s' % edir
235  fl.sort()
236  fm = []
237  blvars = []
238  if os.path.isdir( bldir ):
239    this_version = string.split( string.strip(vlist[-1], '/'), '/' )[-1]
240    pdict['version'] = this_version
241    blfile = blfile_tmpl % pdict
242    if os.path.isfile( bldir + blfile ):
243      blvars = map( get_var_from_bl_record, open( bldir + blfile ).readlines() )
244      if var in blvars:
245        log.info( 'Skipping black-listed ADS expt=%s, var=%s' % (expt,var) )
246        return fm
247      else:
248        log.info( 'Var %s not in black list [%s] %s' % (var, bldir + blfile, str(blvars)) )
249    else:
250      log.info( 'Black list file %s not present' % (bldir+blfile) )
251  else:
252    log.info( 'Black list directory %s not present' % bldir )
253  for f in fl:
254    fn = string.split(f, '/')[-1]
255    stem = string.split( fn, '.')[0]
256    bits = string.split( stem, '_' )
257    if len(bits) > 5:
258      ts = string.split(bits[5],'-')
259      starty = long(ts[0][0:4])
260      if len(ts[0]) >= 6:
261        startm = long(ts[0][4:6])
262        if len(ts) == 2:
263          endm = long(ts[1][4:6])
264        else:
265          endm = startm
266      else:
267        startm = 1
268        endm = 12
269      if len(ts) == 2:
270        endy = long(ts[1][0:4])
271      else:
272        endy = starty
273      fm.append( (stem,starty,endy,startm,endm) )
274    else:
275      fm.append( (stem,None,None,None,None) )
276      assert batching != '5yr', 'No time bounds in filename: %s (batching = %s)' % (fn,batching)
277  return fm
278
279go = False
280go = True
281ofiles = {}
282ifiles = {}
283tids = {}
284varld = {}
285hists = {}
286crds = {}
287
288def listAnd( l1, l2 ):
289  l3 = []
290  for i in l1:
291    if i in l2:
292      l3.append(i)
293  return l3
294
295def listExcl( l1, l2 ):
296  l3 = []
297  for i in l1:
298    if i not in l2:
299      l3.append(i)
300  return l3
301
302expts = [experiment,]
303if optag in ['cdd','cwd']:
304   onlyVars = ['pr']
305   omitVars = None
306if optag in ['tr','fd']:
307   onlyVars = ['tasmin']
308   omitVars = None
309
310for expt in expts:
311  ##dir = dir_tmpl_e % locals()
312  dir = get_version_dir( version=version)
313  vars = glob.glob( '%s/*' % dir )
314  thisvarl = map( lambda x: string.split(x,'/')[-1], vars )
315  thisvarl.sort()
316  if onlyVars == None:
317    if omitVars == None:
318      varld[expt] = thisvarl[:]
319    else:
320      varld[expt] = listExcl( thisvarl, omitVars )
321  else:
322    varld[expt] = listAnd( thisvarl, onlyVars )
323    if len(varld[expt]) == 0:
324       print 'No common elements found:'
325       print thisvarl
326       print onlyVars
327       raise
328 
329  for var in varld[expt]:
330    ofiles['%s_%s' % (expt,var)] = ''
331    ifiles['%s_%s' % (expt,var)] = ''
332    tids['%s_%s' % (expt,var)] = ''
333
334ye = []
335badkeys = []
336for expt in expts:
337  for var in varld[expt]:
338     fm = get_expt_fl( expt, var )
339     if len(fm) > 0:
340        ye.append( (expt,var,fm) )
341     else:
342        badkeys.append( '%s_%s' % (expt,var) )
343
344
345## sys.exit(0)
346
347lon1,lon2,lat1,lat2 = (60.,120.,30.,60.)
348cdoop, ofreq,opName, proc_description = { 'am':('divdpy -yearsum -muldpm', 'yr','Annual mean','Annual mean of monthly mean data'), \
349                        'sm':('divdpy -seassum -muldpm', 'yr','Seasonal mean','Annual seasonal of monthly mean data'), \
350                        'bm':('fldmean -sellonlatbox,%s,%s,%s,%s' % (lon1,lon2,lat1,lat2),ifreq,'Box mean','Spatial mean over a latitude-longitude box'), \
351                        'mclim5':('ymonavg', '5yr','5-year monthly climatology','Monthly climatology for 5 year period: 5 year mean for each month'),
352                        'cdd':('eca_cdd','yr','Continuous dry days','Maximum of continuous dry days in a year'),
353                        'cwd':('eca_cwd','yr','Consecutive wet days','Maximum of consecutive wet days in a year'),
354                        'tr':('eca_tr','yr','Tropical nights','Count of tropical nights per year'),
355                        'fd':('eca_fd','yr','Frost days','Count of frost days (Tmin below zero) per year') }[optag]
356## opdir = '%s%s' % (batch,optag)
357if go and not os.path.isdir( opdir ):
358  os.mkdir( opdir )
359
360step1 = False
361step1 = True
362go1 = True
363log.info( 'CDO command: %s ' % cdoop )
364eeproc = {}
365k=-1
366print 'INFO0080: ',k, str(k), str( [1,2,3] )
367##for expt,var,fm in ye:
368def run_evf( expt,var,fm ):
369    kk = 0
370    dir = get_version_dir( version=version)
371    ##edir = dir_tmpl % locals()
372    edir = '%s/%s/' % (dir,var)
373    key = '%s_%s' % (expt,var)
374    rp1 = os.path.realpath( edir )
375    rpath = os.path.realpath( rp1 +'/' + fm[0][0] + '.nc' )
376    rdir = os.path.split( rpath )[0]
377    mtime = time.localtime(os.path.getmtime(rdir))
378    timenow = '%4.4i%2.2i%2.2i' % time.localtime()[0:3]
379    modtime = '%4.4i%2.2i%2.2i' % mtime[0:3]
380    log.info( 'Starting expt=%s, var=%s [# files = %s, dir mod time: %s]' % (expt,var,len(fm),modtime ) )
381     
382    if not eeproc.has_key( var ):
383      print 'Creating element for %s' % var
384      if sh[procid].has_key(var):
385        eeproc[var] = sh[procid][var]
386        eeproc[var].append( (modtime,len(fm),timenow,rdir) )
387      else:
388        eeproc[var] = [(modtime,len(fm),timenow,rdir),]
389    else:
390      eeproc[var].append( (modtime,len(fm),timenow,rdir) )
391    for file in fm:
392        nc = cdms.open( '%s%s.nc' % (edir,file[0]) )
393        tid = nc.tracking_id
394        crd = nc.creation_date
395        table_id = nc.table_id
396        realization = nc.realization[0]
397        initialization_method = nc.initialization_method[0]
398        physics_version = nc.physics_version[0]
399        if 'time' in nc.axes.keys():
400           calendar = nc.getAxis('time').attributes.get( 'calendar', 'none' )
401           if calendar == 'none':
402             print 'calendar attribute not found'
403        else:
404           print 'time axis not found'
405           calendar = 'none'
406        print 'calendar = %s' % calendar
407        log.info( 'File: %s; tracking id: %s; creation date: %s' % (file[0],tid,crd) )
408 
409        tids[key] += '%s,' % tid
410        crds[key] = crd
411        if not hists.has_key(key):
412           hists[key] = nc.history
413        nc.close()
414        ifiles[key] += '%s,' % file[0]
415
416    startstart = None
417    nbfb = 0
418    nbfbb = 0
419    log.info( 'batching: %s, %s, %s' % (batching,expt,var) )
420    if batching == 'byFile':
421      smem = []
422      syey = []
423      for file in fm:
424        fn = string.split( file[0], '/' )[-1]
425        dseg = string.split( string.split( fn, '.' )[0], '_' )[-1]
426        sd, ed = string.split( dseg, '-' )
427## ta True --> file does not start in Jan.
428        ta = len(sd) > 4 and sd[4:6] != '01'
429        tb = len(ed) > 4 and ed[4:6] != '12'
430        if ta or tb:
431          nbfb +=1
432        if ta and tb:
433          nbfbb += 1
434        else:
435          smem.append( ( int( sd[4:6] ), int( ed[4:6] ) ) )
436          syey.append( ( int( sd[0:4] ), int( ed[0:4] ) ) )
437      ##  if len(sd) > 4 and sd[4:6] != '01':
438            ##nbfb += 1
439        ##if len(ed) > 4 and ed[4:6] != '12':
440            ##nbfb += 1
441     
442      done = False
443      nodir = False
444      if nbfb > 0:
445        log.info( 'File date boundaries do not match end of year (%s,%s): merging: %s, %s' % (nbfb, nbfbb, expt,var) )
446        if nbfbb > 0:
447          cfls = ''
448          ofile = ' %sofile_%s_%s_%s.nc' % (tmpdir,expt,var,kk)
449          if len(fm) > 1:
450            if fullYears and fm[0][3] != 1:
451              assert len(fm) > 1, 'Not ready for this case with single input file'
452              yl = range(fm[0][1]+1,fm[0][2]+1)
453              cyl = string.join( map( lambda x:'%s' % x, yl), ',' )
454              cmd = '/usr/local/2/bin/cdo -s selyear,%s  %s%s.nc  %s/%s' % (cyl, edir, fm[0][0], tmpdir,'.tmp1.nc')
455              rc = run_cmd( cmd, strict=True )
456              cfls = ' %s/.tmp1.nc' % tmpdir
457              fi0 = 1
458            else:
459              fi0 = 0
460
461            if fullYears and fm[-1][4] != 12:
462              fi9 = -1
463            else:
464              fi9 = len(fm)
465
466            log.info( str( fm[-1]  ))
467            for file in fm[fi0:fi9]:
468              cfls += ' %s%s.nc' % (edir, file[0])
469
470            if fullYears and fm[-1][4] != 12:
471              yl = range(fm[-1][1],fm[-1][2])
472              cyl = string.join( map( lambda x:'%s' % x, yl), ',' )
473              cmd = '/usr/local/2/bin/cdo -s selyear,%s  %s%s.nc  %s/%s' % (cyl, edir, fm[-1][0], tmpdir,'.tmp2.nc')
474              rc = run_cmd( cmd, strict=True )
475              cfls += ' %s/.tmp2.nc' % tmpdir
476
477            tfile = '%s/.tmp_ofile_%s_%s.nc' % (tmpdir,expt,var)
478            cmd1 = '/usr/local/2/bin/cdo -s mergetime %s %s' % (cfls,tfile)
479            cmd = '/usr/local/2/bin/cdo -s %s %s %s' % (cdoop, tfile, ofile)
480            log.info( 'Merging file %s\n' % cmd1 )
481          else:
482            yl = range(fm[0][1]+1,fm[0][2])
483            cyl = string.join( map( lambda x:'%s' % x, yl), ',' )
484            cmd = '/usr/local/2/bin/cdo -s selyear,%s  %s%s.nc  %s/%s' % (cyl, edir, fm[0][0], tmpdir,'.tmp1.nc')
485            rc = run_cmd( cmd, strict=True )
486            cmd1 = None
487            cmd = '/usr/local/2/bin/cdo -s %s %s/%s %s' % (cdoop, tmpdir, '.tmp1.nc', ofile)
488          log.info( 'cdo command: %s\n' % cmd )
489          ## raise 'bad file boundary'
490           
491          if go and step1 and go1:
492            if cmd1 != None:
493              rc = run_cmd( cmd1, strict=True )
494            else:
495              rc = 0
496
497            if rc != 0:
498              badkeys.append(key)
499            else:
500              rc = run_cmd( cmd, strict=True )
501              if rc != 0:
502                badkeys.append(key)
503
504            if cleanUpAsUGo:
505              ## short pause to make sure cmd has really finished
506              time.sleep( 1 )
507              for f in [tfile, '%s/%s' % (tmpdir, '.tmp1.nc'), '%s/%s' % (tmpdir, '.tmp2.nc')]:
508                if os.path.isfile( f ):
509                  os.unlink( f )
510          else:
511            print 'single: %s, %s' % (cmd1, cmd)
512          ofiles[key] += ' %s' % (ofile)
513          done = True
514        else:
515          i0 = 0
516          nf = len(fm)
517          if smem[0][0] != 1:
518            log.info(  'Omitting first file: %s' % str(fm[0]) )
519            i0 = 1
520            nf = len(fm)-1
521          nf = (nf/2)*2
522## check to see whether we can process in pairs
523          pairedfiles = True
524          for k in range(0,nf,2):
525            pairedfiles = pairedfiles and smem[k+i0][0] == 1 and smem[k+i0+1][1] == 12
526
527          if pairedfiles:
528            oldfm = fm[:]
529            fm = []
530            nodir = True
531            for kkk in range(0,nf,2):
532              ofile = ' %smergefile_%s_%s_%s.nc' % (tmpdir,expt,var,kkk)
533              cmd = '/usr/local/2/bin/cdo -s mergetime %s%s.nc %s%s.nc %s' % (edir, oldfm[kkk][0], edir, oldfm[kkk+1][0], ofile)
534              fm.append( (ofile,syey[kkk][0],syey[kkk+1][1]) )
535              if go and step1 and go1:
536                rc = run_cmd( cmd, strict=True )
537                if rc != 0:
538                  badkeys.append(key)
539
540          else:
541            assert inst != 'MIROC', 'Cannot handle this file collection'
542         
543
544      if not done:
545        kkk = 0
546        for file in fm:
547          kkk += 1
548          ofile = ' %sofile_%s_%s_%s.nc' % (tmpdir,expt,var,kkk)
549          if nodir:
550            cmd = '/usr/local/2/bin/cdo -s %s %s.nc %s' % (cdoop, file[0], ofile)
551          else:
552            cmd = '/usr/local/2/bin/cdo -s %s %s%s.nc %s' % (cdoop, edir, file[0], ofile)
553          if go and step1 and go1:
554            rc = run_cmd( cmd, strict=False )
555            if rc != 0:
556              badkeys.append(key)
557          else:
558            print 'single: %s' % cmd
559          ofiles[key] += ' %s' % (ofile)
560      if nbfb > 0:
561          log.info( 'FILE DATE BOUNDARIES DO NOT MATCH ASSUMPTIONS: %s, %s' % (expt,var) )
562    elif batching in ['5yr','yr']:
563      startstart = fm[0][1]
564      endend = fm[-1][2]
565      if fullYearNeeded:
566        if fm[0][3] != 1:
567          startstart += 1
568        if fm[-1][4] != 12:
569          endend += -1
570      elif fullYearWanted:
571        if fm[0][3] > 3:
572          startstart += 1
573        if fm[-1][4] < 10:
574          endend += -1
575
576      blen = { 'yr':1, '5yr':5 }[batching]
577      if batching == '5yr':
578        bstart = ((startstart+3)/5)*5 + 1
579        bslast = ((endend)/5 - 1)*5 + 1
580      elif batching == 'yr':
581        bstart = startstart
582        bslast = endend
583      nb = (bslast - bstart)/blen + 1
584
585      for s in range( bstart, bslast+1, blen ):
586        tfm = []
587        kk += 1
588        for f in fm:
589          for y in range( s,s+blen):
590            if f not in tfm and (f[1] == None or ( f[1] <= y and f[2] >= y )):
591              tfm.append(f)
592        assert len(tfm) <= 2 or blen > 1, 'Not expecting more than 2 elements: %s' % str(tfm)
593        if len(tfm) == 0:
594          print 'No files found in segment %s, %s' % (s,s+blen)
595          print expt,var
596          print fm
597          raise 'logic error somewhere'
598        ofile = ' %sofile_%s_%s_%s.nc' % (tmpdir,expt,var,kk)
599        ofiles[key] += ' %s' % (ofile)
600        ylist = string.join( map( str, range( s,s+blen) ), ',' )
601        this_cdoop = cdoop
602        if not ( tfm[0][1] == None or tfm[0][1] < s or ( tfm[0][1] == s and tfm[0][3] == 1) ):
603          error = fullYearNeeded or ( fullYearWanted and not ( tfm[0][1] == s and tfm[0][3] <= 3) )
604          assert not error, 'Data does not start soon enough, %s, %s' % (str(tfm), s )
605          log.warning( 'WARN001: Incomplete climatology: %s, %s, %s' % (s,blen,str(tfm)) )
606
607        if not ( tfm[-1][2] == None or tfm[-1][2] > s + blen -1 or (tfm[-1][2] == s + blen -1 and tfm[-1][4] == 12) ):
608          error = fullYearNeeded or ( fullYearWanted and not ( tfm[-1][2] == s + blen -1 and tfm[-1][4] >= 10) )
609          assert not error, 'Data does not end late enough, %s, %s' % (str(tfm), s )
610          log.warning( 'WARN002: Incomplete climatology: %s, %s, %s' % (s,blen,str(tfm)) )
611
612        for k in range(len(tfm)-1):
613            log.info( 'INFO0100: ',k, str(k), str( [1,2,3] ) )
614            log.info( 'INFO0101: ',k, tfm[k], tfm[k+1] )
615            a = str(tfm[k])
616            b = str(tfm[k+1])
617            testRes = ( tfm[k][2] == tfm[k+1][1] and tfm[k][4] == tfm[k+1][3] -1 ) or ( tfm[k][2] == tfm[k+1][1] - 1 and tfm[k][4] == 12 and tfm[k+1][3] == 1 )
618            if not testRes:
619              assert optag in ['cdd','cwd'], 'Lack of continuity in batch, %s, %s:: %s, %s' % (ofile,k,a,b)
620              log.warning( 'WARN003: Lack of continuity in batch, %s, %s:: %s, %s' % (ofile,k,a,b) )
621       
622        if optag in ['cdd','cwd']:
623          ##this_cdoop = 'settaxis,%s-01-01,12:00,1year -%s -mulc,86400' % (startstart, cdoop )
624          this_cdoop = '%s -mulc,86400' % (cdoop )
625        log.info( '[x1]: len tfm: %s' % (len(tfm)) )
626        if len(tfm) == 0:
627          print 'No files found for year %s ' % y
628        elif len(tfm) == 1:
629          cmd = '/usr/local/2/bin/cdo -s %s -selyear,%s %s%s.nc %s' % (this_cdoop,ylist,edir,tfm[0][0],ofile)
630          if go:
631            rc = run_cmd( cmd )
632            if debug:
633              log.info( 'Trying to log file maximum value' )
634              cmdo = 'echo %s%s' % (edir,tfm[0][0])
635              cmd = '/usr/local/2/bin/cdo info %s' % ofile
636              rc = run_cmd( cmdo, append='/home/users/prototype/cmip_processing/wrk/cdd1/debug.txt' )
637              rc = run_cmd( cmd, append='/home/users/prototype/cmip_processing/wrk/cdd1/debug.txt' )
638          else:
639            print 'single: %s' % cmd
640        elif len(tfm) == 2:
641          yl1 = []
642          yl2 = []
643          for y in range( tfm[0][1],tfm[0][2]+1):
644            if y in range( s,s+5):
645              yl1.append(y)
646          for y in range( tfm[1][1],tfm[1][2]+1):
647            if y in range( s,s+5):
648              yl2.append(y)
649          cyl1 = string.join( map( str, yl1 ), ',' )
650          cyl2 = string.join( map( str, yl2 ), ',' )
651          tfile = '%s/.tmp_ofile_%s.nc' % (tmpdir,s)
652          cmd1 = '/usr/local/2/bin/cdo -s mergetime -selyear,%s %s%s.nc -selyear,%s %s%s.nc %s' % (cyl1,edir,tfm[0][0], cyl2,edir,tfm[1][0],tfile)
653          cmd2 = '/usr/local/2/bin/cdo -s %s %s %s' % (this_cdoop,tfile,ofile)
654          if go:
655            if os.path.isfile( tfile ):
656              os.unlink( tfile )
657            rc = run_cmd( cmd1 )
658            rc = run_cmd( cmd2 )
659            if cleanUpAsUGo:
660              ## short pause to make sure cmd has really finished
661              time.sleep( 1 )
662              os.unlink( tfile )
663          else:
664            print 'double: %s\n     %s' % (cmd1,cmd2)
665        elif tfm[0][3] == 1 and tfm[-1][4] == 12:
666          tfile = '%s/.tmp_ofile_%s.nc' % (tmpdir,s)
667          cmd1 = '/usr/local/2/bin/cdo -s mergetime  %s %s' % (string.join( map(lambda x:edir + x[0] + '.nc', tfm ), ' '),tfile)
668          cmd2 = '/usr/local/2/bin/cdo -s %s %s %s' % (this_cdoop,tfile,ofile)
669          if go:
670            if os.path.isfile( tfile ):
671              os.unlink( tfile )
672            rc = run_cmd( cmd1 )
673            rc = run_cmd( cmd2 )
674            if cleanUpAsUGo:
675              ## short pause to make sure cmd has really finished
676              time.sleep( 1 )
677              os.unlink( tfile )
678          else:
679            print 'multi1: %s\n     %s' % (cmd1,cmd2)
680        else:
681          raise 'Code missing'
682    return tid, crd, table_id, realization, initialization_method, physics_version, startstart,calendar
683
684
685for expt,var,fm in ye:
686    tid, crd, table_id, realization, initialization_method, physics_version, startstart,calendar = run_evf( expt,var,fm )
687
688def proc_expt(expt,version,optag):
689    attfile = '%s/temp_attribute_file.txt' % tmpdir
690    varl = varld[expt]
691    ## for var in varld[expt]:
692    for var in varl:
693      if optag in ['am', 'sm']:
694        if var == 'tasmax':
695          cmval = "time: maximum within days time: mean over days"
696        elif var == 'tasmin':
697          cmval = "time: minimum within days time: mean over days"
698        else:
699          cmval = "time: mean"
700      else:
701        assert True, 'Need to set cell_methods for this operator'
702
703      key = '%s_%s' % (expt,var)
704      if key in badkeys:
705        log.info( 'Skipping %s because or earlier error' % key )
706      elif string.strip( ofiles[key] ) == '':
707        log.info( 'Skipping %s because no output files registered' % key  )
708        log.warning( 'WARN004: unexplained missing output file for key %s' % (key) )
709      else:
710        log.info( 'Trying %s' % key )
711        dir = get_version_dir( version=version)
712    ##    vdir = dir_tmpl % locals()
713        vdir = '%s/%s/' % (dir,var)
714        ovar = var
715        if optag in ['cdd','cwd']:
716          precip_crit = 1.
717          ovar = '%s%i' % (optag, int( precip_crit*10 + 0.5 ) )
718        if optag == 'tr':
719          tasmin_crit = 20.
720          ovar = 'tr%i' % int( tasmin_crit + 0.5 )
721        ofilename = '%s_%s_%s_%s_%s_%s.nc' % (ovar,ofreq,model,expt,ensem,optag)
722        if os.path.isfile( ofilename ):
723           os.unlink( ofilename )
724        tid = ''
725        for k in range(36):
726          tid += cx[random.randint(0,len(cx)-1 )]
727        this_mip  = string.split( table_id )[1]
728        version = string.split( string.strip( dir, '/' ), '/' )[-1]
729        esgf_dsid = '%s.%s.%s.%s.%s.%s.%s.%s.%s.%s' % (project,product,inst,expt,model,ifreq,realm,this_mip,ensem,version)
730##cmip5.output1.MIROC.MIROC5.sstClim.monClim.atmos.Amon.r1i1p1.v20111129.xml
731        oo = open( attfile, 'w' )
732        kkey = '%s_%s' % (expt,var)
733        oo.write( '%s %s\n' % ('proc_institute ', 'British Atmospheric Data Centre' ) )
734        oo.write( '%s %s\n' % ('proc_institute_id ', 'BADC' ) )
735        oo.write( '%s %s\n' % ('proc_machine ', 'cmip-login.badc.rl.ac.uk' ) )
736        oo.write( '%s %s\n' % ('proc_contact ', 'badc@rl.ac.uk, martin.juckes@stfc.ac.uk' ) )
737        oo.write( '%s %s\n' % ('proc_description ', proc_description ) )
738        oo.write( '%s %s\n' % ('creation_date ', creation_date ) )
739        oo.write( '%s %s\n' % ('proc_version ', 'v%s' % vstring ) )
740        oo.write( '%s %s\n' % ('script_version ', '%5.2f' % scriptVersion ) )
741        oo.write( '%s %s\n' % ('operation ', opName ) )
742        oo.write( '%s %s\n' % ('operation_id ', optag ) )
743        oo.write( '%s %s\n' % ('operation_qualifiers ', ' ' ) )
744        oo.write( '%s %s\n' % ('input_frequency ', ifreq ) )
745        oo.write( '%s %s\n' % ('input_esgf_dsid ', esgf_dsid ) )
746        oo.write( '%s %s\n' % ('model_id ', model ) )
747        oo.write( '%s %s\n' % ('experiment_id ', expt ) )
748        oo.write( '%s %s\n' % ('institute_id ', inst ) )
749        oo.write( '%s %s\n' % ('table_id ', table_id ) )
750        oo.write( '%s %s\n' % ('initialization_method ', initialization_method ) )
751        oo.write( '%s %s\n' % ('physics_version ', physics_version ) )
752        oo.write( '%s %s\n' % ('realization ', realization ) )
753        oo.write( '%s %s\n' % ('frequency ', ofreq ) )
754        oo.write( '%s %s\n' % ('modeling_realm ', realm ) )
755        oo.write( '%s %s\n' % ('input_variable ', var ) )
756        oo.write( '%s %s\n' % ('product ', 'derived' ) )
757        oo.write( '%s %s\n' % ('input_dir ', vdir ) )
758        oo.write( '%s %s\n' % ('plurality ', plurality ) )
759        oo.write( '%s %s\n' % ('proc_code_repository_url ', 'http://proj.badc.rl.ac.uk/svn/exarch/CedaCdoScripts' ) )
760        oo.write( '%s %s\n' % ('proc_code_home_url ', 'http://proj.badc.rl.ac.uk/exarch/wiki/PackageRBPS' ) )
761        oo.write( '%s %s\n' % ('proc_method_url ', '%s/documentation/%s/%s' % (mdbase, plurality, doc_filename)  ) )
762        oo.write( '%s %s\n' % ('proc_log_url ', '%s/logfiles/%s/%s/%s' % (mdbase, plurality, optag, log_filename)  ) )
763        oo.write( '%s %s\n' % ('proc_batch ', processing_batch  ) )
764        if optag in ['cdd','cwd']:
765          oo.write( '%s %s\n' % ('proc_pars','precip_crit' ) )
766          oo.write( '%s %s\n' % ('precip_crit',precip_crit) )
767        if optag == 'tr':
768          oo.write( '%s %s\n' % ('proc_pars','tasmin_crit' ) )
769          oo.write( '%s %s\n' % ('tasmin_crit',tasmin_crit) )
770
771        parts = string.split( ifiles[kkey], ',' )
772        if len( parts) > 1:
773          bits = string.split( parts[0], '_' )
774          stem = string.join( bits[0:-1], '_' )
775          dat = string.split( bits[-1], '.' )[-1]
776          attr = '%s_%%(date)s.nc [%s' % (stem,dat)
777          for p in parts[1:]:
778            bits = string.split( p, '_' )
779            dat = string.split( bits[-1], '.' )[-1]
780            attr += ',' + dat
781          attr += ']'
782          writeat(oo, 'input_files ', attr, log )
783        else:
784          writeat(oo, 'input_files ', ifiles[kkey], log )
785
786        if len( tids[kkey] ) > 511:
787           log.warning( 'WARN003: truncating tid list for %s\n%s' % (ofilename, tids[kkey]) )
788           tids[kkey] = tids[kkey][0:500] + '.....'
789
790        oo.write( '%s %s\n' % ('input_tids ', tids[kkey]) )
791        oo.write( '%s %s\n' % ('input_creation_date ', crds[kkey]) )
792        oo.write( '%s %s\n' % ('tracking_id ', tid) )
793        writeat( oo, 'history ', '%s; %s' % (string.replace(hists[kkey],'\n',';;'), '%s: %s: %s' % (creation_date, 'CDO script', proc_description)), log )
794        oo.close()
795        tmpofilename = tmpdir + '.cdo_basic_temp.nc'
796        if os.path.isfile( tmpofilename ):
797           os.unlink( tmpofilename )
798        if len( string.split( ofiles[kkey] ) ) > 1:
799          ##cdoc = 'settaxis,%s-01-01,12:00,365day -mergetime' % startstart
800
801          cmd = '/usr/local/2/bin/cdo -s mergetime %s %s' % (ofiles[kkey],tmpofilename)
802          rc = run_cmd( cmd )
803          cdo_extra = ''
804          if optag in ['cdd','cwd']:
805            varTag = { 'cdd':'dry','cwd':'wet' }[optag]
806            cdo_extra = '-chname,consecutive_%s_days_index_per_time_period,%s' % (varTag,ovar)
807          ##if optag in ['cdd', 'tr']:
808            ##cdo_extra = '-settaxis,%s-01-01,12:00,1year' % startstart
809## the desired outcome does not appear to be achievable with cdo
810          assert os.path.isfile( tmpofilename ),'File "%s" not found [1], kkey = %s' % (tmpofilename,kkey)
811          cmd = '/usr/local/2/bin/cdo -s setgatts,%s %s %s %s/%s' % (attfile,cdo_extra,tmpofilename,opdir,ofilename)
812          rc = run_cmd( cmd )
813
814          if optag in ['cdd','cwd']:
815            import ncExtras
816            thresholdVname = "prthreshold"
817            precip_critMetre = precip_crit*0.001
818            cmdL = ["--time", '%s,"days since %s-01-01",%s,1,years' % (calendar,startstart,-1), '--cb', 'climBounds', \
819                    "--sc", "%s,%s,standard_name=lwe_thickness_of_precipitation_amount;units=m" % (thresholdVname,precip_critMetre), \
820                    '%s/%s' % (opdir,ofilename), 'ncExtras.nc']
821            log.info( str(cmdL ) )
822            ncExtras.run( cmdL )
823## also need a lwe_thickness_of_precipitation_amount variable (units "m") specifying the threshold.
824## "--sc prthreshold,0.01,standard_name=lwe_thickness_of_precipitation_amount;units=m"
825## -A: forces the new variables to be added to existing file
826## -C: prevents ancilliary variables (e.g. associated with the bounds dimension) from being added in
827##
828            cmd = '/usr/local/2/bin/ncks -A -C -v time,climBounds,%s ncExtras.nc %s/%s' % (thresholdVname,opdir,ofilename)
829            log.info( 'xxx: %s' % cmd )
830            if optag == 'cwd':
831              reln = 'above'
832              ancvar = 'number_of_cwd_periods_with_more_than_5days_per_time_period'
833            else:
834              reln = 'below'
835              ancvar = 'number_of_cdd_periods_with_more_than_5days_per_time_period'
836            cmval = 'time: mean within days time: maximum over days'
837            attedExtras = ['-a','standard_name,%s,o,c,spell_length_of_days_with_lwe_thickness_of_precipitation_amount_%s_threshold' % (ovar,reln), \
838                           '-a','units,%s,o,c,1' % (ancvar), \
839                           '-a','units,%s,o,c,days' % (ovar), \
840                           '-a','coordinates,%s,o,c,%s' % (ovar,thresholdVname)]
841### following the example 7.11 of CF Conventions 1.6. Note that "maximum over days" is not very precise -- but "maximum over spells" is not allowed.
842            rc = run_cmd( cmd )
843           
844        else:
845          assert os.path.isfile( string.strip(ofiles[kkey]) ), 'File "%s" not found [2], kkey = %s' % (ofiles[kkey], kkey)
846          cmd = '/usr/local/2/bin/cdo -s setgatts,%s %s %s/%s' % (attfile,ofiles[kkey],opdir,ofilename)
847          log.info( 'Command: %s\n' %  cmd )
848          rc = run_cmd( cmd )
849
850        ###cmd = '/usr/local/2/bin/ncatted -a cell_methods,%s,o,c,%s %s' % (var,cmval,ofilename)
851        cmd = ['/usr/local/2/bin/ncatted','-a','cell_methods,%s,o,c,%s' % (ovar,cmval)]
852        for c in attedExtras:
853          cmd.append(c)
854        cmd.append('%s/%s' % (opdir,ofilename))
855        rc = run_cmd( cmd )
856
857def close_all():
858  log.info( 'Script completed' )
859  fh.close()
860  rcout.close()
861  rcerr.close()
862  for k in eeproc.keys():
863    sh[procid][k] = eeproc[k]
864  sh.close()
865
866if go:
867  log.info( 'Merging files' )
868  for expt in expts:
869    proc_expt(expt,version,optag)
870
871close_all()
872
873os.popen( 'rm %scdo_basic_lock' % tmpdir ).readlines()
Note: See TracBrowser for help on using the repository browser.