rename config file to something that doesn't include the read length
[htsworkflow.git] / htsworkflow / pipelines / configure_run.py
1 #!/usr/bin/python
2 import subprocess
3 import logging
4 import time
5 import re
6 import os
7
8 from htsworkflow.pipelines.retrieve_config import getCombinedOptions, saveConfigFile
9 from htsworkflow.pipelines.retrieve_config import FlowCellNotFound, WebError404
10 from htsworkflow.pipelines.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
11 from htsworkflow.pipelines.run_status import GARunStatus
12
13 from pyinotify import WatchManager, ThreadedNotifier
14 from pyinotify import EventsCodes, ProcessEvent
15
16 class ConfigInfo:
17   
18   def __init__(self):
19     #run_path = firecrest analysis directory to run analysis from
20     self.run_path = None
21     self.bustard_path = None
22     self.config_filepath = None
23     self.status = None
24
25     #top level directory where all analyses are placed
26     self.base_analysis_dir = None
27     #analysis_dir, top level analysis dir...
28     # base_analysis_dir + '/070924_USI-EAS44_0022_FC12150'
29     self.analysis_dir = None
30
31
32   def createStatusObject(self):
33     """
34     Creates a status object which can be queried for
35     status of running the pipeline
36
37     returns True if object created
38     returns False if object cannot be created
39     """
40     if self.config_filepath is None:
41       return False
42
43     self.status = GARunStatus(self.config_filepath)
44     return True
45
46
47
48 ####################################
49 # inotify event processor
50
51 s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
52 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
53 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
54
55 s_gerald_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
56 s_bustard_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
57 s_firecrest_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/')
58
59 class RunEvent(ProcessEvent):
60
61   def __init__(self, conf_info):
62
63     self.run_status_dict = {'firecrest': False,
64                             'bustard': False,
65                             'gerald': False}
66
67     self._ci = conf_info
68
69     ProcessEvent.__init__(self)
70     
71
72   def process_IN_CREATE(self, event):
73     fullpath = os.path.join(event.path, event.name)
74     if s_finished.search(fullpath):
75       logging.info("File Found: %s" % (fullpath))
76
77       if s_firecrest_finished.search(fullpath):
78         self.run_status_dict['firecrest'] = True
79         self._ci.status.updateFirecrest(event.name)
80       elif s_bustard_finished.search(fullpath):
81         self.run_status_dict['bustard'] = True
82         self._ci.status.updateBustard(event.name)
83       elif s_gerald_finished.search(fullpath):
84         self.run_status_dict['gerald'] = True
85         self._ci.status.updateGerald(event.name)
86
87     #WARNING: The following order is important!!
88     # Firecrest regex will catch all gerald, bustard, and firecrest
89     # Bustard regex will catch all gerald and bustard
90     # Gerald regex will catch all gerald
91     # So, order needs to be Gerald, Bustard, Firecrest, or this
92     #  won't work properly.
93     elif s_gerald_all.search(fullpath):
94       self._ci.status.updateGerald(event.name)
95     elif s_bustard_all.search(fullpath):
96       self._ci.status.updateBustard(event.name)
97     elif s_firecrest_all.search(fullpath):
98       self._ci.status.updateFirecrest(event.name)
99       
100     #print "Create: %s" % (os.path.join(event.path, event.name))
101
102   def process_IN_DELETE(self, event):
103     #print "Remove %s" % (os.path.join(event.path, event.name))
104     pass
105
106
107
108
109 #FLAGS
110 # Config Step Error
111 RUN_ABORT = 'abort'
112 # Run Step Error
113 RUN_FAILED = 'failed'
114
115
116 #####################################
117 # Configure Step (goat_pipeline.py)
118 #Info
119 s_start = re.compile('Starting Genome Analyzer Pipeline')
120 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
121 s_generating = re.compile('^Generating journals, Makefiles')
122 s_seq_folder = re.compile('^Sequence folder: ')
123 s_seq_folder_sub = re.compile('want to make ')
124 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
125
126 #Errors
127 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
128 s_species_dir_err = re.compile('Error: Lane [1-8]:')
129 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
130 s_missing_cycles = re.compile('^Error: Tile s_[1-8]_[0-9]+: Different number of cycles: [0-9]+ instead of [0-9]+')
131
132 SUPPRESS_MISSING_CYCLES = False
133
134
135 ##Ignore - Example of out above each ignore regex.
136 #NOTE: Commenting out an ignore will cause it to be
137 # logged as DEBUG with the logging module.
138 #CF_STDERR_IGNORE_LIST = []
139 s_skip = re.compile('s_[0-8]_[0-9]+')
140
141
142 ##########################################
143 # Pipeline Run Step (make -j8 recursive)
144
145 ##Info
146 s_finished = re.compile('finished')
147
148 ##Errors
149 s_make_error = re.compile('^make[\S\s]+Error')
150 s_no_gnuplot = re.compile('gnuplot: command not found')
151 s_no_convert = re.compile('^Can\'t exec "convert"')
152 s_no_ghostscript = re.compile('gs: command not found')
153
154 ##Ignore - Example of out above each ignore regex.
155 #NOTE: Commenting out an ignore will cause it to be
156 # logged as DEBUG with the logging module.
157 #
158 PL_STDERR_IGNORE_LIST = []
159 # Info: PF 11802
160 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
161 # About to analyse intensity file s_4_0101_sig2.txt
162 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
163 # Will send output to standard output
164 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
165 # Found 31877 clusters
166 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
167 # Will use quality criterion ((CHASTITY>=0.6)
168 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
169 # Quality criterion translated to (($F[5]>=0.6))
170 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
171 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
172 #  AND
173 # opened s_4_0103_qhg.txt
174 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
175 # 81129 sequences out of 157651 passed filter criteria
176 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
177
178
179 def pl_stderr_ignore(line):
180   """
181   Searches lines for lines to ignore (i.e. not to log)
182
183   returns True if line should be ignored
184   returns False if line should NOT be ignored
185   """
186   for s in PL_STDERR_IGNORE_LIST:
187     if s.search(line):
188       return True
189   return False
190
191
192 def config_stdout_handler(line, conf_info):
193   """
194   Processes each line of output from GOAT
195   and stores useful information using the logging module
196
197   Loads useful information into conf_info as well, for future
198   use outside the function.
199
200   returns True if found condition that signifies success.
201   """
202
203   # Skip irrelevant line (without logging)
204   if s_skip.search(line):
205     pass
206
207   # Detect invalid command-line arguments
208   elif s_invalid_cmdline.search(line):
209     logging.error("Invalid commandline options!")
210
211   # Detect starting of configuration
212   elif s_start.search(line):
213     logging.info('START: Configuring pipeline')
214
215   # Detect it made it past invalid arguments
216   elif s_gerald.search(line):
217     logging.info('Running make now')
218
219   # Detect that make files have been generated (based on output)
220   elif s_generating.search(line):
221     logging.info('Make files generted')
222     return True
223
224   # Capture run directory
225   elif s_seq_folder.search(line):
226     mo = s_seq_folder_sub.search(line)
227     #Output changed when using --tiles=<tiles>
228     # at least in pipeline v0.3.0b2
229     if mo:
230       firecrest_bustard_gerald_makefile = line[mo.end():]
231       firecrest_bustard_gerald, junk = \
232                                 os.path.split(firecrest_bustard_gerald_makefile)
233       firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
234       firecrest, junk = os.path.split(firecrest_bustard)
235
236       conf_info.bustard_path = firecrest_bustard
237       conf_info.run_path = firecrest
238     
239     #Standard output handling
240     else:
241       print 'Sequence line:', line
242       mo = s_seq_folder.search(line)
243       conf_info.bustard_path = line[mo.end():]
244       conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
245
246   # Log all other output for debugging purposes
247   else:
248     logging.warning('CONF:?: %s' % (line))
249
250   return False
251
252
253
254 def config_stderr_handler(line, conf_info):
255   """
256   Processes each line of output from GOAT
257   and stores useful information using the logging module
258
259   Loads useful information into conf_info as well, for future
260   use outside the function.
261
262   returns RUN_ABORT upon detecting failure;
263           True on success message;
264           False if neutral message
265             (i.e. doesn't signify failure or success)
266   """
267   global SUPPRESS_MISSING_CYCLES
268
269   # Detect invalid species directory error
270   if s_species_dir_err.search(line):
271     logging.error(line)
272     return RUN_ABORT
273   # Detect goat_pipeline.py traceback
274   elif s_goat_traceb.search(line):
275     logging.error("Goat config script died, traceback in debug output")
276     return RUN_ABORT
277   # Detect indication of successful configuration (from stderr; odd, but ok)
278   elif s_stderr_taskcomplete.search(line):
279     logging.info('Configure step successful (from: stderr)')
280     return True
281   # Detect missing cycles
282   elif s_missing_cycles.search(line):
283
284     # Only display error once
285     if not SUPPRESS_MISSING_CYCLES:
286       logging.error("Missing cycles detected; Not all cycles copied?")
287       logging.debug("CONF:STDERR:MISSING_CYCLES: %s" % (line))
288       SUPPRESS_MISSING_CYCLES = True
289     return RUN_ABORT
290   
291   # Log all other output as debug output
292   else:
293     logging.debug('CONF:STDERR:?: %s' % (line))
294
295   # Neutral (not failure; nor success)
296   return False
297
298
299 #def pipeline_stdout_handler(line, conf_info):
300 #  """
301 #  Processes each line of output from running the pipeline
302 #  and stores useful information using the logging module
303 #
304 #  Loads useful information into conf_info as well, for future
305 #  use outside the function.
306 #
307 #  returns True if found condition that signifies success.
308 #  """
309 #
310 #  #f.write(line + '\n')
311 #
312 #  return True
313
314
315
316 def pipeline_stderr_handler(line, conf_info):
317   """
318   Processes each line of stderr from pipelien run
319   and stores useful information using the logging module
320
321   ##FIXME: Future feature (doesn't actually do this yet)
322   #Loads useful information into conf_info as well, for future
323   #use outside the function.
324
325   returns RUN_FAILED upon detecting failure;
326           #True on success message; (no clear success state)
327           False if neutral message
328             (i.e. doesn't signify failure or success)
329   """
330
331   if pl_stderr_ignore(line):
332     pass
333   elif s_make_error.search(line):
334     logging.error("make error detected; run failed")
335     return RUN_FAILED
336   elif s_no_gnuplot.search(line):
337     logging.error("gnuplot not found")
338     return RUN_FAILED
339   elif s_no_convert.search(line):
340     logging.error("imagemagick's convert command not found")
341     return RUN_FAILED
342   elif s_no_ghostscript.search(line):
343     logging.error("ghostscript not found")
344     return RUN_FAILED
345   else:
346     logging.debug('PIPE:STDERR:?: %s' % (line))
347
348   return False
349
350
351 def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir):
352   """
353   Gets the config file from server...
354   requires config file in:
355     /etc/ga_frontend/ga_frontend.conf
356    or
357     ~/.ga_frontend.conf
358
359   with:
360   [config_file_server]
361   base_host_url: http://host:port
362
363   return True if successful, False is failure
364   """
365   options = getCombinedOptions()
366
367   if options.url is None:
368     logging.error("~/.ga_frontend.conf or /etc/ga_frontend/ga_frontend.conf" \
369                   " missing base_host_url option")
370     return False
371
372   try:
373     saveConfigFile(flowcell, options.url, cfg_filepath)
374     conf_info.config_filepath = cfg_filepath
375   except FlowCellNotFound, e:
376     logging.error(e)
377     return False
378   except WebError404, e:
379     logging.error(e)
380     return False
381   except IOError, e:
382     logging.error(e)
383     return False
384   except Exception, e:
385     logging.error(e)
386     return False
387
388   f = open(cfg_filepath, 'r')
389   data = f.read()
390   f.close()
391
392   genome_dict = getAvailableGenomes(genome_dir)
393   mapper_dict = constructMapperDict(genome_dict)
394
395   logging.debug(data)
396
397   f = open(cfg_filepath, 'w')
398   f.write(data % (mapper_dict))
399   f.close()
400   
401   return True
402   
403
404
405 def configure(conf_info):
406   """
407   Attempts to configure the GA pipeline using goat.
408
409   Uses logging module to store information about status.
410
411   returns True if configuration successful, otherwise False.
412   """
413   #ERROR Test:
414   #pipe = subprocess.Popen(['goat_pipeline.py',
415   #                         '--GERALD=config32bk.txt',
416   #                         '--make .',],
417   #                         #'.'],
418   #                        stdout=subprocess.PIPE,
419   #                        stderr=subprocess.PIPE)
420
421   #ERROR Test (2), causes goat_pipeline.py traceback
422   #pipe = subprocess.Popen(['goat_pipeline.py',
423   #                  '--GERALD=%s' % (conf_info.config_filepath),
424   #                         '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
425   #                         '--make',
426   #                         '.'],
427   #                        stdout=subprocess.PIPE,
428   #                        stderr=subprocess.PIPE)
429
430   ##########################
431   # Run configuration step
432   #   Not a test; actual configure attempt.
433   #pipe = subprocess.Popen(['goat_pipeline.py',
434   #                  '--GERALD=%s' % (conf_info.config_filepath),
435   #                         '--make',
436   #                         '.'],
437   #                        stdout=subprocess.PIPE,
438   #                        stderr=subprocess.PIPE)
439
440
441   stdout_filepath = os.path.join(conf_info.analysis_dir,
442                                  "pipeline_configure_stdout.txt")
443   stderr_filepath = os.path.join(conf_info.analysis_dir,
444                                  "pipeline_configure_stderr.txt")
445
446   fout = open(stdout_filepath, 'w')
447   ferr = open(stderr_filepath, 'w')
448   
449   pipe = subprocess.Popen(['goat_pipeline.py',
450                            '--GERALD=%s' % (conf_info.config_filepath),
451                            '--make',
452                            conf_info.analysis_dir],
453                            stdout=fout,
454                            stderr=ferr)
455
456   print "Configuring pipeline: %s" % (time.ctime())
457   error_code = pipe.wait()
458
459   # Clean up
460   fout.close()
461   ferr.close()
462   
463   
464   ##################
465   # Process stdout
466   fout = open(stdout_filepath, 'r')
467   
468   stdout_line = fout.readline()
469
470   complete = False
471   while stdout_line != '':
472     # Handle stdout
473     if config_stdout_handler(stdout_line, conf_info):
474       complete = True
475     stdout_line = fout.readline()
476
477   fout.close()
478
479
480   #error_code = pipe.wait()
481   if error_code:
482     logging.error('Recieved error_code: %s' % (error_code))
483   else:
484     logging.info('We are go for launch!')
485
486   #Process stderr
487   ferr = open(stderr_filepath, 'r')
488   stderr_line = ferr.readline()
489
490   abort = 'NO!'
491   stderr_success = False
492   while stderr_line != '':
493     stderr_status = config_stderr_handler(stderr_line, conf_info)
494     if stderr_status == RUN_ABORT:
495       abort = RUN_ABORT
496     elif stderr_status is True:
497       stderr_success = True
498     stderr_line = ferr.readline()
499
500   ferr.close()
501
502
503   #Success requirements:
504   # 1) The stdout completed without error
505   # 2) The program exited with status 0
506   # 3) No errors found in stdout
507   print '#Expect: True, False, True, True'
508   print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
509   status = complete is True and \
510            bool(error_code) is False and \
511            abort != RUN_ABORT and \
512            stderr_success is True
513
514   # If everything was successful, but for some reason
515   #  we didn't retrieve the path info, log it.
516   if status is True:
517     if conf_info.bustard_path is None or conf_info.run_path is None:
518       logging.error("Failed to retrieve run_path")
519       return False
520   
521   return status
522
523
524 def run_pipeline(conf_info):
525   """
526   Run the pipeline and monitor status.
527   """
528   # Fail if the run_path doesn't actually exist
529   if not os.path.exists(conf_info.run_path):
530     logging.error('Run path does not exist: %s' \
531               % (conf_info.run_path))
532     return False
533
534   # Change cwd to run_path
535   stdout_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stdout.txt')
536   stderr_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stderr.txt')
537
538   # Create status object
539   conf_info.createStatusObject()
540
541   # Monitor file creation
542   wm = WatchManager()
543   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
544   event = RunEvent(conf_info)
545   notifier = ThreadedNotifier(wm, event)
546   notifier.start()
547   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
548
549   # Log pipeline starting
550   logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
551   
552   # Start the pipeline (and hide!)
553   #pipe = subprocess.Popen(['make',
554   #                         '-j8',
555   #                         'recursive'],
556   #                        stdout=subprocess.PIPE,
557   #                        stderr=subprocess.PIPE)
558
559   fout = open(stdout_filepath, 'w')
560   ferr = open(stderr_filepath, 'w')
561
562   pipe = subprocess.Popen(['make',
563                            '--directory=%s' % (conf_info.run_path),
564                            '-j8',
565                            'recursive'],
566                            stdout=fout,
567                            stderr=ferr)
568                            #shell=True)
569   # Wait for run to finish
570   retcode = pipe.wait()
571
572
573   # Clean up
574   notifier.stop()
575   fout.close()
576   ferr.close()
577
578   # Process stderr
579   ferr = open(stderr_filepath, 'r')
580
581   run_failed_stderr = False
582   for line in ferr:
583     err_status = pipeline_stderr_handler(line, conf_info)
584     if err_status == RUN_FAILED:
585       run_failed_stderr = True
586
587   ferr.close()
588
589   # Finished file check!
590   print 'RUN SUCCESS CHECK:'
591   for key, value in event.run_status_dict.items():
592     print '  %s: %s' % (key, value)
593
594   dstatus = event.run_status_dict
595
596   # Success or failure check
597   status = (retcode == 0) and \
598            run_failed_stderr is False and \
599            dstatus['firecrest'] is True and \
600            dstatus['bustard'] is True and \
601            dstatus['gerald'] is True
602
603   return status
604
605