add additional debugging logging to retrieve_config and configure_pipeline
[htsworkflow.git] / gaworkflow / pipeline / configure_run.py
1 #!/usr/bin/python
2 import subprocess
3 import logging
4 import time
5 import re
6 import os
7
8 from gaworkflow.pipeline.retrieve_config import getCombinedOptions, saveConfigFile
9 from gaworkflow.pipeline.retrieve_config import FlowCellNotFound, WebError404
10 from gaworkflow.pipeline.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
11 from gaworkflow.pipeline.run_status import GARunStatus
12
13 from pyinotify import WatchManager, ThreadedNotifier
14 from pyinotify import EventsCodes, ProcessEvent
15
16 class ConfigInfo:
17   
18   def __init__(self):
19     #run_path = firecrest analysis directory to run analysis from
20     self.run_path = None
21     self.bustard_path = None
22     self.config_filepath = None
23     self.status = None
24
25     #top level directory where all analyses are placed
26     self.base_analysis_dir = None
27     #analysis_dir, top level analysis dir...
28     # base_analysis_dir + '/070924_USI-EAS44_0022_FC12150'
29     self.analysis_dir = None
30
31
32   def createStatusObject(self):
33     """
34     Creates a status object which can be queried for
35     status of running the pipeline
36
37     returns True if object created
38     returns False if object cannot be created
39     """
40     if self.config_filepath is None:
41       return False
42
43     self.status = GARunStatus(self.config_filepath)
44     return True
45
46
47
48 ####################################
49 # inotify event processor
50
51 s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
52 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
53 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
54
55 s_gerald_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
56 s_bustard_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
57 s_firecrest_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/')
58
59 class RunEvent(ProcessEvent):
60
61   def __init__(self, conf_info):
62
63     self.run_status_dict = {'firecrest': False,
64                             'bustard': False,
65                             'gerald': False}
66
67     self._ci = conf_info
68
69     ProcessEvent.__init__(self)
70     
71
72   def process_IN_CREATE(self, event):
73     fullpath = os.path.join(event.path, event.name)
74     if s_finished.search(fullpath):
75       logging.info("File Found: %s" % (fullpath))
76
77       if s_firecrest_finished.search(fullpath):
78         self.run_status_dict['firecrest'] = True
79         self._ci.status.updateFirecrest(event.name)
80       elif s_bustard_finished.search(fullpath):
81         self.run_status_dict['bustard'] = True
82         self._ci.status.updateBustard(event.name)
83       elif s_gerald_finished.search(fullpath):
84         self.run_status_dict['gerald'] = True
85         self._ci.status.updateGerald(event.name)
86
87     #WARNING: The following order is important!!
88     # Firecrest regex will catch all gerald, bustard, and firecrest
89     # Bustard regex will catch all gerald and bustard
90     # Gerald regex will catch all gerald
91     # So, order needs to be Gerald, Bustard, Firecrest, or this
92     #  won't work properly.
93     elif s_gerald_all.search(fullpath):
94       self._ci.status.updateGerald(event.name)
95     elif s_bustard_all.search(fullpath):
96       self._ci.status.updateBustard(event.name)
97     elif s_firecrest_all.search(fullpath):
98       self._ci.status.updateFirecrest(event.name)
99       
100     #print "Create: %s" % (os.path.join(event.path, event.name))
101
102   def process_IN_DELETE(self, event):
103     #print "Remove %s" % (os.path.join(event.path, event.name))
104     pass
105
106
107
108
109 #FLAGS
110 # Config Step Error
111 RUN_ABORT = 'abort'
112 # Run Step Error
113 RUN_FAILED = 'failed'
114
115
116 #####################################
117 # Configure Step (goat_pipeline.py)
118 #Info
119 s_start = re.compile('Starting Genome Analyzer Pipeline')
120 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
121 s_generating = re.compile('^Generating journals, Makefiles')
122 s_seq_folder = re.compile('^Sequence folder: ')
123 s_seq_folder_sub = re.compile('want to make ')
124 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
125
126 #Errors
127 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
128 s_species_dir_err = re.compile('Error: Lane [1-8]:')
129 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
130 s_missing_cycles = re.compile('^Error: Tile s_[1-8]_[0-9]+: Different number of cycles: [0-9]+ instead of [0-9]+')
131
132 SUPPRESS_MISSING_CYCLES = False
133
134
135 ##Ignore - Example of out above each ignore regex.
136 #NOTE: Commenting out an ignore will cause it to be
137 # logged as DEBUG with the logging module.
138 #CF_STDERR_IGNORE_LIST = []
139 s_skip = re.compile('s_[0-8]_[0-9]+')
140
141
142 ##########################################
143 # Pipeline Run Step (make -j8 recursive)
144
145 ##Info
146 s_finished = re.compile('finished')
147
148 ##Errors
149 s_make_error = re.compile('^make[\S\s]+Error')
150 s_no_gnuplot = re.compile('gnuplot: command not found')
151 s_no_convert = re.compile('^Can\'t exec "convert"')
152 s_no_ghostscript = re.compile('gs: command not found')
153
154 ##Ignore - Example of out above each ignore regex.
155 #NOTE: Commenting out an ignore will cause it to be
156 # logged as DEBUG with the logging module.
157 #
158 PL_STDERR_IGNORE_LIST = []
159 # Info: PF 11802
160 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
161 # About to analyse intensity file s_4_0101_sig2.txt
162 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
163 # Will send output to standard output
164 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
165 # Found 31877 clusters
166 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
167 # Will use quality criterion ((CHASTITY>=0.6)
168 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
169 # Quality criterion translated to (($F[5]>=0.6))
170 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
171 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
172 #  AND
173 # opened s_4_0103_qhg.txt
174 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
175 # 81129 sequences out of 157651 passed filter criteria
176 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
177
178
179 def pl_stderr_ignore(line):
180   """
181   Searches lines for lines to ignore (i.e. not to log)
182
183   returns True if line should be ignored
184   returns False if line should NOT be ignored
185   """
186   for s in PL_STDERR_IGNORE_LIST:
187     if s.search(line):
188       return True
189   return False
190
191
192 def config_stdout_handler(line, conf_info):
193   """
194   Processes each line of output from GOAT
195   and stores useful information using the logging module
196
197   Loads useful information into conf_info as well, for future
198   use outside the function.
199
200   returns True if found condition that signifies success.
201   """
202
203   # Skip irrelevant line (without logging)
204   if s_skip.search(line):
205     pass
206
207   # Detect invalid command-line arguments
208   elif s_invalid_cmdline.search(line):
209     logging.error("Invalid commandline options!")
210
211   # Detect starting of configuration
212   elif s_start.search(line):
213     logging.info('START: Configuring pipeline')
214
215   # Detect it made it past invalid arguments
216   elif s_gerald.search(line):
217     logging.info('Running make now')
218
219   # Detect that make files have been generated (based on output)
220   elif s_generating.search(line):
221     logging.info('Make files generted')
222     return True
223
224   # Capture run directory
225   elif s_seq_folder.search(line):
226     mo = s_seq_folder_sub.search(line)
227     #Output changed when using --tiles=<tiles>
228     # at least in pipeline v0.3.0b2
229     if mo:
230       firecrest_bustard_gerald_makefile = line[mo.end():]
231       firecrest_bustard_gerald, junk = \
232                                 os.path.split(firecrest_bustard_gerald_makefile)
233       firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
234       firecrest, junk = os.path.split(firecrest_bustard)
235
236       conf_info.bustard_path = firecrest_bustard
237       conf_info.run_path = firecrest
238     
239     #Standard output handling
240     else:
241       print 'Sequence line:', line
242       mo = s_seq_folder.search(line)
243       conf_info.bustard_path = line[mo.end():]
244       conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
245
246   # Log all other output for debugging purposes
247   else:
248     logging.warning('CONF:?: %s' % (line))
249
250   return False
251
252
253
254 def config_stderr_handler(line, conf_info):
255   """
256   Processes each line of output from GOAT
257   and stores useful information using the logging module
258
259   Loads useful information into conf_info as well, for future
260   use outside the function.
261
262   returns RUN_ABORT upon detecting failure;
263           True on success message;
264           False if neutral message
265             (i.e. doesn't signify failure or success)
266   """
267   global SUPPRESS_MISSING_CYCLES
268
269   # Detect invalid species directory error
270   if s_species_dir_err.search(line):
271     logging.error(line)
272     return RUN_ABORT
273   # Detect goat_pipeline.py traceback
274   elif s_goat_traceb.search(line):
275     logging.error("Goat config script died, traceback in debug output")
276     return RUN_ABORT
277   # Detect indication of successful configuration (from stderr; odd, but ok)
278   elif s_stderr_taskcomplete.search(line):
279     logging.info('Configure step successful (from: stderr)')
280     return True
281   # Detect missing cycles
282   elif s_missing_cycles.search(line):
283
284     # Only display error once
285     if not SUPPRESS_MISSING_CYCLES:
286       logging.error("Missing cycles detected; Not all cycles copied?")
287       logging.debug("CONF:STDERR:MISSING_CYCLES: %s" % (line))
288       SUPPRESS_MISSING_CYCLES = True
289     return RUN_ABORT
290   
291   # Log all other output as debug output
292   else:
293     logging.debug('CONF:STDERR:?: %s' % (line))
294
295   # Neutral (not failure; nor success)
296   return False
297
298
299 #def pipeline_stdout_handler(line, conf_info):
300 #  """
301 #  Processes each line of output from running the pipeline
302 #  and stores useful information using the logging module
303 #
304 #  Loads useful information into conf_info as well, for future
305 #  use outside the function.
306 #
307 #  returns True if found condition that signifies success.
308 #  """
309 #
310 #  #f.write(line + '\n')
311 #
312 #  return True
313
314
315
316 def pipeline_stderr_handler(line, conf_info):
317   """
318   Processes each line of stderr from pipelien run
319   and stores useful information using the logging module
320
321   ##FIXME: Future feature (doesn't actually do this yet)
322   #Loads useful information into conf_info as well, for future
323   #use outside the function.
324
325   returns RUN_FAILED upon detecting failure;
326           #True on success message; (no clear success state)
327           False if neutral message
328             (i.e. doesn't signify failure or success)
329   """
330
331   if pl_stderr_ignore(line):
332     pass
333   elif s_make_error.search(line):
334     logging.error("make error detected; run failed")
335     return RUN_FAILED
336   elif s_no_gnuplot.search(line):
337     logging.error("gnuplot not found")
338     return RUN_FAILED
339   elif s_no_convert.search(line):
340     logging.error("imagemagick's convert command not found")
341     return RUN_FAILED
342   elif s_no_ghostscript.search(line):
343     logging.error("ghostscript not found")
344     return RUN_FAILED
345   else:
346     logging.debug('PIPE:STDERR:?: %s' % (line))
347
348   return False
349
350
351 def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir):
352   """
353   Gets the config file from server...
354   requires config file in:
355     /etc/ga_frontend/ga_frontend.conf
356    or
357     ~/.ga_frontend.conf
358
359   with:
360   [config_file_server]
361   base_host_url: http://host:port
362
363   return True if successful, False is failure
364   """
365   options = getCombinedOptions()
366
367   if options.url is None:
368     logging.error("~/.ga_frontend.conf or /etc/ga_frontend/ga_frontend.conf" \
369                   " missing base_host_url option")
370     return False
371
372   try:
373     saveConfigFile(flowcell, options.url, cfg_filepath)
374     conf_info.config_filepath = cfg_filepath
375   except FlowCellNotFound, e:
376     logging.error(e)
377     return False
378   except WebError404, e:
379     logging.error(e)
380     return False
381   except IOError, e:
382     logging.error(e)
383     return False
384   except Exception, e:
385     logging.error(e)
386     return False
387
388   f = open(cfg_filepath, 'r')
389   data = f.read()
390   f.close()
391
392   genome_dict = getAvailableGenomes(genome_dir)
393   mapper_dict = constructMapperDict(genome_dict)
394
395   logging.debug(data)
396
397   f = open(cfg_filepath, 'w')
398   f.write(data % (mapper_dict))
399   f.close()
400   
401   return True
402   
403
404
405 def configure(conf_info):
406   """
407   Attempts to configure the GA pipeline using goat.
408
409   Uses logging module to store information about status.
410
411   returns True if configuration successful, otherwise False.
412   """
413   #ERROR Test:
414   #pipe = subprocess.Popen(['goat_pipeline.py',
415   #                         '--GERALD=config32bk.txt',
416   #                         '--make .',],
417   #                         #'.'],
418   #                        stdout=subprocess.PIPE,
419   #                        stderr=subprocess.PIPE)
420
421   #ERROR Test (2), causes goat_pipeline.py traceback
422   #pipe = subprocess.Popen(['goat_pipeline.py',
423   #                  '--GERALD=%s' % (conf_info.config_filepath),
424   #                         '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
425   #                         '--make',
426   #                         '.'],
427   #                        stdout=subprocess.PIPE,
428   #                        stderr=subprocess.PIPE)
429
430   ##########################
431   # Run configuration step
432   #   Not a test; actual configure attempt.
433   #pipe = subprocess.Popen(['goat_pipeline.py',
434   #                  '--GERALD=%s' % (conf_info.config_filepath),
435   #                         '--make',
436   #                         '.'],
437   #                        stdout=subprocess.PIPE,
438   #                        stderr=subprocess.PIPE)
439
440
441   stdout_filepath = os.path.join(conf_info.analysis_dir,
442                                  "pipeline_configure_stdout.txt")
443   stderr_filepath = os.path.join(conf_info.analysis_dir,
444                                  "pipeline_configure_stderr.txt")
445
446   fout = open(stdout_filepath, 'w')
447   ferr = open(stderr_filepath, 'w')
448   
449   pipe = subprocess.Popen(['goat_pipeline.py',
450                     '--GERALD=%s' % (conf_info.config_filepath),
451                            #'--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
452                            '--make',
453                            conf_info.analysis_dir],
454                           stdout=fout,
455                           stderr=ferr)
456
457   print "Configuring pipeline: %s" % (time.ctime())
458   error_code = pipe.wait()
459
460   # Clean up
461   fout.close()
462   ferr.close()
463   
464   
465   ##################
466   # Process stdout
467   fout = open(stdout_filepath, 'r')
468   
469   stdout_line = fout.readline()
470
471   complete = False
472   while stdout_line != '':
473     # Handle stdout
474     if config_stdout_handler(stdout_line, conf_info):
475       complete = True
476     stdout_line = fout.readline()
477
478   fout.close()
479
480
481   #error_code = pipe.wait()
482   if error_code:
483     logging.error('Recieved error_code: %s' % (error_code))
484   else:
485     logging.info('We are go for launch!')
486
487   #Process stderr
488   ferr = open(stderr_filepath, 'r')
489   stderr_line = ferr.readline()
490
491   abort = 'NO!'
492   stderr_success = False
493   while stderr_line != '':
494     stderr_status = config_stderr_handler(stderr_line, conf_info)
495     if stderr_status == RUN_ABORT:
496       abort = RUN_ABORT
497     elif stderr_status is True:
498       stderr_success = True
499     stderr_line = ferr.readline()
500
501   ferr.close()
502
503
504   #Success requirements:
505   # 1) The stdout completed without error
506   # 2) The program exited with status 0
507   # 3) No errors found in stdout
508   print '#Expect: True, False, True, True'
509   print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
510   status = complete is True and \
511            bool(error_code) is False and \
512            abort != RUN_ABORT and \
513            stderr_success is True
514
515   # If everything was successful, but for some reason
516   #  we didn't retrieve the path info, log it.
517   if status is True:
518     if conf_info.bustard_path is None or conf_info.run_path is None:
519       logging.error("Failed to retrieve run_path")
520       return False
521   
522   return status
523
524
525 def run_pipeline(conf_info):
526   """
527   Run the pipeline and monitor status.
528   """
529   # Fail if the run_path doesn't actually exist
530   if not os.path.exists(conf_info.run_path):
531     logging.error('Run path does not exist: %s' \
532               % (conf_info.run_path))
533     return False
534
535   # Change cwd to run_path
536   stdout_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stdout.txt')
537   stderr_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stderr.txt')
538
539   # Create status object
540   conf_info.createStatusObject()
541
542   # Monitor file creation
543   wm = WatchManager()
544   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
545   event = RunEvent(conf_info)
546   notifier = ThreadedNotifier(wm, event)
547   notifier.start()
548   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
549
550   # Log pipeline starting
551   logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
552   
553   # Start the pipeline (and hide!)
554   #pipe = subprocess.Popen(['make',
555   #                         '-j8',
556   #                         'recursive'],
557   #                        stdout=subprocess.PIPE,
558   #                        stderr=subprocess.PIPE)
559
560   fout = open(stdout_filepath, 'w')
561   ferr = open(stderr_filepath, 'w')
562
563   pipe = subprocess.Popen(['make',
564                            '--directory=%s' % (conf_info.run_path),
565                            '-j8',
566                            'recursive'],
567                            stdout=fout,
568                            stderr=ferr)
569                            #shell=True)
570   # Wait for run to finish
571   retcode = pipe.wait()
572
573
574   # Clean up
575   notifier.stop()
576   fout.close()
577   ferr.close()
578
579   # Process stderr
580   ferr = open(stderr_filepath, 'r')
581
582   run_failed_stderr = False
583   for line in ferr:
584     err_status = pipeline_stderr_handler(line, conf_info)
585     if err_status == RUN_FAILED:
586       run_failed_stderr = True
587
588   ferr.close()
589
590   # Finished file check!
591   print 'RUN SUCCESS CHECK:'
592   for key, value in event.run_status_dict.items():
593     print '  %s: %s' % (key, value)
594
595   dstatus = event.run_status_dict
596
597   # Success or failure check
598   status = (retcode == 0) and \
599            run_failed_stderr is False and \
600            dstatus['firecrest'] is True and \
601            dstatus['bustard'] is True and \
602            dstatus['gerald'] is True
603
604   return status
605
606