source: CMIP6dreqbuild/trunk/src/framework/ingest/util_checkUpd.py @ 713

Subversion URL: http://proj.badc.rl.ac.uk/svn/exarch/CMIP6dreqbuild/trunk/src/framework/ingest/util_checkUpd.py@713
Revision 713, 18.3 KB checked in by mjuckes, 5 years ago (diff)

New ingestion pathway

Line 
1
2##
3## this will check for changes cleanly ... need a better view of what dreq_consol_tables is doing before this can be used.
4
5import os, stat, shelve, uuid, collections
6import dreq_cfg
7import dreq_utils
8from utils_wb import workbook, uniCleanFunc
9import utils
10
11vl3 = None
12nt__grphd = collections.namedtuple( 'grphd', ['withPriority','thisl', 'iv', 'it','start','tv'] )
13nt__grptbl = collections.namedtuple( 'grptbl', ['grp','var','srcTable','freq','description','shape','levels','timeProc','mask','priority','mip','id'] )
14
15class maps(object):
16  ttranstem  = {'icesheetmon':'LImon','icesheetyear':'LIyr','icesheetfx':'LIfx'}
17
18logFarm = utils.dreqLog('logs')
19log = logFarm.getLog( 'soUpd' )
20
21def getCmip5Sns():
22    fname = '/data/work/documents/CMIP5_standard_output.xls'
23    wb = workbook( fname )
24    wb.sns.sort()
25    omit1 = [u'dims', u'general', u'other output',u'CFMIP output']
26    sns = []
27    for s in wb.sns:
28      if s not in omit1:
29        sns.append(s)
30
31    return sns
32
33def rval(x):
34  if x.ctype == 1:
35    return x.value.strip()
36  else:
37    return x.value
38
39def cleanStr(x):
40  if x.ctype == 1:
41    return uniCleanFunc(x.value).strip()
42  else:
43    return str(x.value)
44
45def getRowValues( ll, minLen=0, maxLen=0):
46  oo = []
47  for i in ll:
48    oo.append( i.value )
49  if len(oo) >= minLen:
50    return oo[:minLen]
51  for i in range(minLen+1):
52    if len(oo) == minLen:
53      return oo
54    oo.append( '' )
55  if maxLen > 0:
56    return oo[:maxLen]
57  return oo
58
59class templateStat(object):
60  ibase = '/home/martin/2014/wip/dreq/input'
61  def __init__(self,sdir='inSh'):
62    cfg = dreq_cfg.rqcfg()
63    cfg.ff['CMIP5'] = ['/data/work/documents/CMIP5_standard_output.xls']
64    self.changed = []
65    self.new = []
66
67    k2 = cfg.ff.keys()
68    self.sh = shelve.open( '%s/sh__templateStat' % (sdir) )
69    for k in sorted( cfg.ee.keys() ):
70      self.verify( k,[cfg.ee[k],] )
71    for k in sorted( cfg.ff.keys() ):
72      self.verify( k,cfg.ff[k] )
73    self.sh.close()
74
75  def verify( self, k, fl ):
76    for f in fl:
77      fpath = '%s/%s/%s' % (self.ibase,k,f)
78      if not os.path.isfile( fpath ):
79        self.msg( 'ERROR.001.0001: file not found: %s' % fpath )
80        if fpath in self.sh:
81          self.msg( 'ERROR.001.0002: file lost or moved: %s' % fpath )
82      else:
83        stt = os.stat( fpath )
84        self.ctime = stt[stat.ST_CTIME]
85        self.fsize = stt[stat.ST_SIZE]
86        if fpath in self.sh:
87          fhist = self.sh[fpath]
88          if fhist[-1] != (self.ctime,self.fsize):
89            self.msg( 'INFO.001.0002: CHANGED: %s: %s (%s)' % (fpath,self.ctime,self.fsize) )
90            fhist.append( (self.ctime,self.fsize) )
91            self.sh[fpath] = fhist
92            self.changed.append( (k,fpath,self.ctime,self.fsize) )
93          else:
94            self.msg( 'INFO.001.0001: SAME: %s: %s (%s)' % (fpath,self.ctime,self.fsize) )
95        else:
96          self.msg( 'INFO.001.0003: NEW: %s: %s (%s)' % (fpath,self.ctime,self.fsize) )
97          self.new.append( (k,fpath,self.ctime,self.fsize) )
98          self.sh[fpath] = [(self.ctime,self.fsize)]
99
100  def msg(self, txt ):
101    print txt
102
103class varGroups(object):
104  rq = dreq_cfg.rqcfg()
105  freqmap = {'daily':'day', 'Annual':'yr', 'Timestep':'subhr',  '1day':'day', '1mon':'mon', 
106             'month':'mon', 'year':'yr', 'monthly':'mon', 'Day':'day', '6h':'6hr', 
107             '3 hourly':'3hr', '3 Hourly':'3hr'  }
108
109  def __init__(self):
110
111    omit = ['ALL VARIABLES', 'Objectives','Experiments','Experiment Groups','Request scoping','New variables','__lists__']
112    self.cmip5sns = getCmip5Sns()
113    keys = sorted( self.rq.ee.keys() )
114    vdate = '20160309'
115##
116## this file has reference uids ... should be redundant ....
117##
118    vkdir = '/data/tmp/svn3/exarch/CMIP6dreqbuild/trunk/srcMisc'
119    self.vark = dreq_utils.varKeys( '%s/dreq_consol_tables_shelve_v%s' % (vkdir,vdate))
120    ee = {}
121    self.nvd = {}
122    self.idx = 0
123    self.shCols = ['Short name of group', 'Variable short name', 'Table', 'Frequency', 'Description extension (optional)', 'Shape', 'Levels', 'Time mean, point or climatology', 'Mask (optional)', 'Priority', 'MIP','uid','Prev. Var Name']
124##
125## array for information and debugging
126##
127    for mip in keys:
128        self.mip = mip
129        self.actions = collections.defaultdict( int )
130        self.shInfo = {'prov': '%s request template' % mip, 'label': mip, 'title': '%s request'}
131        fn = self.rq.ee[mip]
132        path = '%s%s/%s' % (self.rq.dir0,mip,fn)
133        self.path = path
134        stt = os.stat( path )
135        self.ctime = stt[stat.ST_CTIME]
136        self.fsize = stt[stat.ST_SIZE]
137        wb = workbook( path )
138        ss = []
139        for s in wb.sns:
140          if s not in omit:
141            ss.append(s)
142        self.sh = shelve.open( 'inSh/sh__grp_%s' % mip, 'n' )
143        self.sh['__cols__'] = self.shCols
144        self.sh['__info__'] = self.shInfo
145        self.sh['__src__'] = {'path':path, 'size':self.fsize, 'time':self.ctime}
146        ghr = None
147        for s in ss:
148          if s[:5] != 'CMIP5':
149           sh = wb.book.sheet_by_name( s )
150##
151## parse headers of sheet into named tuple "gh"
152##
153           self.gh = self.parseGrpHead(sh,path,mip,s)
154           assert ghr == None or ghr[:5] == self.gh[:5], 'Multiple sheet structures in single workbook .. NOT SUPPORTED: %s\n%s\n%s' % (path,self.gh,ghr)
155           ghr = self.gh
156           self.scanSheet( sh, self.gh, path, mip, s )
157        ee = {}
158        for k in ['withPriority', 'thisl', 'iv', 'it', 'start']:
159          ee = self.gh.__dict__[k]
160        self.sh['__recStr__'] =  ee
161        self.sh.close()
162
163        if 'New variables' in wb.sns:
164           sh = wb.book.sheet_by_name( 'New variables' )
165        self.sh = shelve.open( 'inSh/sh__newVar_%s' % mip, 'n' )
166        self.sh['__cols__'] = ['Short name', 'CF standard_name', 'standard name status', 'Native grid', 'units', 'Long Name', 'description/comments', 'Priority', 'associated observational dataset']
167        self.scanVars(sh)
168
169        self.sh.close()
170
171        for k in sorted( self.actions.keys() ):
172          print 'ACTIONS [%s]: %s (%s)' % (mip,k,self.actions[k])
173
174  def scanVars( self, sh ):
175     rh = [str(x.value) for x in sh.row(2)]
176     mode = 'nominal'
177
178     lumip_variant = ['Short name', 'CF proposed standard name for variable separated by land use type', 'CF standard_name', 'standard name status', 'Native grid', 'units', 'Long Name', 'description/comments', 'Priority', 'associated observational dataset']
179     if len(rh) >=10 and rh[:10] == lumip_variant:
180       mode = 'lumip'
181     elif len(rh) < 9 or rh[:9] != self.sh['__cols__']:
182        print 'ERROR .. change in column headings %s:\n ********** %s\n +++++++++++ %s' % (self.path,str(rh),str( self.sh['__cols__']) )
183
184     for i in range(3,sh.nrows):
185        r = sh.row(i)
186        if r[0].value == "**end**":
187          break
188
189        v = str(r[0].value)
190        l = r[4].value
191        novar = v == '' and l == ''
192        if not novar:
193          rr = [x.value for x in r]
194          assert v not in self.sh, 'duplicate variable definition %s in %s .. %s' % (v,self.path, str(r))
195          nbl = 0
196          for x in rr:
197            if x != '':
198              nbl += 1
199          if nbl < 3:
200            self.actions['Skipping mainly blank new variable record'] += 1
201          else:
202            if v.find('_') != -1:
203              v = v.replace( '_', '')
204              self.actions['Replacing underscore in new variable name'] += 1
205            if mode == 'lumip':
206              rr = rr[0:1] + rr[2:]
207              self.actions['LUMIP new variable adjustment'] += 1
208            self.sh[v] = rr
209
210  def scanSheet(self, sh, gh, path, k, s ):
211       """Skips efforts to find a priority which were incorporated into the scan in dreq_consol_tables"""
212       irsh = 5
213       for i in range(gh.start,sh.nrows):
214          rowIndex = i
215          thisr = sh.row(i)
216          v0 = str( thisr[0].value ) + '__'
217          if v0[0] != '#':
218             lll = getRowValues( thisr, minLen=gh.thisl, maxLen=gh.thisl )
219             if gh.iv == 0:
220                lll = getRowValues( thisr, minLen=gh.thisl, maxLen=gh.thisl )
221                lll[1] = lll[0]
222                lll[0] = gh.tv
223             else:
224                assert gh.iv == 1, 'gh.iv should be 0 or 1: %s' % gh.iv
225
226             if gh.thisl == 9:
227               lll.append( 105 )
228
229             assert len(lll) == 10,'bad record length ....'
230###
231### add mip name and space ...
232###
233             lll += [k,'']
234
235             self.ntr = nt__grptbl._make( lll )
236             if lll[2].find( '(' ) != -1:
237               bb = [x.strip() for x in lll[2].split( '(' )]
238               lll[2] = bb[0]
239               oldv = bb[1][:-1]
240             else:
241               oldv = ''
242##
243             if lll[0] in ['icesheetmon','icesheetyear','icesheetfx']:
244               otab = lll[0]
245               oshp = lll[5]
246##
247##  reduce doubly specified variables to single
248##
249               if lll[4] == 'Greenland':
250                 self.actions['skip Greenland'] += 1
251                 break
252               if lll[4] == 'Antarctica':
253                 lll[4] = ''
254##
255##  create two copies of all variables
256##
257               for targ in ['ant','gre']:
258                 lll[0] = maps.ttranstem[otab] + targ
259                 ku = self.vark.lookuprec( lll )
260                 lll[11] = ku
261                 lll[5] = oshp + targ
262                 self.saveRec( ku, lll + [oldv,rowIndex,] )
263               self.actions['duplicate ant/gre record'] += 1
264             else:
265               ku = self.vark.lookuprec( lll )
266               lll[11] = ku
267               self.saveRec( ku, lll + [oldv,rowIndex,] )
268
269  def saveRec( self, key, rr ):
270     
271       nbl = 0
272       for x in rr:
273           if x != '':
274             nbl += 1
275##
276## 3 non-blank elements added above ...
277##
278       if nbl < 5:
279         self.actions['skipped mainly blank record %s' % self.gh.tv] += 1
280       else:
281         freq = self.freqmap.get( rr[3], rr[3] )
282         if freq != rr[3]:
283           self.actions['frequency map'] += 1
284           rr[3] = freq
285         if freq == '':
286           self.actions['blank frequency found in table %s' % self.gh.tv] += 1
287         grp = rr[0]
288         tbl = rr[2]
289         if grp == '':
290           print 'ERROR .. blank group: %s.%s:' % (self.mip,self.gh.tv), rr
291         if tbl == '':
292           print 'ERROR .. blank table: %s.%s:' % (self.mip,self.gh.tv), rr
293         if tbl in ['New Variables','NEW']:
294           rr[2] = 'new'
295           self.actions['table map'] += 1
296         var = rr[1]
297         if var.find('_') != -1:
298           rr[1] = var.replace('_','')
299           self.actions['underscore in variable name removed'] += 1
300         self.sh[key] = tuple( rr )
301
302  def parseGrpHead(self,sh,path,k,s):
303    rh1 = ['Short name', 'Standard Name', 'Table', 'Frequency', 'Description extension (optional)', 'Shape', 'Levels', 'Time mean, point or climatology', 'Mask (optional)']
304    rh2 = ['Short name of group', 'Variable short name', 'Table', 'Frequency', 'Description extension (optional)', 'Shape', 'Levels', 'Time mean, point or climatology', 'Mask (optional)']
305
306##
307## initial loop over rows in variable group sheet
308##
309    ll = []
310    for i in range(sh.nrows):
311             thisr = sh.row(i)
312             tv = thisr[0].value
313             if tv[:10] == 'Short name':
314               ll.append(i)
315
316    assert len(ll) in [1,2], 'Could not parse sheet  %s, %s, %s: %s' % (path,k,s,len(ll))
317    withPriority = False
318    hr = sh.row( ll[-1] )
319    if len(ll) == 1:
320             iv = 1
321             it = 0
322             ok = len( hr ) >= 9 and all( map( lambda x: hr[x].value.strip() == rh2[x], range(9) ) )
323             assert ok, '001: Sheet heading not recognised: %s' % str(hr)
324             if len(hr) > 9 and hr[9].value == u'Priority':
325               withPriority = True
326               thisl = 10
327             else:
328               thisl = 9
329             tv = s
330    else:
331             ok = len( hr ) >= 9 and all( map( lambda x: hr[x].value.strip() == rh1[x], range(9) ) )
332             assert ok, '002: Sheet heading not recognised: %s' % str(hr)
333             iv = 0
334             it = -1
335             tv = sh.row(2)[1].value
336             thisl = 9
337    if tv == '':
338       print 'ERROR: blank group encountered: %s, %s' % (self.mip,s)
339    return nt__grphd( withPriority, thisl, iv, it, ll[-1]+1, tv)
340
341class stdo(object):
342  rq = dreq_cfg.rqcfg()
343
344  def __init__(self):
345    mips = ['OMIP','CFMIP']
346    self.rq.ff['CMIP5'] = ['/data/work/documents/CMIP5_standard_output.xls']
347    mips = self.rq.ff.keys()
348    ##taboi = {'OMIP':['fx','Oyr','Omon','Oday','day'], 'CFMIP':['cfMon','cfOff', 'cfDay', 'cf3hr', 'cfSites'] }
349    for mip in mips:
350     for fn in self.rq.ff[mip]:
351
352## look for names of subsections, for OMIP.
353      if fn in self.rq.fff:
354        self.sss = self.rq.fff[fn]
355      else:
356        self.sss = mip
357
358      self.shInfo = {'prov': '%s standard_output' % mip, 'label': mip, 'title': '%s reviewed CMIP5 tables'}
359      self.shCols = ['priority', 'long name', 'units', 'comment', 'questions & notes', 'output variable name', 'CF Standard name', 'unconfirmed or proposed standard name', 'unformatted units', 'cell_methods', 'valid min', 'valid max', 'mean absolute min', 'mean absolute max', 'positive', 'type', 'CMOR dimensions', 'CMOR variable name', 'realm', 'frequency', 'cell_measures', 'flag_values', 'flag_meanings','table','section','row']
360
361      if fn[0] == '/':
362        self.fpath = fn
363      else:
364        self.fpath = '%s%s/%s' % (self.rq.dir0,mip,fn)
365
366      log.info( 'STARTING: %s' % self.fpath ) 
367
368      wb = workbook( self.fpath )
369      self.sh = shelve.open( 'inSh/sh__so_%s' % mip, 'n' )
370      self.sh['__cols__'] = self.shCols
371      self.sh['__info__'] = self.shInfo
372      stt = os.stat( self.fpath )
373      self.ctime = stt[stat.ST_CTIME]
374      self.fsize = stt[stat.ST_SIZE]
375      self.sh['__src__'] = {'path':self.fpath, 'size':self.fsize, 'time':self.ctime}
376      for sn in wb.sns:
377        if sn not in ['general','dims','other output','CFMIP output']:
378          sht = wb.book.sheet_by_name( sn )
379          self.scanSheet( sht, mip, sn)
380      self.sh.close()
381
382  def scanSheet(self,sht,mip,sname):
383      if mip == 'OMIP':
384             snn = {'day':'Oday', 'fx':'Ofx'}.get( sname, sname )
385      else:
386             snn = sname
387      kl = []
388      for k in range(min(40,sht.nrows)):
389        if sht.row(k)[0].value == u'priority':
390          kl.append(k)
391         
392      if len(kl) == 0:
393        kk = 0
394        while len(sht.row(kk)) == 0 or sht.row(kk)[0].ctype not in (2,3):
395          kk += 1
396          assert kk < 40, 'No start found in %s %s %s' % (self.fpath,sname,str(kl))
397      else:
398        kk = kl[0] + 1
399
400      ##assert len(kl) >= 1, 'No start found in %s %s %s' % (self.fpath,sname,str(kl))
401      vl = []
402      s1 = set()
403      for k in range(kk,sht.nrows):
404        ok = True
405        rr  = sht.row(k)
406        rrr = [cleanStr(x) for x in rr]
407        for i in range( len(self.shCols) - len( rr) ):
408          rrr.append( '' )
409
410        ll = [x in ['','0.0'] for x in rrr]
411        if all( ll[1:6] ) and  all( ll[7:12] ):
412          if not all(ll):
413            log.info( 'INFO.skip.0001: %s:%s[%s]: skipping: %s' % (mip,sname,k,str(rrr)) )
414        else:
415            if rr[17].ctype == 1:
416                  vr = rrr[17]
417            else:
418                  vr = rrr[5]
419            v1 = rrr[5]
420            rrr[17] = vr
421
422            f1 = v1 in ['','0.0']
423            f2 = vr in ['','0.0']
424            if f1 or f2:
425              log.error( 'ERROR: no variable found: %s' % str(rrr))
426              ok = False
427            v1 = rrr[2]
428            v2 = rrr[8]
429            f1 = v1 in ['','0.0']
430            f2 = v2 in ['','0.0']
431            if f1 and f2:
432              log.error( 'ERROR: no units found: %s' % str(rrr))
433              ok = False
434            elif f1:
435              rrr[2] = v2
436
437            if ok:
438              rrr += [snn,self.sss,str(k) ]
439              self.sh[ str( uuid.uuid1() ) ] = rrr[:]
440              vl.append( rr[0].value )
441              if rr[0].ctype == 2 and int(rr[0].value) in [1,2,3]:
442                r = map( rval, rr )
443
444class requestScope(object):
445  rq = dreq_cfg.rqcfg()
446
447  def __init__(self,sdir='inSh'):
448
449    keys = sorted( self.rq.ee.keys() )
450##
451    ee = {}
452    self.shCols = ['Short name of group', 'Variable short name', 'Table', 'Frequency', 'Description extension (optional)', 'Shape', 'Levels', 'Time mean, point or climatology', 'Mask (optional)', 'Priority', 'MIP','uid','Prev. Var Name']
453##
454## array for information and debugging
455##
456    self.pr4 = dreq_utils.pr4()
457 
458    eh = {}
459    er = {}
460    for mip in keys:
461        self.mip = mip
462        self.actions = collections.defaultdict( int )
463        self.shInfo = {'prov': '%s request template, request scoping sheet' % mip, 'label': mip, 'title': '%s request'}
464        fn = self.rq.ee[mip]
465        path = '%s%s/%s' % (self.rq.dir0,mip,fn)
466        self.path = path
467        stt = os.stat( path )
468        self.ctime = stt[stat.ST_CTIME]
469        self.fsize = stt[stat.ST_SIZE]
470        wb = workbook( path )
471        if 'Request scoping' in wb.sns:
472          sh = wb.book.sheet_by_name( 'Request scoping' )
473          r4 = map( lambda x: x.value, sh.row(3) )
474          self.pr4.parse( self.mip, r4 )
475          eh[mip] = list( self.pr4.r4info )
476          rl = self.scanSheet( sh, self.pr4.r4info )
477          er[mip] = rl
478        else:
479          print 'Skipping ... no request scoping sheet: ',mip
480    sh = shelve.open( '%s/sh__requestScoping' % sdir )
481    sh['__headings__'] = eh
482    sh['__records__'] = er
483    sh.close()
484
485  def scanSheet( self,sh, r4info ):
486    rl = []
487    gpmaps = {}
488    gpmaps['ISMIP6'] = {'icesheetfx':['LIfxgre','LIfxant'], 'icesheetyear':['LIyrgre','LIyrant'], 'icesheetmon':['LImongre','LImonant']}
489    gpmaps['VolMIP'] = {'DYVR_monthly':['DYVR_monthly_a','DYVR_monthly_b','DYVR_monthly_c','DYVR_monthly_d']}
490    for i in range(4,sh.nrows):
491      rr = [x.value for x in sh.row(i)]
492      if not (rr[0] in ['',u''] or rr[0][0] == '#'):
493        if self.mip == 'DCPP':
494          j0 = r4info.ownix[0]
495        else:
496          j0 = r4info.ixcntl
497        if all( [x in {u'','',0,0.0} for x in rr[j0:]] ):
498          if rr[1] != 'none':
499            print 'skipping blank line: ',self.mip,rr
500        elif rr[0] == 'Short name of variable group':
501            print 'skipping title line: ',self.mip,rr
502        elif rr[1] == 'none':
503            print 'skipping non-blank line marked as *none*:',self.mip,rr
504        else:
505          if self.mip in gpmaps and rr[0] in gpmaps[self.mip]:
506            rr0 = rr[0]
507            for x in gpmaps[self.mip][rr0]:
508              rr[0] = x
509              rl.append( rr[:] )
510          else:
511            rl.append( rr )
512    return rl
513
514if __name__ == '__main__':
515  #ts = templateStat()
516  rs = requestScope()
517  logFarm.shutdown() 
Note: See TracBrowser for help on using the repository browser.