GMTB Workflow Documentation
hwrf_pull_inputs.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 import os, sys, logging, getopt, re, StringIO
4 import produtil.sigsafety, produtil.fileop
5 import produtil.run, produtil.setup
6 import hwrf.numerics, hwrf.launcher, hwrf.input
7 
8 from hwrf.numerics import to_datetime_rel
9 from produtil.run import exe
10 from hwrf.input import InputSource, DataCatalog
11 
12 
13 global conf_input_list
14 ## @var inputscript
15 # The script from which the input list is read.
16 inputscript=None
17 
18 ## @var outputdir
19 # The output directory where data will reside
20 outputdir=None
21 
22 ## @var cycles
23 # The list of YYYYMMDDHH cycles to pull
24 cycles=None
25 
26 ## @var HOMEhwrf
27 # Parent directory of HWRF's parm/ and ush/
28 #HOMEhwrf=None
29 
30 ## @var WORKhwrf
31 # Scrub area, the WORKhwrf variable in the [config] section of conf
32 WORKhwrf=None
33 
34 ## @var logger
35 # The logging.Logger object for logging messages.
36 logger=None
37 
38 ## @var conf_input_list
39 # A list of input files, relative to parm, to read in when making conf
40 conf_input_list = [
41  'hwrf_input.conf', 'hwrf.conf', 'hwrf_holdvars.conf',
42  'hwrf_basic.conf', 'system.conf' ]
43 
44 # def setup():
45 # """!Sets up the used parts of the produtil package. This does NOT
46 # call produtil.setup.setup(). Instead, it installs the signal
47 # handlers and sets up the Python logging module directly. No other
48 # parts of the produtil package are initialized."""
49 # # Install SIGTERM, SIGINT, etc. handlers. Needed to handle batch job
50 # # kills correctly.
51 # global logger
52 # produtil.sigsafety.install_handlers()
53 # global handler, logger
54 # # Set up logging. We customize things to look nice.
55 # handler=logging.StreamHandler(sys.stderr)
56 # handler.setFormatter(logging.Formatter(
57 # 'pull-inputs:%(levelname)s:%(asctime)s: %(message)s',
58 # '%Y%m%d.%H%M%S'))
59 # logger=logging.getLogger()
60 # logger.setLevel(logging.INFO)
61 # logger.addHandler(handler)
62 
64  global logger, outputdir, WORKhwrf, HOMEhwrf, cycles, inputscript
65  cycles = set()
66  try:
67  (optlist,args) = getopt.getopt(sys.argv[1:],"o:w:vi:")
68  for opt,val in optlist:
69  logger.info("OPTLIST: ",optlist)
70  if opt=='-v':
71  logger.setLevel(logging.DEBUG)
72  logger.debug('Verbosity enabled.')
73  elif opt=='-w':
74  logger.info('Work area set to %s'%(val,))
75  WORKhwrf=val
76  elif opt=='-o':
77  logger.info('Output area set to %s'%(val,))
78  outputdir=val
79  elif opt=='-i':
80  logger.info('Input script set to %s'%(val,))
81  inputscript=val
82  else:
83  usage('Invalid option %s'%(opt,))
84  except (getopt.GetoptError,ValueError,TypeError) as e:
85  logger.info("FAILED!")
86  usage(str(e))
87  sys.exit(1)
88  HOMEhwrf=os.environ.get('HOMEhwrf','')
89  if not HOMEhwrf:
90  HOMEhwrf=os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
91  cwd=os.getcwd()
92  if not outputdir:
93  outputdir=os.path.join(cwd,'hwrfdata')
94  if not WORKhwrf:
95  WORKhwrf=os.path.join(cwd,'input-temp')
96  if inputscript is None:
97  if len(args)<2:
98  usage('Input script and one cycle must be specified.')
99  inputscript=args[0]
100  cycles.add(args[1])
101  elif len(args)<1:
102  usage('At least one cycle must be specified.')
103  else:
104  cycles=args
105 
106  if not os.path.exists(WORKhwrf): os.makedirs(WORKhwrf)
107  os.chdir(WORKhwrf)
108  logger.info('inputscript is %s'%(inputscript,))
109  logger.info('cycles: '+str(cycles))
110 
111 def make_conf(cycle):
112  conf=hwrf.launcher.HWRFLauncher()
113  conf.set('config','cycle',cycle)
114  conf.cycle=cycle
115  assert(conf.cycle is not None)
116  for basename in conf_input_list:
117  print "CONF_INPUT_LIST ", conf_input_list,basename
118  print "HOMEhwrf", HOMEhwrf
119  fullname=os.path.join(HOMEhwrf,'parm',basename)
120  if not os.path.exists(fullname):
121  logger.error('%s: does not exist. HWRF is not installed in %s'%(
122  fullname,HOMEhwrf))
123  sys.exit(1)
124  conf.read(fullname)
125  conf.set('holdvars','CASE_ROOT','HISTORY')
126  conf.set('hwrfdata','inputroot',outputdir)
127  conf.set('config','case_root','HISTORY')
128  conf.set('config','fcsthist','hist')
129  conf.set('config','realtime','false')
130  conf.guess_default_values()
131  assert(conf.cycle is not None)
132  conf.set('dir','HOMEhwrf',HOMEhwrf)
133  conf.set('dir','WORKhwrf',WORKhwrf)
134  conf.set('dir','com',WORKhwrf)
135  return conf
136 
137 def expand_lists(inputs,stack,names,lists):
138  if not names:
139  # We have finished iterating.
140  inputs.append(stack[-1])
141  return
142  # Iterate into the next list, expanding all items.
143  for item in lists[0]:
144  addme=dict(stack[-1])
145  addme[names[0]]=item
146  stack.append(addme)
147  expand_lists(inputs,stack,names[1:],lists[1:])
148 
149 def pull_command(inputs,dataset,atime,lists,args):
150  assert(isinstance(lists,dict))
151  pull=dict()
152  listflags=set()
153  ftimes=list()
154  pull.update(dataset=dataset,atime=atime,item=args[0])
155  for arg in args[1:]:
156  if arg=='optional':
157  pull['optional']=True
158  elif len(arg)>1 and arg[0]=='+':
159  if arg[1:] not in lists:
160  raise Exception('%s: list not in %s'%(arg[1:],lists))
161  listflags.add(arg[1:])
162  elif '0123456789'.find(arg[0])>=0:
163  parts=arg.split('..')
164  if len(parts)==1:
165  fhr=int(parts[0])
166  sec=fhr*3600
167  ftimes.append(to_datetime_rel(sec ,atime))
168  elif len(parts)==2:
169  start_fhr=int(parts[0])
170  end_fhr=int(parts[1])
171  for hr in xrange(start_fhr,end_fhr+1):
172  sec=hr*3600
173  ftimes.append(to_datetime_rel(sec,atime))
174  elif len(parts)>2:
175  start_fhr=int(parts[0])
176  end_fhr=int(parts[2])
177  step_fhr=int(parts[1])
178  for hr in xrange(start_fhr,end_fhr+1,step_fhr):
179  sec=hr*3600
180  ftimes.append(to_datetime_rel(sec,atime))
181  else:
182  raise Exception('Unrecognized argument %s'%(arg,))
183  if not ftimes:
184  ftimes.append(atime)
185  listnames = [ listname for listname in listflags ]
186  listlists = [ lists[listname] for listname in listnames ]
187  for ftime in ftimes:
188  addme=dict(pull)
189  addme.update(ftime=ftime)
190  stack=list()
191  stack.append(addme)
192  expand_lists(inputs,stack,listnames,listlists)
193 
194 def make_input_list(conf,inputstream):
195  dataset=None
196  cycle=conf.cycle
197  atime=cycle
198  lists=dict()
199  inputs=list()
200  for line in inputstream:
201  stripped=line.strip()
202  if stripped[0:1]=='#': continue
203  parts=stripped.split()
204  if len(parts)<1: continue
205  cmd=parts[0].upper()
206  if cmd=='DATASET' and len(parts)>1:
207  #print 'DATASET',parts[1:]
208  dataset=parts[1]
209  elif cmd=='ATIME' and len(parts)==2:
210  rel=int(parts[1])
211  atime=to_datetime_rel(rel*3600,cycle)
212  elif cmd=='ATIME' and len(parts)==1:
213  atime=cycle
214  elif ( cmd=='FILL' or cmd=='FILLINT') and len(parts)==4:
215  nums=parts[3].split('..')
216  listname=parts[1]
217  format=parts[2]
218  if len(nums)==1:
219  lists[listname]=(format%int(nums[0]))
220  elif len(nums)==2:
221  lists[listname] = \
222  [str(format)%int(num) \
223  for num in xrange(int(nums[0]),int(nums[1])+1)]
224  elif len(nums)>=3:
225  lists[listname] = \
226  [str(format)%int(num) \
227  for num in xrange(int(nums[0]),int(nums[2])+1,int(nums[1]))]
228  if cmd=='FILLINT':
229  old=lists[listname]
230  lists[listname]=[ int(O) for O in old ]
231  elif cmd=='SET' and len(parts)>2:
232  #print 'SET',parts[1:]
233  lists[parts[1]]=parts[2:]
234  elif cmd=='SETINT' and len(parts)>2:
235  #print 'SET',parts[1:]
236  lists[parts[1]]=[ int(part) for part in parts[2:] ]
237  elif cmd=='PULL' and len(parts)>1:
238  #print 'PULL',parts[1:]
239  pull_command(inputs,dataset,atime,lists,parts[1:])
240  else:
241  pass #print 'IGNORE',cmd,parts[1:]
242  return inputs
243 
244 def dump_list(inputs):
245  logger.info('List of inputs to pull:')
246  for i in inputs:
247  s=' '.join([ '%s=%s'%(k,v) for k,v in i.iteritems() ])
248  logger.info(s)
249 
250 def transfer_inputs(inputs,conf,cycle):
251  hwrfdata=DataCatalog(conf,'hwrfdata',cycle)
252  htar=exe(conf.getexe('htar'))
253  hsi=exe(conf.getexe('hsi'))
254  insec=conf.getstr('config','input_sources')
255  ins=InputSource(conf,insec,cycle,htar,logger,hsi)
256  return bool(ins.get(inputs,hwrfdata,False,logger,True))
257 
258 def main():
259  global logger
260  produtil.setup.setup(send_dbn=False,thread_logger=True)
261  logger=logging.getLogger('pull_inputs')
263  okay=True
264  for cycle in cycles:
265  conf=make_conf(cycle)
266  with open(inputscript,'rt') as inf:
267  inputs=make_input_list(conf,inf)
268  dump_list(inputs)
269  okay=okay and transfer_inputs(inputs,conf,cycle)
270  if not okay:
271  sys.exit(1)
272 
273 if __name__=='__main__':
274  main()
def expand_lists(inputs, stack, names, lists)
def pull_command(inputs, dataset, atime, lists, args)
def dump_list(inputs)
def make_input_list(conf, inputstream)
def transfer_inputs(inputs, conf, cycle)