f71bfc543ace446d5729dc22776f5c3b3441b02f
[htsworkflow.git] / gaworkflow / pipeline / configure_run.py
1 #!/usr/bin/python
2 import subprocess
3 import logging
4 import time
5 import re
6 import os
7
8 from gaworkflow.pipeline.retrieve_config import getCombinedOptions, saveConfigFile
9 from gaworkflow.pipeline.retrieve_config import FlowCellNotFound, WebError404
10 from gaworkflow.pipeline.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
11
12 from pyinotify import WatchManager, ThreadedNotifier
13 from pyinotify import EventsCodes, ProcessEvent
14
15 logging.basicConfig(level=logging.DEBUG,
16                     format='%(asctime)s %(levelname)-8s %(message)s',
17                     datefmt='%a, %d %b %Y %H:%M:%S',
18                     filename='pipeline_main.log',
19                     filemode='w')
20
21 class ConfigInfo:
22   
23   def __init__(self):
24     self.run_path = None
25     self.bustard_path = None
26     self.config_filepath = None
27
28
29 ####################################
30 # inotify event processor
31
32 s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
33 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
34 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
35
36 class RunEvent(ProcessEvent):
37
38   def __init__(self):
39
40     self.run_status_dict = {'firecrest': False,
41                             'bustard': False,
42                             'gerald': False}
43
44     ProcessEvent.__init__(self)
45
46   def process_IN_CREATE(self, event):
47     fullpath = os.path.join(event.path, event.name)
48     if s_finished.search(fullpath):
49       logging.info("File Found: %s" % (fullpath))
50
51       if s_firecrest_finished.search(fullpath):
52         self.run_status_dict['firecrest'] = True
53       elif s_bustard_finished.search(fullpath):
54         self.run_status_dict['bustard'] = True
55       elif s_gerald_finished.search(fullpath):
56         self.run_status_dict['gerald'] = True
57       
58     print "Create: %s" % (os.path.join(event.path, event.name))
59
60   def process_IN_DELETE(self, event):
61     print "Remove %s" % (os.path.join(event.path, event.name))
62
63 #FLAGS
64 # Config Step Error
65 RUN_ABORT = 'abort'
66 # Run Step Error
67 RUN_FAILED = 'failed'
68
69
70 #####################################
71 # Configure Step (goat_pipeline.py)
72 #Info
73 s_start = re.compile('Starting Genome Analyzer Pipeline')
74 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
75 s_generating = re.compile('Generating journals, Makefiles and parameter files')
76 s_seq_folder = re.compile('^Sequence folder: ')
77 s_seq_folder_sub = re.compile('want to make ')
78 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
79
80 #Errors
81 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
82 s_species_dir_err = re.compile('Error: Lane [1-8]:')
83 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
84
85
86 ##Ignore - Example of out above each ignore regex.
87 #NOTE: Commenting out an ignore will cause it to be
88 # logged as DEBUG with the logging module.
89 #CF_STDERR_IGNORE_LIST = []
90 s_skip = re.compile('s_[0-8]_[0-9]+')
91
92
93 ##########################################
94 # Pipeline Run Step (make -j8 recursive)
95
96 ##Info
97 s_finished = re.compile('finished')
98
99 ##Errors
100 s_make_error = re.compile('^make[\S\s]+Error')
101 s_no_gnuplot = re.compile('gnuplot: command not found')
102 s_no_convert = re.compile('^Can\'t exec "convert"')
103 s_no_ghostscript = re.compile('gs: command not found')
104
105 ##Ignore - Example of out above each ignore regex.
106 #NOTE: Commenting out an ignore will cause it to be
107 # logged as DEBUG with the logging module.
108 #
109 PL_STDERR_IGNORE_LIST = []
110 # Info: PF 11802
111 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
112 # About to analyse intensity file s_4_0101_sig2.txt
113 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
114 # Will send output to standard output
115 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
116 # Found 31877 clusters
117 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
118 # Will use quality criterion ((CHASTITY>=0.6)
119 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
120 # Quality criterion translated to (($F[5]>=0.6))
121 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
122 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
123 #  AND
124 # opened s_4_0103_qhg.txt
125 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
126 # 81129 sequences out of 157651 passed filter criteria
127 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
128
129
130 def pl_stderr_ignore(line):
131   """
132   Searches lines for lines to ignore (i.e. not to log)
133
134   returns True if line should be ignored
135   returns False if line should NOT be ignored
136   """
137   for s in PL_STDERR_IGNORE_LIST:
138     if s.search(line):
139       return True
140   return False
141
142
143 def config_stdout_handler(line, conf_info):
144   """
145   Processes each line of output from GOAT
146   and stores useful information using the logging module
147
148   Loads useful information into conf_info as well, for future
149   use outside the function.
150
151   returns True if found condition that signifies success.
152   """
153
154   # Skip irrelevant line (without logging)
155   if s_skip.search(line):
156     pass
157
158   # Detect invalid command-line arguments
159   elif s_invalid_cmdline.search(line):
160     logging.error("Invalid commandline options!")
161
162   # Detect starting of configuration
163   elif s_start.search(line):
164     logging.info('START: Configuring pipeline')
165
166   # Detect it made it past invalid arguments
167   elif s_gerald.search(line):
168     logging.info('Running make now')
169
170   # Detect that make files have been generated (based on output)
171   elif s_generating.search(line):
172     logging.info('Make files generted')
173     return True
174
175   # Capture run directory
176   elif s_seq_folder.search(line):
177     mo = s_seq_folder_sub.search(line)
178     #Output changed when using --tiles=<tiles>
179     # at least in pipeline v0.3.0b2
180     if mo:
181       firecrest_bustard_gerald_makefile = line[mo.end():]
182       firecrest_bustard_gerald, junk = \
183                                 os.path.split(firecrest_bustard_gerald_makefile)
184       firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
185       firecrest, junk = os.path.split(firecrest_bustard)
186
187       conf_info.bustard_path = firecrest_bustard
188       conf_info.run_path = firecrest
189     
190     #Standard output handling
191     else:
192       print 'Sequence line:', line
193       mo = s_seq_folder.search(line)
194       conf_info.bustard_path = line[mo.end():]
195       conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
196
197   # Log all other output for debugging purposes
198   else:
199     logging.warning('CONF:?: %s' % (line))
200
201   return False
202
203
204
205 def config_stderr_handler(line, conf_info):
206   """
207   Processes each line of output from GOAT
208   and stores useful information using the logging module
209
210   Loads useful information into conf_info as well, for future
211   use outside the function.
212
213   returns RUN_ABORT upon detecting failure;
214           True on success message;
215           False if neutral message
216             (i.e. doesn't signify failure or success)
217   """
218
219   # Detect invalid species directory error
220   if s_species_dir_err.search(line):
221     logging.error(line)
222     return RUN_ABORT
223   # Detect goat_pipeline.py traceback
224   elif s_goat_traceb.search(line):
225     logging.error("Goat config script died, traceback in debug output")
226     return RUN_ABORT
227   # Detect indication of successful configuration (from stderr; odd, but ok)
228   elif s_stderr_taskcomplete.search(line):
229     logging.info('Configure step successful (from: stderr)')
230     return True
231   # Log all other output as debug output
232   else:
233     logging.debug('CONF:STDERR:?: %s' % (line))
234
235   # Neutral (not failure; nor success)
236   return False
237
238
239 #def pipeline_stdout_handler(line, conf_info):
240 #  """
241 #  Processes each line of output from running the pipeline
242 #  and stores useful information using the logging module
243 #
244 #  Loads useful information into conf_info as well, for future
245 #  use outside the function.
246 #
247 #  returns True if found condition that signifies success.
248 #  """
249 #
250 #  #f.write(line + '\n')
251 #
252 #  return True
253
254
255
256 def pipeline_stderr_handler(line, conf_info):
257   """
258   Processes each line of stderr from pipelien run
259   and stores useful information using the logging module
260
261   ##FIXME: Future feature (doesn't actually do this yet)
262   #Loads useful information into conf_info as well, for future
263   #use outside the function.
264
265   returns RUN_FAILED upon detecting failure;
266           #True on success message; (no clear success state)
267           False if neutral message
268             (i.e. doesn't signify failure or success)
269   """
270
271   if pl_stderr_ignore(line):
272     pass
273   elif s_make_error.search(line):
274     logging.error("make error detected; run failed")
275     return RUN_FAILED
276   elif s_no_gnuplot.search(line):
277     logging.error("gnuplot not found")
278     return RUN_FAILED
279   elif s_no_convert.search(line):
280     logging.error("imagemagick's convert command not found")
281     return RUN_FAILED
282   elif s_no_ghostscript.search(line):
283     logging.error("ghostscript not found")
284     return RUN_FAILED
285   else:
286     logging.debug('PIPE:STDERR:?: %s' % (line))
287
288   return False
289
290
291 def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir):
292   """
293   Gets the config file from server...
294   requires config file in:
295     /etc/ga_frontend/ga_frontend.conf
296    or
297     ~/.ga_frontend.conf
298
299   with:
300   [config_file_server]
301   base_host_url: http://host:port
302
303   return True if successful, False is failure
304   """
305   options = getCombinedOptions()
306
307   if options.url is None:
308     logging.error("~/.ga_frontend.conf or /etc/ga_frontend/ga_frontend.conf" \
309                   " missing base_host_url option")
310     return False
311
312   try:
313     saveConfigFile(flowcell, options.url, cfg_filepath)
314     conf_info.config_filepath = cfg_filepath
315   except FlowCellNotFound, e:
316     logging.error(e)
317     return False
318   except WebError404, e:
319     logging.error(e)
320     return False
321   except IOError, e:
322     logging.error(e)
323     return False
324   except Exception, e:
325     logging.error(e)
326     return False
327
328   f = open(cfg_filepath, 'r')
329   data = f.read()
330   f.close()
331
332   genome_dict = getAvailableGenomes(genome_dir)
333   mapper_dict = constructMapperDict(genome_dict)
334
335   f = open(cfg_filepath, 'w')
336   f.write(data % (mapper_dict))
337   f.close()
338   
339   return True  
340   
341
342
343 def configure(conf_info):
344   """
345   Attempts to configure the GA pipeline using goat.
346
347   Uses logging module to store information about status.
348
349   returns True if configuration successful, otherwise False.
350   """
351   #ERROR Test:
352   #pipe = subprocess.Popen(['goat_pipeline.py',
353   #                         '--GERALD=config32bk.txt',
354   #                         '--make .',],
355   #                         #'.'],
356   #                        stdout=subprocess.PIPE,
357   #                        stderr=subprocess.PIPE)
358
359   #ERROR Test (2), causes goat_pipeline.py traceback
360   #pipe = subprocess.Popen(['goat_pipeline.py',
361   #                  '--GERALD=%s' % (conf_info.config_filepath),
362   #                         '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
363   #                         '--make',
364   #                         '.'],
365   #                        stdout=subprocess.PIPE,
366   #                        stderr=subprocess.PIPE)
367
368   ##########################
369   # Run configuration step
370   #   Not a test; actual configure attempt.
371   #pipe = subprocess.Popen(['goat_pipeline.py',
372   #                  '--GERALD=%s' % (conf_info.config_filepath),
373   #                         '--make',
374   #                         '.'],
375   #                        stdout=subprocess.PIPE,
376   #                        stderr=subprocess.PIPE)
377
378   # CONTINUE HERE
379   #FIXME: this only does a run on 5 tiles on lane 4
380   pipe = subprocess.Popen(['goat_pipeline.py',
381                     '--GERALD=%s' % (conf_info.config_filepath),
382                            '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
383                            '--make',
384                            '.'],
385                           stdout=subprocess.PIPE,
386                           stderr=subprocess.PIPE)
387   ##################
388   # Process stdout
389   stdout_line = pipe.stdout.readline()
390
391   complete = False
392   while stdout_line != '':
393     # Handle stdout
394     if config_stdout_handler(stdout_line, conf_info):
395       complete = True
396     stdout_line = pipe.stdout.readline()
397
398
399   error_code = pipe.wait()
400   if error_code:
401     logging.error('Recieved error_code: %s' % (error_code))
402   else:
403     logging.info('We are go for launch!')
404
405   #Process stderr
406   stderr_line = pipe.stderr.readline()
407
408   abort = 'NO!'
409   stderr_success = False
410   while stderr_line != '':
411     stderr_status = config_stderr_handler(stderr_line, conf_info)
412     if stderr_status == RUN_ABORT:
413       abort = RUN_ABORT
414     elif stderr_status is True:
415       stderr_success = True
416     stderr_line = pipe.stderr.readline()
417
418
419   #Success requirements:
420   # 1) The stdout completed without error
421   # 2) The program exited with status 0
422   # 3) No errors found in stdout
423   print '#Expect: True, False, True, True'
424   print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
425   status = complete is True and \
426            bool(error_code) is False and \
427            abort != RUN_ABORT and \
428            stderr_success is True
429
430   # If everything was successful, but for some reason
431   #  we didn't retrieve the path info, log it.
432   if status is True:
433     if conf_info.bustard_path is None or conf_info.run_path is None:
434       logging.error("Failed to retrieve run_path")
435       return False
436   
437   return status
438
439
440 def run_pipeline(conf_info):
441   """
442   Run the pipeline and monitor status.
443   """
444   # Fail if the run_path doesn't actually exist
445   if not os.path.exists(conf_info.run_path):
446     logging.error('Run path does not exist: %s' \
447               % (conf_info.run_path))
448     return False
449
450   # Change cwd to run_path
451   os.chdir(conf_info.run_path)
452   stdout_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stdout.txt')
453   stderr_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stderr.txt')
454
455   # Monitor file creation
456   wm = WatchManager()
457   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
458   event = RunEvent()
459   notifier = ThreadedNotifier(wm, event)
460   notifier.start()
461   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
462
463   # Log pipeline starting
464   logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
465   
466   # Start the pipeline (and hide!)
467   #pipe = subprocess.Popen(['make',
468   #                         '-j8',
469   #                         'recursive'],
470   #                        stdout=subprocess.PIPE,
471   #                        stderr=subprocess.PIPE)
472
473   fout = open(stdout_filepath, 'w')
474   ferr = open(stderr_filepath, 'w')
475
476   pipe = subprocess.Popen(['make',
477                              '-j8',
478                              'recursive'],
479                              stdout=fout,
480                              stderr=ferr)
481                              #shell=True)
482   # Wait for run to finish
483   retcode = pipe.wait()
484
485
486   # Clean up
487   notifier.stop()
488   fout.close()
489   ferr.close()
490
491   # Process stderr
492   ferr = open(stderr_filepath, 'r')
493
494   run_failed_stderr = False
495   for line in ferr:
496     err_status = pipeline_stderr_handler(line, conf_info)
497     if err_status == RUN_FAILED:
498       run_failed_stderr = True
499
500   ferr.close()
501
502   # Finished file check!
503   print 'RUN SUCCESS CHECK:'
504   for key, value in event.run_status_dict.items():
505     print '  %s: %s' % (key, value)
506
507   dstatus = event.run_status_dict
508
509   # Success or failure check
510   status = (retcode == 0) and \
511            run_failed_stderr is False and \
512            dstatus['firecrest'] is True and \
513            dstatus['bustard'] is True and \
514            dstatus['gerald'] is True
515
516   return status
517
518