Update to not hard code the config file name and the error message
[htsworkflow.git] / htsworkflow / pipelines / configure_run.py
1 #!/usr/bin/python
2 import subprocess
3 import logging
4 import time
5 import re
6 import os
7
8 from htsworkflow.pipelines.retrieve_config import \
9      CONFIG_SYSTEM, CONFIG_USER, \
10      FlowCellNotFound, getCombinedOptions, saveConfigFile, WebError404
11 from htsworkflow.pipelines.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
12 from htsworkflow.pipelines.run_status import GARunStatus
13
14 from pyinotify import WatchManager, ThreadedNotifier
15 from pyinotify import EventsCodes, ProcessEvent
16
17 class ConfigInfo:
18   
19   def __init__(self):
20     #run_path = firecrest analysis directory to run analysis from
21     self.run_path = None
22     self.bustard_path = None
23     self.config_filepath = None
24     self.status = None
25
26     #top level directory where all analyses are placed
27     self.base_analysis_dir = None
28     #analysis_dir, top level analysis dir...
29     # base_analysis_dir + '/070924_USI-EAS44_0022_FC12150'
30     self.analysis_dir = None
31
32
33   def createStatusObject(self):
34     """
35     Creates a status object which can be queried for
36     status of running the pipeline
37
38     returns True if object created
39     returns False if object cannot be created
40     """
41     if self.config_filepath is None:
42       return False
43
44     self.status = GARunStatus(self.config_filepath)
45     return True
46
47
48
49 ####################################
50 # inotify event processor
51
52 s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
53 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
54 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
55
56 s_gerald_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
57 s_bustard_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
58 s_firecrest_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/')
59
60 class RunEvent(ProcessEvent):
61
62   def __init__(self, conf_info):
63
64     self.run_status_dict = {'firecrest': False,
65                             'bustard': False,
66                             'gerald': False}
67
68     self._ci = conf_info
69
70     ProcessEvent.__init__(self)
71     
72
73   def process_IN_CREATE(self, event):
74     fullpath = os.path.join(event.path, event.name)
75     if s_finished.search(fullpath):
76       logging.info("File Found: %s" % (fullpath))
77
78       if s_firecrest_finished.search(fullpath):
79         self.run_status_dict['firecrest'] = True
80         self._ci.status.updateFirecrest(event.name)
81       elif s_bustard_finished.search(fullpath):
82         self.run_status_dict['bustard'] = True
83         self._ci.status.updateBustard(event.name)
84       elif s_gerald_finished.search(fullpath):
85         self.run_status_dict['gerald'] = True
86         self._ci.status.updateGerald(event.name)
87
88     #WARNING: The following order is important!!
89     # Firecrest regex will catch all gerald, bustard, and firecrest
90     # Bustard regex will catch all gerald and bustard
91     # Gerald regex will catch all gerald
92     # So, order needs to be Gerald, Bustard, Firecrest, or this
93     #  won't work properly.
94     elif s_gerald_all.search(fullpath):
95       self._ci.status.updateGerald(event.name)
96     elif s_bustard_all.search(fullpath):
97       self._ci.status.updateBustard(event.name)
98     elif s_firecrest_all.search(fullpath):
99       self._ci.status.updateFirecrest(event.name)
100       
101     #print "Create: %s" % (os.path.join(event.path, event.name))
102
103   def process_IN_DELETE(self, event):
104     #print "Remove %s" % (os.path.join(event.path, event.name))
105     pass
106
107
108
109
110 #FLAGS
111 # Config Step Error
112 RUN_ABORT = 'abort'
113 # Run Step Error
114 RUN_FAILED = 'failed'
115
116
117 #####################################
118 # Configure Step (goat_pipeline.py)
119 #Info
120 s_start = re.compile('Starting Genome Analyzer Pipeline')
121 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
122 s_generating = re.compile('^Generating journals, Makefiles')
123 s_seq_folder = re.compile('^Sequence folder: ')
124 s_seq_folder_sub = re.compile('want to make ')
125 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
126
127 #Errors
128 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
129 s_species_dir_err = re.compile('Error: Lane [1-8]:')
130 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
131 s_missing_cycles = re.compile('^Error: Tile s_[1-8]_[0-9]+: Different number of cycles: [0-9]+ instead of [0-9]+')
132
133 SUPPRESS_MISSING_CYCLES = False
134
135
136 ##Ignore - Example of out above each ignore regex.
137 #NOTE: Commenting out an ignore will cause it to be
138 # logged as DEBUG with the logging module.
139 #CF_STDERR_IGNORE_LIST = []
140 s_skip = re.compile('s_[0-8]_[0-9]+')
141
142
143 ##########################################
144 # Pipeline Run Step (make -j8 recursive)
145
146 ##Info
147 s_finished = re.compile('finished')
148
149 ##Errors
150 s_make_error = re.compile('^make[\S\s]+Error')
151 s_no_gnuplot = re.compile('gnuplot: command not found')
152 s_no_convert = re.compile('^Can\'t exec "convert"')
153 s_no_ghostscript = re.compile('gs: command not found')
154
155 ##Ignore - Example of out above each ignore regex.
156 #NOTE: Commenting out an ignore will cause it to be
157 # logged as DEBUG with the logging module.
158 #
159 PL_STDERR_IGNORE_LIST = []
160 # Info: PF 11802
161 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
162 # About to analyse intensity file s_4_0101_sig2.txt
163 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
164 # Will send output to standard output
165 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
166 # Found 31877 clusters
167 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
168 # Will use quality criterion ((CHASTITY>=0.6)
169 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
170 # Quality criterion translated to (($F[5]>=0.6))
171 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
172 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
173 #  AND
174 # opened s_4_0103_qhg.txt
175 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
176 # 81129 sequences out of 157651 passed filter criteria
177 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
178
179
180 def pl_stderr_ignore(line):
181   """
182   Searches lines for lines to ignore (i.e. not to log)
183
184   returns True if line should be ignored
185   returns False if line should NOT be ignored
186   """
187   for s in PL_STDERR_IGNORE_LIST:
188     if s.search(line):
189       return True
190   return False
191
192
193 def config_stdout_handler(line, conf_info):
194   """
195   Processes each line of output from GOAT
196   and stores useful information using the logging module
197
198   Loads useful information into conf_info as well, for future
199   use outside the function.
200
201   returns True if found condition that signifies success.
202   """
203
204   # Skip irrelevant line (without logging)
205   if s_skip.search(line):
206     pass
207
208   # Detect invalid command-line arguments
209   elif s_invalid_cmdline.search(line):
210     logging.error("Invalid commandline options!")
211
212   # Detect starting of configuration
213   elif s_start.search(line):
214     logging.info('START: Configuring pipeline')
215
216   # Detect it made it past invalid arguments
217   elif s_gerald.search(line):
218     logging.info('Running make now')
219
220   # Detect that make files have been generated (based on output)
221   elif s_generating.search(line):
222     logging.info('Make files generted')
223     return True
224
225   # Capture run directory
226   elif s_seq_folder.search(line):
227     mo = s_seq_folder_sub.search(line)
228     #Output changed when using --tiles=<tiles>
229     # at least in pipeline v0.3.0b2
230     if mo:
231       firecrest_bustard_gerald_makefile = line[mo.end():]
232       firecrest_bustard_gerald, junk = \
233                                 os.path.split(firecrest_bustard_gerald_makefile)
234       firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
235       firecrest, junk = os.path.split(firecrest_bustard)
236
237       conf_info.bustard_path = firecrest_bustard
238       conf_info.run_path = firecrest
239     
240     #Standard output handling
241     else:
242       print 'Sequence line:', line
243       mo = s_seq_folder.search(line)
244       conf_info.bustard_path = line[mo.end():]
245       conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
246
247   # Log all other output for debugging purposes
248   else:
249     logging.warning('CONF:?: %s' % (line))
250
251   return False
252
253
254
255 def config_stderr_handler(line, conf_info):
256   """
257   Processes each line of output from GOAT
258   and stores useful information using the logging module
259
260   Loads useful information into conf_info as well, for future
261   use outside the function.
262
263   returns RUN_ABORT upon detecting failure;
264           True on success message;
265           False if neutral message
266             (i.e. doesn't signify failure or success)
267   """
268   global SUPPRESS_MISSING_CYCLES
269
270   # Detect invalid species directory error
271   if s_species_dir_err.search(line):
272     logging.error(line)
273     return RUN_ABORT
274   # Detect goat_pipeline.py traceback
275   elif s_goat_traceb.search(line):
276     logging.error("Goat config script died, traceback in debug output")
277     return RUN_ABORT
278   # Detect indication of successful configuration (from stderr; odd, but ok)
279   elif s_stderr_taskcomplete.search(line):
280     logging.info('Configure step successful (from: stderr)')
281     return True
282   # Detect missing cycles
283   elif s_missing_cycles.search(line):
284
285     # Only display error once
286     if not SUPPRESS_MISSING_CYCLES:
287       logging.error("Missing cycles detected; Not all cycles copied?")
288       logging.debug("CONF:STDERR:MISSING_CYCLES: %s" % (line))
289       SUPPRESS_MISSING_CYCLES = True
290     return RUN_ABORT
291   
292   # Log all other output as debug output
293   else:
294     logging.debug('CONF:STDERR:?: %s' % (line))
295
296   # Neutral (not failure; nor success)
297   return False
298
299
300 #def pipeline_stdout_handler(line, conf_info):
301 #  """
302 #  Processes each line of output from running the pipeline
303 #  and stores useful information using the logging module
304 #
305 #  Loads useful information into conf_info as well, for future
306 #  use outside the function.
307 #
308 #  returns True if found condition that signifies success.
309 #  """
310 #
311 #  #f.write(line + '\n')
312 #
313 #  return True
314
315
316
317 def pipeline_stderr_handler(line, conf_info):
318   """
319   Processes each line of stderr from pipelien run
320   and stores useful information using the logging module
321
322   ##FIXME: Future feature (doesn't actually do this yet)
323   #Loads useful information into conf_info as well, for future
324   #use outside the function.
325
326   returns RUN_FAILED upon detecting failure;
327           #True on success message; (no clear success state)
328           False if neutral message
329             (i.e. doesn't signify failure or success)
330   """
331
332   if pl_stderr_ignore(line):
333     pass
334   elif s_make_error.search(line):
335     logging.error("make error detected; run failed")
336     return RUN_FAILED
337   elif s_no_gnuplot.search(line):
338     logging.error("gnuplot not found")
339     return RUN_FAILED
340   elif s_no_convert.search(line):
341     logging.error("imagemagick's convert command not found")
342     return RUN_FAILED
343   elif s_no_ghostscript.search(line):
344     logging.error("ghostscript not found")
345     return RUN_FAILED
346   else:
347     logging.debug('PIPE:STDERR:?: %s' % (line))
348
349   return False
350
351
352 def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir, 
353                     cfg_defaults=None):
354   """
355   Gets the config file from server...
356   requires config file in:
357     /etc/ga_frontend/ga_frontend.conf
358    or
359     ~/.ga_frontend.conf
360
361   with:
362   [config_file_server]
363   base_host_url: http://host:port
364
365   return True if successful, False is failure
366   """
367   options = getCombinedOptions(cfg_defaults)
368
369   if options.url is None:
370     logging.error("%s or %s missing base_host_url option" % \
371                   (CONFIG_USER, CONFIG_SYSTEM))
372     return False
373
374   try:
375     saveConfigFile(flowcell, options.url, cfg_filepath)
376     conf_info.config_filepath = cfg_filepath
377   except FlowCellNotFound, e:
378     logging.error(e)
379     return False
380   except WebError404, e:
381     logging.error(e)
382     return False
383   except IOError, e:
384     logging.error(e)
385     return False
386   except Exception, e:
387     logging.error(e)
388     return False
389
390   f = open(cfg_filepath, 'r')
391   data = f.read()
392   f.close()
393
394   genome_dict = getAvailableGenomes(genome_dir)
395   mapper_dict = constructMapperDict(genome_dict)
396
397   logging.debug(data)
398
399   f = open(cfg_filepath, 'w')
400   f.write(data % (mapper_dict))
401   f.close()
402   
403   return True
404   
405
406
407 def configure(conf_info):
408   """
409   Attempts to configure the GA pipeline using goat.
410
411   Uses logging module to store information about status.
412
413   returns True if configuration successful, otherwise False.
414   """
415   #ERROR Test:
416   #pipe = subprocess.Popen(['goat_pipeline.py',
417   #                         '--GERALD=config32bk.txt',
418   #                         '--make .',],
419   #                         #'.'],
420   #                        stdout=subprocess.PIPE,
421   #                        stderr=subprocess.PIPE)
422
423   #ERROR Test (2), causes goat_pipeline.py traceback
424   #pipe = subprocess.Popen(['goat_pipeline.py',
425   #                  '--GERALD=%s' % (conf_info.config_filepath),
426   #                         '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
427   #                         '--make',
428   #                         '.'],
429   #                        stdout=subprocess.PIPE,
430   #                        stderr=subprocess.PIPE)
431
432   ##########################
433   # Run configuration step
434   #   Not a test; actual configure attempt.
435   #pipe = subprocess.Popen(['goat_pipeline.py',
436   #                  '--GERALD=%s' % (conf_info.config_filepath),
437   #                         '--make',
438   #                         '.'],
439   #                        stdout=subprocess.PIPE,
440   #                        stderr=subprocess.PIPE)
441
442
443   stdout_filepath = os.path.join(conf_info.analysis_dir,
444                                  "pipeline_configure_stdout.txt")
445   stderr_filepath = os.path.join(conf_info.analysis_dir,
446                                  "pipeline_configure_stderr.txt")
447
448   fout = open(stdout_filepath, 'w')
449   ferr = open(stderr_filepath, 'w')
450   
451   pipe = subprocess.Popen(['goat_pipeline.py',
452                            '--GERALD=%s' % (conf_info.config_filepath),
453                            '--make',
454                            conf_info.analysis_dir],
455                            stdout=fout,
456                            stderr=ferr)
457
458   print "Configuring pipeline: %s" % (time.ctime())
459   error_code = pipe.wait()
460
461   # Clean up
462   fout.close()
463   ferr.close()
464   
465   
466   ##################
467   # Process stdout
468   fout = open(stdout_filepath, 'r')
469   
470   stdout_line = fout.readline()
471
472   complete = False
473   while stdout_line != '':
474     # Handle stdout
475     if config_stdout_handler(stdout_line, conf_info):
476       complete = True
477     stdout_line = fout.readline()
478
479   fout.close()
480
481
482   #error_code = pipe.wait()
483   if error_code:
484     logging.error('Recieved error_code: %s' % (error_code))
485   else:
486     logging.info('We are go for launch!')
487
488   #Process stderr
489   ferr = open(stderr_filepath, 'r')
490   stderr_line = ferr.readline()
491
492   abort = 'NO!'
493   stderr_success = False
494   while stderr_line != '':
495     stderr_status = config_stderr_handler(stderr_line, conf_info)
496     if stderr_status == RUN_ABORT:
497       abort = RUN_ABORT
498     elif stderr_status is True:
499       stderr_success = True
500     stderr_line = ferr.readline()
501
502   ferr.close()
503
504
505   #Success requirements:
506   # 1) The stdout completed without error
507   # 2) The program exited with status 0
508   # 3) No errors found in stdout
509   print '#Expect: True, False, True, True'
510   print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
511   status = complete is True and \
512            bool(error_code) is False and \
513            abort != RUN_ABORT and \
514            stderr_success is True
515
516   # If everything was successful, but for some reason
517   #  we didn't retrieve the path info, log it.
518   if status is True:
519     if conf_info.bustard_path is None or conf_info.run_path is None:
520       logging.error("Failed to retrieve run_path")
521       return False
522   
523   return status
524
525
526 def run_pipeline(conf_info):
527   """
528   Run the pipeline and monitor status.
529   """
530   # Fail if the run_path doesn't actually exist
531   if not os.path.exists(conf_info.run_path):
532     logging.error('Run path does not exist: %s' \
533               % (conf_info.run_path))
534     return False
535
536   # Change cwd to run_path
537   stdout_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stdout.txt')
538   stderr_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stderr.txt')
539
540   # Create status object
541   conf_info.createStatusObject()
542
543   # Monitor file creation
544   wm = WatchManager()
545   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
546   event = RunEvent(conf_info)
547   notifier = ThreadedNotifier(wm, event)
548   notifier.start()
549   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
550
551   # Log pipeline starting
552   logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
553   
554   # Start the pipeline (and hide!)
555   #pipe = subprocess.Popen(['make',
556   #                         '-j8',
557   #                         'recursive'],
558   #                        stdout=subprocess.PIPE,
559   #                        stderr=subprocess.PIPE)
560
561   fout = open(stdout_filepath, 'w')
562   ferr = open(stderr_filepath, 'w')
563
564   pipe = subprocess.Popen(['make',
565                            '--directory=%s' % (conf_info.run_path),
566                            '-j8',
567                            'recursive'],
568                            stdout=fout,
569                            stderr=ferr)
570                            #shell=True)
571   # Wait for run to finish
572   retcode = pipe.wait()
573
574
575   # Clean up
576   notifier.stop()
577   fout.close()
578   ferr.close()
579
580   # Process stderr
581   ferr = open(stderr_filepath, 'r')
582
583   run_failed_stderr = False
584   for line in ferr:
585     err_status = pipeline_stderr_handler(line, conf_info)
586     if err_status == RUN_FAILED:
587       run_failed_stderr = True
588
589   ferr.close()
590
591   # Finished file check!
592   print 'RUN SUCCESS CHECK:'
593   for key, value in event.run_status_dict.items():
594     print '  %s: %s' % (key, value)
595
596   dstatus = event.run_status_dict
597
598   # Success or failure check
599   status = (retcode == 0) and \
600            run_failed_stderr is False and \
601            dstatus['firecrest'] is True and \
602            dstatus['bustard'] is True and \
603            dstatus['gerald'] is True
604
605   return status
606
607