remove cruft from an unimplemented feature
[htsworkflow.git] / htsworkflow / pipelines / configure_run.py
1 #!/usr/bin/python
2 __docformat__ = "restructuredtext en"
3
4 import subprocess
5 import logging
6 import time
7 import re
8 import os
9
10 from htsworkflow.pipelines.retrieve_config import \
11      CONFIG_SYSTEM, CONFIG_USER, \
12      FlowCellNotFound, getCombinedOptions, saveConfigFile, WebError404
13 from htsworkflow.pipelines.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
14 from htsworkflow.pipelines.run_status import GARunStatus
15
16 from pyinotify import WatchManager, ThreadedNotifier
17 from pyinotify import EventsCodes, ProcessEvent
18
19 class ConfigInfo:
20   
21   def __init__(self):
22     #run_path = firecrest analysis directory to run analysis from
23     self.run_path = None
24     self.bustard_path = None
25     self.config_filepath = None
26     self.status = None
27
28     #top level directory where all analyses are placed
29     self.base_analysis_dir = None
30     #analysis_dir, top level analysis dir...
31     # base_analysis_dir + '/070924_USI-EAS44_0022_FC12150'
32     self.analysis_dir = None
33
34
35   def createStatusObject(self):
36     """
37     Creates a status object which can be queried for
38     status of running the pipeline
39
40     returns True if object created
41     returns False if object cannot be created
42     """
43     if self.config_filepath is None:
44       return False
45
46     self.status = GARunStatus(self.config_filepath)
47     return True
48
49
50
51 ####################################
52 # inotify event processor
53
54 s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
55 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
56 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
57
58 s_gerald_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
59 s_bustard_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
60 s_firecrest_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/')
61
62 class RunEvent(ProcessEvent):
63
64   def __init__(self, conf_info):
65
66     self.run_status_dict = {'firecrest': False,
67                             'bustard': False,
68                             'gerald': False}
69
70     self._ci = conf_info
71
72     ProcessEvent.__init__(self)
73     
74
75   def process_IN_CREATE(self, event):
76     fullpath = os.path.join(event.path, event.name)
77     if s_finished.search(fullpath):
78       logging.info("File Found: %s" % (fullpath))
79
80       if s_firecrest_finished.search(fullpath):
81         self.run_status_dict['firecrest'] = True
82         self._ci.status.updateFirecrest(event.name)
83       elif s_bustard_finished.search(fullpath):
84         self.run_status_dict['bustard'] = True
85         self._ci.status.updateBustard(event.name)
86       elif s_gerald_finished.search(fullpath):
87         self.run_status_dict['gerald'] = True
88         self._ci.status.updateGerald(event.name)
89
90     #WARNING: The following order is important!!
91     # Firecrest regex will catch all gerald, bustard, and firecrest
92     # Bustard regex will catch all gerald and bustard
93     # Gerald regex will catch all gerald
94     # So, order needs to be Gerald, Bustard, Firecrest, or this
95     #  won't work properly.
96     elif s_gerald_all.search(fullpath):
97       self._ci.status.updateGerald(event.name)
98     elif s_bustard_all.search(fullpath):
99       self._ci.status.updateBustard(event.name)
100     elif s_firecrest_all.search(fullpath):
101       self._ci.status.updateFirecrest(event.name)
102       
103     #print "Create: %s" % (os.path.join(event.path, event.name))
104
105   def process_IN_DELETE(self, event):
106     #print "Remove %s" % (os.path.join(event.path, event.name))
107     pass
108
109
110
111
112 #FLAGS
113 # Config Step Error
114 RUN_ABORT = 'abort'
115 # Run Step Error
116 RUN_FAILED = 'failed'
117
118
119 #####################################
120 # Configure Step (goat_pipeline.py)
121 #Info
122 s_start = re.compile('Starting Genome Analyzer Pipeline')
123 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
124 s_generating = re.compile('^Generating journals, Makefiles')
125 s_seq_folder = re.compile('^Sequence folder: ')
126 s_seq_folder_sub = re.compile('want to make ')
127 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
128
129 #Errors
130 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
131 s_species_dir_err = re.compile('Error: Lane [1-8]:')
132 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
133 s_missing_cycles = re.compile('^Error: Tile s_[1-8]_[0-9]+: Different number of cycles: [0-9]+ instead of [0-9]+')
134
135 SUPPRESS_MISSING_CYCLES = False
136
137
138 ##Ignore - Example of out above each ignore regex.
139 #NOTE: Commenting out an ignore will cause it to be
140 # logged as DEBUG with the logging module.
141 #CF_STDERR_IGNORE_LIST = []
142 s_skip = re.compile('s_[0-8]_[0-9]+')
143
144
145 ##########################################
146 # Pipeline Run Step (make -j8 recursive)
147
148 ##Info
149 s_finished = re.compile('finished')
150
151 ##Errors
152 s_make_error = re.compile('^make[\S\s]+Error')
153 s_no_gnuplot = re.compile('gnuplot: command not found')
154 s_no_convert = re.compile('^Can\'t exec "convert"')
155 s_no_ghostscript = re.compile('gs: command not found')
156
157 ##Ignore - Example of out above each ignore regex.
158 #NOTE: Commenting out an ignore will cause it to be
159 # logged as DEBUG with the logging module.
160 #
161 PL_STDERR_IGNORE_LIST = []
162 # Info: PF 11802
163 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
164 # About to analyse intensity file s_4_0101_sig2.txt
165 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
166 # Will send output to standard output
167 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
168 # Found 31877 clusters
169 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
170 # Will use quality criterion ((CHASTITY>=0.6)
171 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
172 # Quality criterion translated to (($F[5]>=0.6))
173 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
174 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
175 #  AND
176 # opened s_4_0103_qhg.txt
177 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
178 # 81129 sequences out of 157651 passed filter criteria
179 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
180
181
182 def pl_stderr_ignore(line):
183   """
184   Searches lines for lines to ignore (i.e. not to log)
185
186   returns True if line should be ignored
187   returns False if line should NOT be ignored
188   """
189   for s in PL_STDERR_IGNORE_LIST:
190     if s.search(line):
191       return True
192   return False
193
194
195 def config_stdout_handler(line, conf_info):
196   """
197   Processes each line of output from GOAT
198   and stores useful information using the logging module
199
200   Loads useful information into conf_info as well, for future
201   use outside the function.
202
203   returns True if found condition that signifies success.
204   """
205
206   # Skip irrelevant line (without logging)
207   if s_skip.search(line):
208     pass
209
210   # Detect invalid command-line arguments
211   elif s_invalid_cmdline.search(line):
212     logging.error("Invalid commandline options!")
213
214   # Detect starting of configuration
215   elif s_start.search(line):
216     logging.info('START: Configuring pipeline')
217
218   # Detect it made it past invalid arguments
219   elif s_gerald.search(line):
220     logging.info('Running make now')
221
222   # Detect that make files have been generated (based on output)
223   elif s_generating.search(line):
224     logging.info('Make files generted')
225     return True
226
227   # Capture run directory
228   elif s_seq_folder.search(line):
229     mo = s_seq_folder_sub.search(line)
230     #Output changed when using --tiles=<tiles>
231     # at least in pipeline v0.3.0b2
232     if mo:
233       firecrest_bustard_gerald_makefile = line[mo.end():]
234       firecrest_bustard_gerald, junk = \
235                                 os.path.split(firecrest_bustard_gerald_makefile)
236       firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
237       firecrest, junk = os.path.split(firecrest_bustard)
238
239       conf_info.bustard_path = firecrest_bustard
240       conf_info.run_path = firecrest
241     
242     #Standard output handling
243     else:
244       print 'Sequence line:', line
245       mo = s_seq_folder.search(line)
246       conf_info.bustard_path = line[mo.end():]
247       conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
248
249   # Log all other output for debugging purposes
250   else:
251     logging.warning('CONF:?: %s' % (line))
252
253   return False
254
255
256
257 def config_stderr_handler(line, conf_info):
258   """
259   Processes each line of output from GOAT
260   and stores useful information using the logging module
261
262   Loads useful information into conf_info as well, for future
263   use outside the function.
264
265   returns RUN_ABORT upon detecting failure;
266           True on success message;
267           False if neutral message
268             (i.e. doesn't signify failure or success)
269   """
270   global SUPPRESS_MISSING_CYCLES
271
272   # Detect invalid species directory error
273   if s_species_dir_err.search(line):
274     logging.error(line)
275     return RUN_ABORT
276   # Detect goat_pipeline.py traceback
277   elif s_goat_traceb.search(line):
278     logging.error("Goat config script died, traceback in debug output")
279     return RUN_ABORT
280   # Detect indication of successful configuration (from stderr; odd, but ok)
281   elif s_stderr_taskcomplete.search(line):
282     logging.info('Configure step successful (from: stderr)')
283     return True
284   # Detect missing cycles
285   elif s_missing_cycles.search(line):
286
287     # Only display error once
288     if not SUPPRESS_MISSING_CYCLES:
289       logging.error("Missing cycles detected; Not all cycles copied?")
290       logging.debug("CONF:STDERR:MISSING_CYCLES: %s" % (line))
291       SUPPRESS_MISSING_CYCLES = True
292     return RUN_ABORT
293   
294   # Log all other output as debug output
295   else:
296     logging.debug('CONF:STDERR:?: %s' % (line))
297
298   # Neutral (not failure; nor success)
299   return False
300
301
302 #def pipeline_stdout_handler(line, conf_info):
303 #  """
304 #  Processes each line of output from running the pipeline
305 #  and stores useful information using the logging module
306 #
307 #  Loads useful information into conf_info as well, for future
308 #  use outside the function.
309 #
310 #  returns True if found condition that signifies success.
311 #  """
312 #
313 #  #f.write(line + '\n')
314 #
315 #  return True
316
317
318
319 def pipeline_stderr_handler(line, conf_info):
320   """
321   Processes each line of stderr from pipelien run
322   and stores useful information using the logging module
323
324   ##FIXME: Future feature (doesn't actually do this yet)
325   #Loads useful information into conf_info as well, for future
326   #use outside the function.
327
328   returns RUN_FAILED upon detecting failure;
329           #True on success message; (no clear success state)
330           False if neutral message
331             (i.e. doesn't signify failure or success)
332   """
333
334   if pl_stderr_ignore(line):
335     pass
336   elif s_make_error.search(line):
337     logging.error("make error detected; run failed")
338     return RUN_FAILED
339   elif s_no_gnuplot.search(line):
340     logging.error("gnuplot not found")
341     return RUN_FAILED
342   elif s_no_convert.search(line):
343     logging.error("imagemagick's convert command not found")
344     return RUN_FAILED
345   elif s_no_ghostscript.search(line):
346     logging.error("ghostscript not found")
347     return RUN_FAILED
348   else:
349     logging.debug('PIPE:STDERR:?: %s' % (line))
350
351   return False
352
353
354 def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir):
355   """
356   Gets the config file from server...
357   requires config file in:
358     /etc/ga_frontend/ga_frontend.conf
359    or
360     ~/.ga_frontend.conf
361
362   with:
363   [config_file_server]
364   base_host_url: http://host:port
365
366   return True if successful, False is failure
367   """
368   options = getCombinedOptions()
369
370   if options.url is None:
371     logging.error("%s or %s missing base_host_url option" % \
372                   (CONFIG_USER, CONFIG_SYSTEM))
373     return False
374
375   try:
376     saveConfigFile(flowcell, options.url, cfg_filepath)
377     conf_info.config_filepath = cfg_filepath
378   except FlowCellNotFound, e:
379     logging.error(e)
380     return False
381   except WebError404, e:
382     logging.error(e)
383     return False
384   except IOError, e:
385     logging.error(e)
386     return False
387   except Exception, e:
388     logging.error(e)
389     return False
390
391   f = open(cfg_filepath, 'r')
392   data = f.read()
393   f.close()
394
395   genome_dict = getAvailableGenomes(genome_dir)
396   mapper_dict = constructMapperDict(genome_dict)
397
398   logging.debug(data)
399
400   f = open(cfg_filepath, 'w')
401   f.write(data % (mapper_dict))
402   f.close()
403   
404   return True
405   
406
407
408 def configure(conf_info):
409   """
410   Attempts to configure the GA pipeline using goat.
411
412   Uses logging module to store information about status.
413
414   returns True if configuration successful, otherwise False.
415   """
416   #ERROR Test:
417   #pipe = subprocess.Popen(['goat_pipeline.py',
418   #                         '--GERALD=config32bk.txt',
419   #                         '--make .',],
420   #                         #'.'],
421   #                        stdout=subprocess.PIPE,
422   #                        stderr=subprocess.PIPE)
423
424   #ERROR Test (2), causes goat_pipeline.py traceback
425   #pipe = subprocess.Popen(['goat_pipeline.py',
426   #                  '--GERALD=%s' % (conf_info.config_filepath),
427   #                         '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
428   #                         '--make',
429   #                         '.'],
430   #                        stdout=subprocess.PIPE,
431   #                        stderr=subprocess.PIPE)
432
433   ##########################
434   # Run configuration step
435   #   Not a test; actual configure attempt.
436   #pipe = subprocess.Popen(['goat_pipeline.py',
437   #                  '--GERALD=%s' % (conf_info.config_filepath),
438   #                         '--make',
439   #                         '.'],
440   #                        stdout=subprocess.PIPE,
441   #                        stderr=subprocess.PIPE)
442
443
444   stdout_filepath = os.path.join(conf_info.analysis_dir,
445                                  "pipeline_configure_stdout.txt")
446   stderr_filepath = os.path.join(conf_info.analysis_dir,
447                                  "pipeline_configure_stderr.txt")
448
449   fout = open(stdout_filepath, 'w')
450   ferr = open(stderr_filepath, 'w')
451   
452   pipe = subprocess.Popen(['goat_pipeline.py',
453                            '--GERALD=%s' % (conf_info.config_filepath),
454                            '--make',
455                            conf_info.analysis_dir],
456                            stdout=fout,
457                            stderr=ferr)
458
459   print "Configuring pipeline: %s" % (time.ctime())
460   error_code = pipe.wait()
461
462   # Clean up
463   fout.close()
464   ferr.close()
465   
466   
467   ##################
468   # Process stdout
469   fout = open(stdout_filepath, 'r')
470   
471   stdout_line = fout.readline()
472
473   complete = False
474   while stdout_line != '':
475     # Handle stdout
476     if config_stdout_handler(stdout_line, conf_info):
477       complete = True
478     stdout_line = fout.readline()
479
480   fout.close()
481
482
483   #error_code = pipe.wait()
484   if error_code:
485     logging.error('Recieved error_code: %s' % (error_code))
486   else:
487     logging.info('We are go for launch!')
488
489   #Process stderr
490   ferr = open(stderr_filepath, 'r')
491   stderr_line = ferr.readline()
492
493   abort = 'NO!'
494   stderr_success = False
495   while stderr_line != '':
496     stderr_status = config_stderr_handler(stderr_line, conf_info)
497     if stderr_status == RUN_ABORT:
498       abort = RUN_ABORT
499     elif stderr_status is True:
500       stderr_success = True
501     stderr_line = ferr.readline()
502
503   ferr.close()
504
505
506   #Success requirements:
507   # 1) The stdout completed without error
508   # 2) The program exited with status 0
509   # 3) No errors found in stdout
510   print '#Expect: True, False, True, True'
511   print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
512   status = complete is True and \
513            bool(error_code) is False and \
514            abort != RUN_ABORT and \
515            stderr_success is True
516
517   # If everything was successful, but for some reason
518   #  we didn't retrieve the path info, log it.
519   if status is True:
520     if conf_info.bustard_path is None or conf_info.run_path is None:
521       logging.error("Failed to retrieve run_path")
522       return False
523   
524   return status
525
526
527 def run_pipeline(conf_info):
528   """
529   Run the pipeline and monitor status.
530   """
531   # Fail if the run_path doesn't actually exist
532   if not os.path.exists(conf_info.run_path):
533     logging.error('Run path does not exist: %s' \
534               % (conf_info.run_path))
535     return False
536
537   # Change cwd to run_path
538   stdout_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stdout.txt')
539   stderr_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stderr.txt')
540
541   # Create status object
542   conf_info.createStatusObject()
543
544   # Monitor file creation
545   wm = WatchManager()
546   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
547   event = RunEvent(conf_info)
548   notifier = ThreadedNotifier(wm, event)
549   notifier.start()
550   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
551
552   # Log pipeline starting
553   logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
554   
555   # Start the pipeline (and hide!)
556   #pipe = subprocess.Popen(['make',
557   #                         '-j8',
558   #                         'recursive'],
559   #                        stdout=subprocess.PIPE,
560   #                        stderr=subprocess.PIPE)
561
562   fout = open(stdout_filepath, 'w')
563   ferr = open(stderr_filepath, 'w')
564
565   pipe = subprocess.Popen(['make',
566                            '--directory=%s' % (conf_info.run_path),
567                            '-j8',
568                            'recursive'],
569                            stdout=fout,
570                            stderr=ferr)
571                            #shell=True)
572   # Wait for run to finish
573   retcode = pipe.wait()
574
575
576   # Clean up
577   notifier.stop()
578   fout.close()
579   ferr.close()
580
581   # Process stderr
582   ferr = open(stderr_filepath, 'r')
583
584   run_failed_stderr = False
585   for line in ferr:
586     err_status = pipeline_stderr_handler(line, conf_info)
587     if err_status == RUN_FAILED:
588       run_failed_stderr = True
589
590   ferr.close()
591
592   # Finished file check!
593   print 'RUN SUCCESS CHECK:'
594   for key, value in event.run_status_dict.items():
595     print '  %s: %s' % (key, value)
596
597   dstatus = event.run_status_dict
598
599   # Success or failure check
600   status = (retcode == 0) and \
601            run_failed_stderr is False and \
602            dstatus['firecrest'] is True and \
603            dstatus['bustard'] is True and \
604            dstatus['gerald'] is True
605
606   return status
607
608