convert several not covered by unit-test modules to use print function
[htsworkflow.git] / htsworkflow / pipelines / configure_run.py
1 #!/usr/bin/python
2 from __future__ import print_function
3
4 __docformat__ = "restructuredtext en"
5
6 import subprocess
7 import logging
8 import time
9 import re
10 import os
11
12 from htsworkflow.pipelines.retrieve_config import \
13      CONFIG_SYSTEM, CONFIG_USER, \
14      FlowCellNotFound, getCombinedOptions, saveConfigFile, WebError404
15 from htsworkflow.pipelines.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
16 from htsworkflow.pipelines.run_status import GARunStatus
17
18 from pyinotify import WatchManager, ThreadedNotifier
19 from pyinotify import EventsCodes, ProcessEvent
20
21 LOGGER = logging.getLogger(__name__)
22
23 class ConfigInfo:
24
25   def __init__(self):
26     #run_path = firecrest analysis directory to run analysis from
27     self.run_path = None
28     self.bustard_path = None
29     self.config_filepath = None
30     self.status = None
31
32     #top level directory where all analyses are placed
33     self.base_analysis_dir = None
34     #analysis_dir, top level analysis dir...
35     # base_analysis_dir + '/070924_USI-EAS44_0022_FC12150'
36     self.analysis_dir = None
37
38
39   def createStatusObject(self):
40     """
41     Creates a status object which can be queried for
42     status of running the pipeline
43
44     returns True if object created
45     returns False if object cannot be created
46     """
47     if self.config_filepath is None:
48       return False
49
50     self.status = GARunStatus(self.config_filepath)
51     return True
52
53
54
55 ####################################
56 # inotify event processor
57
58 s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
59 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
60 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
61
62 s_gerald_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
63 s_bustard_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
64 s_firecrest_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/')
65
66 class RunEvent(ProcessEvent):
67
68   def __init__(self, conf_info):
69
70     self.run_status_dict = {'firecrest': False,
71                             'bustard': False,
72                             'gerald': False}
73
74     self._ci = conf_info
75
76     ProcessEvent.__init__(self)
77
78
79   def process_IN_CREATE(self, event):
80     fullpath = os.path.join(event.path, event.name)
81     if s_finished.search(fullpath):
82       LOGGER.info("File Found: %s" % (fullpath))
83
84       if s_firecrest_finished.search(fullpath):
85         self.run_status_dict['firecrest'] = True
86         self._ci.status.updateFirecrest(event.name)
87       elif s_bustard_finished.search(fullpath):
88         self.run_status_dict['bustard'] = True
89         self._ci.status.updateBustard(event.name)
90       elif s_gerald_finished.search(fullpath):
91         self.run_status_dict['gerald'] = True
92         self._ci.status.updateGerald(event.name)
93
94     #WARNING: The following order is important!!
95     # Firecrest regex will catch all gerald, bustard, and firecrest
96     # Bustard regex will catch all gerald and bustard
97     # Gerald regex will catch all gerald
98     # So, order needs to be Gerald, Bustard, Firecrest, or this
99     #  won't work properly.
100     elif s_gerald_all.search(fullpath):
101       self._ci.status.updateGerald(event.name)
102     elif s_bustard_all.search(fullpath):
103       self._ci.status.updateBustard(event.name)
104     elif s_firecrest_all.search(fullpath):
105       self._ci.status.updateFirecrest(event.name)
106
107     #print "Create: %s" % (os.path.join(event.path, event.name))
108
109   def process_IN_DELETE(self, event):
110     #print "Remove %s" % (os.path.join(event.path, event.name))
111     pass
112
113
114
115
116 #FLAGS
117 # Config Step Error
118 RUN_ABORT = 'abort'
119 # Run Step Error
120 RUN_FAILED = 'failed'
121
122
123 #####################################
124 # Configure Step (goat_pipeline.py)
125 #Info
126 s_start = re.compile('Starting Genome Analyzer Pipeline')
127 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
128 s_generating = re.compile('^Generating journals, Makefiles')
129 s_seq_folder = re.compile('^Sequence folder: ')
130 s_seq_folder_sub = re.compile('want to make ')
131 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
132
133 #Errors
134 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
135 s_species_dir_err = re.compile('Error: Lane [1-8]:')
136 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
137 s_missing_cycles = re.compile('^Error: Tile s_[1-8]_[0-9]+: Different number of cycles: [0-9]+ instead of [0-9]+')
138
139 SUPPRESS_MISSING_CYCLES = False
140
141
142 ##Ignore - Example of out above each ignore regex.
143 #NOTE: Commenting out an ignore will cause it to be
144 # logged as DEBUG with the logging module.
145 #CF_STDERR_IGNORE_LIST = []
146 s_skip = re.compile('s_[0-8]_[0-9]+')
147
148
149 ##########################################
150 # Pipeline Run Step (make -j8 recursive)
151
152 ##Info
153 s_finished = re.compile('finished')
154
155 ##Errors
156 s_make_error = re.compile('^make[\S\s]+Error')
157 s_no_gnuplot = re.compile('gnuplot: command not found')
158 s_no_convert = re.compile('^Can\'t exec "convert"')
159 s_no_ghostscript = re.compile('gs: command not found')
160
161 ##Ignore - Example of out above each ignore regex.
162 #NOTE: Commenting out an ignore will cause it to be
163 # logged as DEBUG with the logging module.
164 #
165 PL_STDERR_IGNORE_LIST = []
166 # Info: PF 11802
167 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
168 # About to analyse intensity file s_4_0101_sig2.txt
169 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
170 # Will send output to standard output
171 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
172 # Found 31877 clusters
173 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
174 # Will use quality criterion ((CHASTITY>=0.6)
175 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
176 # Quality criterion translated to (($F[5]>=0.6))
177 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
178 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
179 #  AND
180 # opened s_4_0103_qhg.txt
181 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
182 # 81129 sequences out of 157651 passed filter criteria
183 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
184
185
186 def pl_stderr_ignore(line):
187   """
188   Searches lines for lines to ignore (i.e. not to log)
189
190   returns True if line should be ignored
191   returns False if line should NOT be ignored
192   """
193   for s in PL_STDERR_IGNORE_LIST:
194     if s.search(line):
195       return True
196   return False
197
198
199 def config_stdout_handler(line, conf_info):
200   """
201   Processes each line of output from GOAT
202   and stores useful information using the logging module
203
204   Loads useful information into conf_info as well, for future
205   use outside the function.
206
207   returns True if found condition that signifies success.
208   """
209
210   # Skip irrelevant line (without logging)
211   if s_skip.search(line):
212     pass
213
214   # Detect invalid command-line arguments
215   elif s_invalid_cmdline.search(line):
216     LOGGER.error("Invalid commandline options!")
217
218   # Detect starting of configuration
219   elif s_start.search(line):
220     LOGGER.info('START: Configuring pipeline')
221
222   # Detect it made it past invalid arguments
223   elif s_gerald.search(line):
224     LOGGER.info('Running make now')
225
226   # Detect that make files have been generated (based on output)
227   elif s_generating.search(line):
228     LOGGER.info('Make files generted')
229     return True
230
231   # Capture run directory
232   elif s_seq_folder.search(line):
233     mo = s_seq_folder_sub.search(line)
234     #Output changed when using --tiles=<tiles>
235     # at least in pipeline v0.3.0b2
236     if mo:
237       firecrest_bustard_gerald_makefile = line[mo.end():]
238       firecrest_bustard_gerald, junk = \
239                                 os.path.split(firecrest_bustard_gerald_makefile)
240       firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
241       firecrest, junk = os.path.split(firecrest_bustard)
242
243       conf_info.bustard_path = firecrest_bustard
244       conf_info.run_path = firecrest
245
246     #Standard output handling
247     else:
248       print('Sequence line:', line)
249       mo = s_seq_folder.search(line)
250       conf_info.bustard_path = line[mo.end():]
251       conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
252
253   # Log all other output for debugging purposes
254   else:
255     LOGGER.warning('CONF:?: %s' % (line))
256
257   return False
258
259
260
261 def config_stderr_handler(line, conf_info):
262   """
263   Processes each line of output from GOAT
264   and stores useful information using the logging module
265
266   Loads useful information into conf_info as well, for future
267   use outside the function.
268
269   returns RUN_ABORT upon detecting failure;
270           True on success message;
271           False if neutral message
272             (i.e. doesn't signify failure or success)
273   """
274   global SUPPRESS_MISSING_CYCLES
275
276   # Detect invalid species directory error
277   if s_species_dir_err.search(line):
278     LOGGER.error(line)
279     return RUN_ABORT
280   # Detect goat_pipeline.py traceback
281   elif s_goat_traceb.search(line):
282     LOGGER.error("Goat config script died, traceback in debug output")
283     return RUN_ABORT
284   # Detect indication of successful configuration (from stderr; odd, but ok)
285   elif s_stderr_taskcomplete.search(line):
286     LOGGER.info('Configure step successful (from: stderr)')
287     return True
288   # Detect missing cycles
289   elif s_missing_cycles.search(line):
290
291     # Only display error once
292     if not SUPPRESS_MISSING_CYCLES:
293       LOGGER.error("Missing cycles detected; Not all cycles copied?")
294       LOGGER.debug("CONF:STDERR:MISSING_CYCLES: %s" % (line))
295       SUPPRESS_MISSING_CYCLES = True
296     return RUN_ABORT
297
298   # Log all other output as debug output
299   else:
300     LOGGER.debug('CONF:STDERR:?: %s' % (line))
301
302   # Neutral (not failure; nor success)
303   return False
304
305
306 #def pipeline_stdout_handler(line, conf_info):
307 #  """
308 #  Processes each line of output from running the pipeline
309 #  and stores useful information using the logging module
310 #
311 #  Loads useful information into conf_info as well, for future
312 #  use outside the function.
313 #
314 #  returns True if found condition that signifies success.
315 #  """
316 #
317 #  #f.write(line + '\n')
318 #
319 #  return True
320
321
322
323 def pipeline_stderr_handler(line, conf_info):
324   """
325   Processes each line of stderr from pipelien run
326   and stores useful information using the logging module
327
328   ##FIXME: Future feature (doesn't actually do this yet)
329   #Loads useful information into conf_info as well, for future
330   #use outside the function.
331
332   returns RUN_FAILED upon detecting failure;
333           #True on success message; (no clear success state)
334           False if neutral message
335             (i.e. doesn't signify failure or success)
336   """
337
338   if pl_stderr_ignore(line):
339     pass
340   elif s_make_error.search(line):
341     LOGGER.error("make error detected; run failed")
342     return RUN_FAILED
343   elif s_no_gnuplot.search(line):
344     LOGGER.error("gnuplot not found")
345     return RUN_FAILED
346   elif s_no_convert.search(line):
347     LOGGER.error("imagemagick's convert command not found")
348     return RUN_FAILED
349   elif s_no_ghostscript.search(line):
350     LOGGER.error("ghostscript not found")
351     return RUN_FAILED
352   else:
353     LOGGER.debug('PIPE:STDERR:?: %s' % (line))
354
355   return False
356
357
358 def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir):
359   """
360   Gets the config file from server...
361   requires config file in:
362     /etc/ga_frontend/ga_frontend.conf
363    or
364     ~/.ga_frontend.conf
365
366   with:
367   [config_file_server]
368   base_host_url: http://host:port
369
370   return True if successful, False is failure
371   """
372   options = getCombinedOptions()
373
374   if options.url is None:
375     LOGGER.error("%s or %s missing base_host_url option" % \
376                   (CONFIG_USER, CONFIG_SYSTEM))
377     return False
378
379   try:
380     saveConfigFile(flowcell, options.url, cfg_filepath)
381     conf_info.config_filepath = cfg_filepath
382   except FlowCellNotFound as e:
383     LOGGER.error(e)
384     return False
385   except WebError404 as e:
386     LOGGER.error(e)
387     return False
388   except IOError as e:
389     LOGGER.error(e)
390     return False
391   except Exception as e:
392     LOGGER.error(e)
393     return False
394
395   f = open(cfg_filepath, 'r')
396   data = f.read()
397   f.close()
398
399   genome_dict = getAvailableGenomes(genome_dir)
400   mapper_dict = constructMapperDict(genome_dict)
401
402   LOGGER.debug(data)
403
404   f = open(cfg_filepath, 'w')
405   f.write(data % (mapper_dict))
406   f.close()
407
408   return True
409
410
411
412 def configure(conf_info):
413   """
414   Attempts to configure the GA pipeline using goat.
415
416   Uses logging module to store information about status.
417
418   returns True if configuration successful, otherwise False.
419   """
420   #ERROR Test:
421   #pipe = subprocess.Popen(['goat_pipeline.py',
422   #                         '--GERALD=config32bk.txt',
423   #                         '--make .',],
424   #                         #'.'],
425   #                        stdout=subprocess.PIPE,
426   #                        stderr=subprocess.PIPE)
427
428   #ERROR Test (2), causes goat_pipeline.py traceback
429   #pipe = subprocess.Popen(['goat_pipeline.py',
430   #                  '--GERALD=%s' % (conf_info.config_filepath),
431   #                         '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
432   #                         '--make',
433   #                         '.'],
434   #                        stdout=subprocess.PIPE,
435   #                        stderr=subprocess.PIPE)
436
437   ##########################
438   # Run configuration step
439   #   Not a test; actual configure attempt.
440   #pipe = subprocess.Popen(['goat_pipeline.py',
441   #                  '--GERALD=%s' % (conf_info.config_filepath),
442   #                         '--make',
443   #                         '.'],
444   #                        stdout=subprocess.PIPE,
445   #                        stderr=subprocess.PIPE)
446
447
448   stdout_filepath = os.path.join(conf_info.analysis_dir,
449                                  "pipeline_configure_stdout.txt")
450   stderr_filepath = os.path.join(conf_info.analysis_dir,
451                                  "pipeline_configure_stderr.txt")
452
453   fout = open(stdout_filepath, 'w')
454   ferr = open(stderr_filepath, 'w')
455
456   pipe = subprocess.Popen(['goat_pipeline.py',
457                            '--GERALD=%s' % (conf_info.config_filepath),
458                            '--make',
459                            conf_info.analysis_dir],
460                            stdout=fout,
461                            stderr=ferr)
462
463   print("Configuring pipeline: %s" % (time.ctime()))
464   error_code = pipe.wait()
465
466   # Clean up
467   fout.close()
468   ferr.close()
469
470
471   ##################
472   # Process stdout
473   fout = open(stdout_filepath, 'r')
474
475   stdout_line = fout.readline()
476
477   complete = False
478   while stdout_line != '':
479     # Handle stdout
480     if config_stdout_handler(stdout_line, conf_info):
481       complete = True
482     stdout_line = fout.readline()
483
484   fout.close()
485
486
487   #error_code = pipe.wait()
488   if error_code:
489     LOGGER.error('Recieved error_code: %s' % (error_code))
490   else:
491     LOGGER.info('We are go for launch!')
492
493   #Process stderr
494   ferr = open(stderr_filepath, 'r')
495   stderr_line = ferr.readline()
496
497   abort = 'NO!'
498   stderr_success = False
499   while stderr_line != '':
500     stderr_status = config_stderr_handler(stderr_line, conf_info)
501     if stderr_status == RUN_ABORT:
502       abort = RUN_ABORT
503     elif stderr_status is True:
504       stderr_success = True
505     stderr_line = ferr.readline()
506
507   ferr.close()
508
509
510   #Success requirements:
511   # 1) The stdout completed without error
512   # 2) The program exited with status 0
513   # 3) No errors found in stdout
514   print('#Expect: True, False, True, True')
515   print(complete, bool(error_code), abort != RUN_ABORT, stderr_success is True)
516   status = complete is True and \
517            bool(error_code) is False and \
518            abort != RUN_ABORT and \
519            stderr_success is True
520
521   # If everything was successful, but for some reason
522   #  we didn't retrieve the path info, log it.
523   if status is True:
524     if conf_info.bustard_path is None or conf_info.run_path is None:
525       LOGGER.error("Failed to retrieve run_path")
526       return False
527
528   return status
529
530
531 def run_pipeline(conf_info):
532   """
533   Run the pipeline and monitor status.
534   """
535   # Fail if the run_path doesn't actually exist
536   if not os.path.exists(conf_info.run_path):
537     LOGGER.error('Run path does not exist: %s' \
538               % (conf_info.run_path))
539     return False
540
541   # Change cwd to run_path
542   stdout_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stdout.txt')
543   stderr_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stderr.txt')
544
545   # Create status object
546   conf_info.createStatusObject()
547
548   # Monitor file creation
549   wm = WatchManager()
550   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
551   event = RunEvent(conf_info)
552   notifier = ThreadedNotifier(wm, event)
553   notifier.start()
554   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
555
556   # Log pipeline starting
557   LOGGER.info('STARTING PIPELINE @ %s' % (time.ctime()))
558
559   # Start the pipeline (and hide!)
560   #pipe = subprocess.Popen(['make',
561   #                         '-j8',
562   #                         'recursive'],
563   #                        stdout=subprocess.PIPE,
564   #                        stderr=subprocess.PIPE)
565
566   fout = open(stdout_filepath, 'w')
567   ferr = open(stderr_filepath, 'w')
568
569   pipe = subprocess.Popen(['make',
570                            '--directory=%s' % (conf_info.run_path),
571                            '-j8',
572                            'recursive'],
573                            stdout=fout,
574                            stderr=ferr)
575                            #shell=True)
576   # Wait for run to finish
577   retcode = pipe.wait()
578
579
580   # Clean up
581   notifier.stop()
582   fout.close()
583   ferr.close()
584
585   # Process stderr
586   ferr = open(stderr_filepath, 'r')
587
588   run_failed_stderr = False
589   for line in ferr:
590     err_status = pipeline_stderr_handler(line, conf_info)
591     if err_status == RUN_FAILED:
592       run_failed_stderr = True
593
594   ferr.close()
595
596   # Finished file check!
597   print('RUN SUCCESS CHECK:')
598   for key, value in event.run_status_dict.items():
599     print('  %s: %s' % (key, value))
600
601   dstatus = event.run_status_dict
602
603   # Success or failure check
604   status = (retcode == 0) and \
605            run_failed_stderr is False and \
606            dstatus['firecrest'] is True and \
607            dstatus['bustard'] is True and \
608            dstatus['gerald'] is True
609
610   return status
611
612