2174de1b31dc084b1cc67930a87184be65fb78d3
[htsworkflow.git] / htsworkflow / pipelines / configure_run.py
1 #!/usr/bin/python
2 __docformat__ = "restructuredtext en"
3
4 import subprocess
5 import logging
6 import time
7 import re
8 import os
9
10 from htsworkflow.pipelines.retrieve_config import \
11      CONFIG_SYSTEM, CONFIG_USER, \
12      FlowCellNotFound, getCombinedOptions, saveConfigFile, WebError404
13 from htsworkflow.pipelines.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
14 from htsworkflow.pipelines.run_status import GARunStatus
15
16 from pyinotify import WatchManager, ThreadedNotifier
17 from pyinotify import EventsCodes, ProcessEvent
18
19 class ConfigInfo:
20   
21   def __init__(self):
22     #run_path = firecrest analysis directory to run analysis from
23     self.run_path = None
24     self.bustard_path = None
25     self.config_filepath = None
26     self.status = None
27
28     #top level directory where all analyses are placed
29     self.base_analysis_dir = None
30     #analysis_dir, top level analysis dir...
31     # base_analysis_dir + '/070924_USI-EAS44_0022_FC12150'
32     self.analysis_dir = None
33
34
35   def createStatusObject(self):
36     """
37     Creates a status object which can be queried for
38     status of running the pipeline
39
40     returns True if object created
41     returns False if object cannot be created
42     """
43     if self.config_filepath is None:
44       return False
45
46     self.status = GARunStatus(self.config_filepath)
47     return True
48
49
50
51 ####################################
52 # inotify event processor
53
54 s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
55 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
56 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
57
58 s_gerald_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
59 s_bustard_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
60 s_firecrest_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/')
61
62 class RunEvent(ProcessEvent):
63
64   def __init__(self, conf_info):
65
66     self.run_status_dict = {'firecrest': False,
67                             'bustard': False,
68                             'gerald': False}
69
70     self._ci = conf_info
71
72     ProcessEvent.__init__(self)
73     
74
75   def process_IN_CREATE(self, event):
76     fullpath = os.path.join(event.path, event.name)
77     if s_finished.search(fullpath):
78       logging.info("File Found: %s" % (fullpath))
79
80       if s_firecrest_finished.search(fullpath):
81         self.run_status_dict['firecrest'] = True
82         self._ci.status.updateFirecrest(event.name)
83       elif s_bustard_finished.search(fullpath):
84         self.run_status_dict['bustard'] = True
85         self._ci.status.updateBustard(event.name)
86       elif s_gerald_finished.search(fullpath):
87         self.run_status_dict['gerald'] = True
88         self._ci.status.updateGerald(event.name)
89
90     #WARNING: The following order is important!!
91     # Firecrest regex will catch all gerald, bustard, and firecrest
92     # Bustard regex will catch all gerald and bustard
93     # Gerald regex will catch all gerald
94     # So, order needs to be Gerald, Bustard, Firecrest, or this
95     #  won't work properly.
96     elif s_gerald_all.search(fullpath):
97       self._ci.status.updateGerald(event.name)
98     elif s_bustard_all.search(fullpath):
99       self._ci.status.updateBustard(event.name)
100     elif s_firecrest_all.search(fullpath):
101       self._ci.status.updateFirecrest(event.name)
102       
103     #print "Create: %s" % (os.path.join(event.path, event.name))
104
105   def process_IN_DELETE(self, event):
106     #print "Remove %s" % (os.path.join(event.path, event.name))
107     pass
108
109
110
111
112 #FLAGS
113 # Config Step Error
114 RUN_ABORT = 'abort'
115 # Run Step Error
116 RUN_FAILED = 'failed'
117
118
119 #####################################
120 # Configure Step (goat_pipeline.py)
121 #Info
122 s_start = re.compile('Starting Genome Analyzer Pipeline')
123 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
124 s_generating = re.compile('^Generating journals, Makefiles')
125 s_seq_folder = re.compile('^Sequence folder: ')
126 s_seq_folder_sub = re.compile('want to make ')
127 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
128
129 #Errors
130 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
131 s_species_dir_err = re.compile('Error: Lane [1-8]:')
132 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
133 s_missing_cycles = re.compile('^Error: Tile s_[1-8]_[0-9]+: Different number of cycles: [0-9]+ instead of [0-9]+')
134
135 SUPPRESS_MISSING_CYCLES = False
136
137
138 ##Ignore - Example of out above each ignore regex.
139 #NOTE: Commenting out an ignore will cause it to be
140 # logged as DEBUG with the logging module.
141 #CF_STDERR_IGNORE_LIST = []
142 s_skip = re.compile('s_[0-8]_[0-9]+')
143
144
145 ##########################################
146 # Pipeline Run Step (make -j8 recursive)
147
148 ##Info
149 s_finished = re.compile('finished')
150
151 ##Errors
152 s_make_error = re.compile('^make[\S\s]+Error')
153 s_no_gnuplot = re.compile('gnuplot: command not found')
154 s_no_convert = re.compile('^Can\'t exec "convert"')
155 s_no_ghostscript = re.compile('gs: command not found')
156
157 ##Ignore - Example of out above each ignore regex.
158 #NOTE: Commenting out an ignore will cause it to be
159 # logged as DEBUG with the logging module.
160 #
161 PL_STDERR_IGNORE_LIST = []
162 # Info: PF 11802
163 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
164 # About to analyse intensity file s_4_0101_sig2.txt
165 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
166 # Will send output to standard output
167 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
168 # Found 31877 clusters
169 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
170 # Will use quality criterion ((CHASTITY>=0.6)
171 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
172 # Quality criterion translated to (($F[5]>=0.6))
173 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
174 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
175 #  AND
176 # opened s_4_0103_qhg.txt
177 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
178 # 81129 sequences out of 157651 passed filter criteria
179 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
180
181
182 def pl_stderr_ignore(line):
183   """
184   Searches lines for lines to ignore (i.e. not to log)
185
186   returns True if line should be ignored
187   returns False if line should NOT be ignored
188   """
189   for s in PL_STDERR_IGNORE_LIST:
190     if s.search(line):
191       return True
192   return False
193
194
195 def config_stdout_handler(line, conf_info):
196   """
197   Processes each line of output from GOAT
198   and stores useful information using the logging module
199
200   Loads useful information into conf_info as well, for future
201   use outside the function.
202
203   returns True if found condition that signifies success.
204   """
205
206   # Skip irrelevant line (without logging)
207   if s_skip.search(line):
208     pass
209
210   # Detect invalid command-line arguments
211   elif s_invalid_cmdline.search(line):
212     logging.error("Invalid commandline options!")
213
214   # Detect starting of configuration
215   elif s_start.search(line):
216     logging.info('START: Configuring pipeline')
217
218   # Detect it made it past invalid arguments
219   elif s_gerald.search(line):
220     logging.info('Running make now')
221
222   # Detect that make files have been generated (based on output)
223   elif s_generating.search(line):
224     logging.info('Make files generted')
225     return True
226
227   # Capture run directory
228   elif s_seq_folder.search(line):
229     mo = s_seq_folder_sub.search(line)
230     #Output changed when using --tiles=<tiles>
231     # at least in pipeline v0.3.0b2
232     if mo:
233       firecrest_bustard_gerald_makefile = line[mo.end():]
234       firecrest_bustard_gerald, junk = \
235                                 os.path.split(firecrest_bustard_gerald_makefile)
236       firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
237       firecrest, junk = os.path.split(firecrest_bustard)
238
239       conf_info.bustard_path = firecrest_bustard
240       conf_info.run_path = firecrest
241     
242     #Standard output handling
243     else:
244       print 'Sequence line:', line
245       mo = s_seq_folder.search(line)
246       conf_info.bustard_path = line[mo.end():]
247       conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
248
249   # Log all other output for debugging purposes
250   else:
251     logging.warning('CONF:?: %s' % (line))
252
253   return False
254
255
256
257 def config_stderr_handler(line, conf_info):
258   """
259   Processes each line of output from GOAT
260   and stores useful information using the logging module
261
262   Loads useful information into conf_info as well, for future
263   use outside the function.
264
265   returns RUN_ABORT upon detecting failure;
266           True on success message;
267           False if neutral message
268             (i.e. doesn't signify failure or success)
269   """
270   global SUPPRESS_MISSING_CYCLES
271
272   # Detect invalid species directory error
273   if s_species_dir_err.search(line):
274     logging.error(line)
275     return RUN_ABORT
276   # Detect goat_pipeline.py traceback
277   elif s_goat_traceb.search(line):
278     logging.error("Goat config script died, traceback in debug output")
279     return RUN_ABORT
280   # Detect indication of successful configuration (from stderr; odd, but ok)
281   elif s_stderr_taskcomplete.search(line):
282     logging.info('Configure step successful (from: stderr)')
283     return True
284   # Detect missing cycles
285   elif s_missing_cycles.search(line):
286
287     # Only display error once
288     if not SUPPRESS_MISSING_CYCLES:
289       logging.error("Missing cycles detected; Not all cycles copied?")
290       logging.debug("CONF:STDERR:MISSING_CYCLES: %s" % (line))
291       SUPPRESS_MISSING_CYCLES = True
292     return RUN_ABORT
293   
294   # Log all other output as debug output
295   else:
296     logging.debug('CONF:STDERR:?: %s' % (line))
297
298   # Neutral (not failure; nor success)
299   return False
300
301
302 #def pipeline_stdout_handler(line, conf_info):
303 #  """
304 #  Processes each line of output from running the pipeline
305 #  and stores useful information using the logging module
306 #
307 #  Loads useful information into conf_info as well, for future
308 #  use outside the function.
309 #
310 #  returns True if found condition that signifies success.
311 #  """
312 #
313 #  #f.write(line + '\n')
314 #
315 #  return True
316
317
318
319 def pipeline_stderr_handler(line, conf_info):
320   """
321   Processes each line of stderr from pipelien run
322   and stores useful information using the logging module
323
324   ##FIXME: Future feature (doesn't actually do this yet)
325   #Loads useful information into conf_info as well, for future
326   #use outside the function.
327
328   returns RUN_FAILED upon detecting failure;
329           #True on success message; (no clear success state)
330           False if neutral message
331             (i.e. doesn't signify failure or success)
332   """
333
334   if pl_stderr_ignore(line):
335     pass
336   elif s_make_error.search(line):
337     logging.error("make error detected; run failed")
338     return RUN_FAILED
339   elif s_no_gnuplot.search(line):
340     logging.error("gnuplot not found")
341     return RUN_FAILED
342   elif s_no_convert.search(line):
343     logging.error("imagemagick's convert command not found")
344     return RUN_FAILED
345   elif s_no_ghostscript.search(line):
346     logging.error("ghostscript not found")
347     return RUN_FAILED
348   else:
349     logging.debug('PIPE:STDERR:?: %s' % (line))
350
351   return False
352
353
354 def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir, 
355                     cfg_defaults=None):
356   """
357   Gets the config file from server...
358   requires config file in:
359     /etc/ga_frontend/ga_frontend.conf
360    or
361     ~/.ga_frontend.conf
362
363   with:
364   [config_file_server]
365   base_host_url: http://host:port
366
367   return True if successful, False is failure
368   """
369   options = getCombinedOptions(cfg_defaults)
370
371   if options.url is None:
372     logging.error("%s or %s missing base_host_url option" % \
373                   (CONFIG_USER, CONFIG_SYSTEM))
374     return False
375
376   try:
377     saveConfigFile(flowcell, options.url, cfg_filepath)
378     conf_info.config_filepath = cfg_filepath
379   except FlowCellNotFound, e:
380     logging.error(e)
381     return False
382   except WebError404, e:
383     logging.error(e)
384     return False
385   except IOError, e:
386     logging.error(e)
387     return False
388   except Exception, e:
389     logging.error(e)
390     return False
391
392   f = open(cfg_filepath, 'r')
393   data = f.read()
394   f.close()
395
396   genome_dict = getAvailableGenomes(genome_dir)
397   mapper_dict = constructMapperDict(genome_dict)
398
399   logging.debug(data)
400
401   f = open(cfg_filepath, 'w')
402   f.write(data % (mapper_dict))
403   f.close()
404   
405   return True
406   
407
408
409 def configure(conf_info):
410   """
411   Attempts to configure the GA pipeline using goat.
412
413   Uses logging module to store information about status.
414
415   returns True if configuration successful, otherwise False.
416   """
417   #ERROR Test:
418   #pipe = subprocess.Popen(['goat_pipeline.py',
419   #                         '--GERALD=config32bk.txt',
420   #                         '--make .',],
421   #                         #'.'],
422   #                        stdout=subprocess.PIPE,
423   #                        stderr=subprocess.PIPE)
424
425   #ERROR Test (2), causes goat_pipeline.py traceback
426   #pipe = subprocess.Popen(['goat_pipeline.py',
427   #                  '--GERALD=%s' % (conf_info.config_filepath),
428   #                         '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
429   #                         '--make',
430   #                         '.'],
431   #                        stdout=subprocess.PIPE,
432   #                        stderr=subprocess.PIPE)
433
434   ##########################
435   # Run configuration step
436   #   Not a test; actual configure attempt.
437   #pipe = subprocess.Popen(['goat_pipeline.py',
438   #                  '--GERALD=%s' % (conf_info.config_filepath),
439   #                         '--make',
440   #                         '.'],
441   #                        stdout=subprocess.PIPE,
442   #                        stderr=subprocess.PIPE)
443
444
445   stdout_filepath = os.path.join(conf_info.analysis_dir,
446                                  "pipeline_configure_stdout.txt")
447   stderr_filepath = os.path.join(conf_info.analysis_dir,
448                                  "pipeline_configure_stderr.txt")
449
450   fout = open(stdout_filepath, 'w')
451   ferr = open(stderr_filepath, 'w')
452   
453   pipe = subprocess.Popen(['goat_pipeline.py',
454                            '--GERALD=%s' % (conf_info.config_filepath),
455                            '--make',
456                            conf_info.analysis_dir],
457                            stdout=fout,
458                            stderr=ferr)
459
460   print "Configuring pipeline: %s" % (time.ctime())
461   error_code = pipe.wait()
462
463   # Clean up
464   fout.close()
465   ferr.close()
466   
467   
468   ##################
469   # Process stdout
470   fout = open(stdout_filepath, 'r')
471   
472   stdout_line = fout.readline()
473
474   complete = False
475   while stdout_line != '':
476     # Handle stdout
477     if config_stdout_handler(stdout_line, conf_info):
478       complete = True
479     stdout_line = fout.readline()
480
481   fout.close()
482
483
484   #error_code = pipe.wait()
485   if error_code:
486     logging.error('Recieved error_code: %s' % (error_code))
487   else:
488     logging.info('We are go for launch!')
489
490   #Process stderr
491   ferr = open(stderr_filepath, 'r')
492   stderr_line = ferr.readline()
493
494   abort = 'NO!'
495   stderr_success = False
496   while stderr_line != '':
497     stderr_status = config_stderr_handler(stderr_line, conf_info)
498     if stderr_status == RUN_ABORT:
499       abort = RUN_ABORT
500     elif stderr_status is True:
501       stderr_success = True
502     stderr_line = ferr.readline()
503
504   ferr.close()
505
506
507   #Success requirements:
508   # 1) The stdout completed without error
509   # 2) The program exited with status 0
510   # 3) No errors found in stdout
511   print '#Expect: True, False, True, True'
512   print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
513   status = complete is True and \
514            bool(error_code) is False and \
515            abort != RUN_ABORT and \
516            stderr_success is True
517
518   # If everything was successful, but for some reason
519   #  we didn't retrieve the path info, log it.
520   if status is True:
521     if conf_info.bustard_path is None or conf_info.run_path is None:
522       logging.error("Failed to retrieve run_path")
523       return False
524   
525   return status
526
527
528 def run_pipeline(conf_info):
529   """
530   Run the pipeline and monitor status.
531   """
532   # Fail if the run_path doesn't actually exist
533   if not os.path.exists(conf_info.run_path):
534     logging.error('Run path does not exist: %s' \
535               % (conf_info.run_path))
536     return False
537
538   # Change cwd to run_path
539   stdout_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stdout.txt')
540   stderr_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stderr.txt')
541
542   # Create status object
543   conf_info.createStatusObject()
544
545   # Monitor file creation
546   wm = WatchManager()
547   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
548   event = RunEvent(conf_info)
549   notifier = ThreadedNotifier(wm, event)
550   notifier.start()
551   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
552
553   # Log pipeline starting
554   logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
555   
556   # Start the pipeline (and hide!)
557   #pipe = subprocess.Popen(['make',
558   #                         '-j8',
559   #                         'recursive'],
560   #                        stdout=subprocess.PIPE,
561   #                        stderr=subprocess.PIPE)
562
563   fout = open(stdout_filepath, 'w')
564   ferr = open(stderr_filepath, 'w')
565
566   pipe = subprocess.Popen(['make',
567                            '--directory=%s' % (conf_info.run_path),
568                            '-j8',
569                            'recursive'],
570                            stdout=fout,
571                            stderr=ferr)
572                            #shell=True)
573   # Wait for run to finish
574   retcode = pipe.wait()
575
576
577   # Clean up
578   notifier.stop()
579   fout.close()
580   ferr.close()
581
582   # Process stderr
583   ferr = open(stderr_filepath, 'r')
584
585   run_failed_stderr = False
586   for line in ferr:
587     err_status = pipeline_stderr_handler(line, conf_info)
588     if err_status == RUN_FAILED:
589       run_failed_stderr = True
590
591   ferr.close()
592
593   # Finished file check!
594   print 'RUN SUCCESS CHECK:'
595   for key, value in event.run_status_dict.items():
596     print '  %s: %s' % (key, value)
597
598   dstatus = event.run_status_dict
599
600   # Success or failure check
601   status = (retcode == 0) and \
602            run_failed_stderr is False and \
603            dstatus['firecrest'] is True and \
604            dstatus['bustard'] is True and \
605            dstatus['gerald'] is True
606
607   return status
608
609