Initial port to python3
[htsworkflow.git] / htsworkflow / pipelines / configure_run.py
1 #!/usr/bin/python
2 __docformat__ = "restructuredtext en"
3
4 import subprocess
5 import logging
6 import time
7 import re
8 import os
9
10 from htsworkflow.pipelines.retrieve_config import \
11      CONFIG_SYSTEM, CONFIG_USER, \
12      FlowCellNotFound, getCombinedOptions, saveConfigFile, WebError404
13 from htsworkflow.pipelines.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
14 from htsworkflow.pipelines.run_status import GARunStatus
15
16 from pyinotify import WatchManager, ThreadedNotifier
17 from pyinotify import EventsCodes, ProcessEvent
18
19 LOGGER = logging.getLogger(__name__)
20
21 class ConfigInfo:
22
23   def __init__(self):
24     #run_path = firecrest analysis directory to run analysis from
25     self.run_path = None
26     self.bustard_path = None
27     self.config_filepath = None
28     self.status = None
29
30     #top level directory where all analyses are placed
31     self.base_analysis_dir = None
32     #analysis_dir, top level analysis dir...
33     # base_analysis_dir + '/070924_USI-EAS44_0022_FC12150'
34     self.analysis_dir = None
35
36
37   def createStatusObject(self):
38     """
39     Creates a status object which can be queried for
40     status of running the pipeline
41
42     returns True if object created
43     returns False if object cannot be created
44     """
45     if self.config_filepath is None:
46       return False
47
48     self.status = GARunStatus(self.config_filepath)
49     return True
50
51
52
53 ####################################
54 # inotify event processor
55
56 s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
57 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
58 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
59
60 s_gerald_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
61 s_bustard_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
62 s_firecrest_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/')
63
64 class RunEvent(ProcessEvent):
65
66   def __init__(self, conf_info):
67
68     self.run_status_dict = {'firecrest': False,
69                             'bustard': False,
70                             'gerald': False}
71
72     self._ci = conf_info
73
74     ProcessEvent.__init__(self)
75
76
77   def process_IN_CREATE(self, event):
78     fullpath = os.path.join(event.path, event.name)
79     if s_finished.search(fullpath):
80       LOGGER.info("File Found: %s" % (fullpath))
81
82       if s_firecrest_finished.search(fullpath):
83         self.run_status_dict['firecrest'] = True
84         self._ci.status.updateFirecrest(event.name)
85       elif s_bustard_finished.search(fullpath):
86         self.run_status_dict['bustard'] = True
87         self._ci.status.updateBustard(event.name)
88       elif s_gerald_finished.search(fullpath):
89         self.run_status_dict['gerald'] = True
90         self._ci.status.updateGerald(event.name)
91
92     #WARNING: The following order is important!!
93     # Firecrest regex will catch all gerald, bustard, and firecrest
94     # Bustard regex will catch all gerald and bustard
95     # Gerald regex will catch all gerald
96     # So, order needs to be Gerald, Bustard, Firecrest, or this
97     #  won't work properly.
98     elif s_gerald_all.search(fullpath):
99       self._ci.status.updateGerald(event.name)
100     elif s_bustard_all.search(fullpath):
101       self._ci.status.updateBustard(event.name)
102     elif s_firecrest_all.search(fullpath):
103       self._ci.status.updateFirecrest(event.name)
104
105     #print "Create: %s" % (os.path.join(event.path, event.name))
106
107   def process_IN_DELETE(self, event):
108     #print "Remove %s" % (os.path.join(event.path, event.name))
109     pass
110
111
112
113
114 #FLAGS
115 # Config Step Error
116 RUN_ABORT = 'abort'
117 # Run Step Error
118 RUN_FAILED = 'failed'
119
120
121 #####################################
122 # Configure Step (goat_pipeline.py)
123 #Info
124 s_start = re.compile('Starting Genome Analyzer Pipeline')
125 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
126 s_generating = re.compile('^Generating journals, Makefiles')
127 s_seq_folder = re.compile('^Sequence folder: ')
128 s_seq_folder_sub = re.compile('want to make ')
129 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
130
131 #Errors
132 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
133 s_species_dir_err = re.compile('Error: Lane [1-8]:')
134 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
135 s_missing_cycles = re.compile('^Error: Tile s_[1-8]_[0-9]+: Different number of cycles: [0-9]+ instead of [0-9]+')
136
137 SUPPRESS_MISSING_CYCLES = False
138
139
140 ##Ignore - Example of out above each ignore regex.
141 #NOTE: Commenting out an ignore will cause it to be
142 # logged as DEBUG with the logging module.
143 #CF_STDERR_IGNORE_LIST = []
144 s_skip = re.compile('s_[0-8]_[0-9]+')
145
146
147 ##########################################
148 # Pipeline Run Step (make -j8 recursive)
149
150 ##Info
151 s_finished = re.compile('finished')
152
153 ##Errors
154 s_make_error = re.compile('^make[\S\s]+Error')
155 s_no_gnuplot = re.compile('gnuplot: command not found')
156 s_no_convert = re.compile('^Can\'t exec "convert"')
157 s_no_ghostscript = re.compile('gs: command not found')
158
159 ##Ignore - Example of out above each ignore regex.
160 #NOTE: Commenting out an ignore will cause it to be
161 # logged as DEBUG with the logging module.
162 #
163 PL_STDERR_IGNORE_LIST = []
164 # Info: PF 11802
165 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
166 # About to analyse intensity file s_4_0101_sig2.txt
167 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
168 # Will send output to standard output
169 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
170 # Found 31877 clusters
171 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
172 # Will use quality criterion ((CHASTITY>=0.6)
173 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
174 # Quality criterion translated to (($F[5]>=0.6))
175 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
176 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
177 #  AND
178 # opened s_4_0103_qhg.txt
179 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
180 # 81129 sequences out of 157651 passed filter criteria
181 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
182
183
184 def pl_stderr_ignore(line):
185   """
186   Searches lines for lines to ignore (i.e. not to log)
187
188   returns True if line should be ignored
189   returns False if line should NOT be ignored
190   """
191   for s in PL_STDERR_IGNORE_LIST:
192     if s.search(line):
193       return True
194   return False
195
196
197 def config_stdout_handler(line, conf_info):
198   """
199   Processes each line of output from GOAT
200   and stores useful information using the logging module
201
202   Loads useful information into conf_info as well, for future
203   use outside the function.
204
205   returns True if found condition that signifies success.
206   """
207
208   # Skip irrelevant line (without logging)
209   if s_skip.search(line):
210     pass
211
212   # Detect invalid command-line arguments
213   elif s_invalid_cmdline.search(line):
214     LOGGER.error("Invalid commandline options!")
215
216   # Detect starting of configuration
217   elif s_start.search(line):
218     LOGGER.info('START: Configuring pipeline')
219
220   # Detect it made it past invalid arguments
221   elif s_gerald.search(line):
222     LOGGER.info('Running make now')
223
224   # Detect that make files have been generated (based on output)
225   elif s_generating.search(line):
226     LOGGER.info('Make files generted')
227     return True
228
229   # Capture run directory
230   elif s_seq_folder.search(line):
231     mo = s_seq_folder_sub.search(line)
232     #Output changed when using --tiles=<tiles>
233     # at least in pipeline v0.3.0b2
234     if mo:
235       firecrest_bustard_gerald_makefile = line[mo.end():]
236       firecrest_bustard_gerald, junk = \
237                                 os.path.split(firecrest_bustard_gerald_makefile)
238       firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
239       firecrest, junk = os.path.split(firecrest_bustard)
240
241       conf_info.bustard_path = firecrest_bustard
242       conf_info.run_path = firecrest
243
244     #Standard output handling
245     else:
246       print('Sequence line:', line)
247       mo = s_seq_folder.search(line)
248       conf_info.bustard_path = line[mo.end():]
249       conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
250
251   # Log all other output for debugging purposes
252   else:
253     LOGGER.warning('CONF:?: %s' % (line))
254
255   return False
256
257
258
259 def config_stderr_handler(line, conf_info):
260   """
261   Processes each line of output from GOAT
262   and stores useful information using the logging module
263
264   Loads useful information into conf_info as well, for future
265   use outside the function.
266
267   returns RUN_ABORT upon detecting failure;
268           True on success message;
269           False if neutral message
270             (i.e. doesn't signify failure or success)
271   """
272   global SUPPRESS_MISSING_CYCLES
273
274   # Detect invalid species directory error
275   if s_species_dir_err.search(line):
276     LOGGER.error(line)
277     return RUN_ABORT
278   # Detect goat_pipeline.py traceback
279   elif s_goat_traceb.search(line):
280     LOGGER.error("Goat config script died, traceback in debug output")
281     return RUN_ABORT
282   # Detect indication of successful configuration (from stderr; odd, but ok)
283   elif s_stderr_taskcomplete.search(line):
284     LOGGER.info('Configure step successful (from: stderr)')
285     return True
286   # Detect missing cycles
287   elif s_missing_cycles.search(line):
288
289     # Only display error once
290     if not SUPPRESS_MISSING_CYCLES:
291       LOGGER.error("Missing cycles detected; Not all cycles copied?")
292       LOGGER.debug("CONF:STDERR:MISSING_CYCLES: %s" % (line))
293       SUPPRESS_MISSING_CYCLES = True
294     return RUN_ABORT
295
296   # Log all other output as debug output
297   else:
298     LOGGER.debug('CONF:STDERR:?: %s' % (line))
299
300   # Neutral (not failure; nor success)
301   return False
302
303
304 #def pipeline_stdout_handler(line, conf_info):
305 #  """
306 #  Processes each line of output from running the pipeline
307 #  and stores useful information using the logging module
308 #
309 #  Loads useful information into conf_info as well, for future
310 #  use outside the function.
311 #
312 #  returns True if found condition that signifies success.
313 #  """
314 #
315 #  #f.write(line + '\n')
316 #
317 #  return True
318
319
320
321 def pipeline_stderr_handler(line, conf_info):
322   """
323   Processes each line of stderr from pipelien run
324   and stores useful information using the logging module
325
326   ##FIXME: Future feature (doesn't actually do this yet)
327   #Loads useful information into conf_info as well, for future
328   #use outside the function.
329
330   returns RUN_FAILED upon detecting failure;
331           #True on success message; (no clear success state)
332           False if neutral message
333             (i.e. doesn't signify failure or success)
334   """
335
336   if pl_stderr_ignore(line):
337     pass
338   elif s_make_error.search(line):
339     LOGGER.error("make error detected; run failed")
340     return RUN_FAILED
341   elif s_no_gnuplot.search(line):
342     LOGGER.error("gnuplot not found")
343     return RUN_FAILED
344   elif s_no_convert.search(line):
345     LOGGER.error("imagemagick's convert command not found")
346     return RUN_FAILED
347   elif s_no_ghostscript.search(line):
348     LOGGER.error("ghostscript not found")
349     return RUN_FAILED
350   else:
351     LOGGER.debug('PIPE:STDERR:?: %s' % (line))
352
353   return False
354
355
356 def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir):
357   """
358   Gets the config file from server...
359   requires config file in:
360     /etc/ga_frontend/ga_frontend.conf
361    or
362     ~/.ga_frontend.conf
363
364   with:
365   [config_file_server]
366   base_host_url: http://host:port
367
368   return True if successful, False is failure
369   """
370   options = getCombinedOptions()
371
372   if options.url is None:
373     LOGGER.error("%s or %s missing base_host_url option" % \
374                   (CONFIG_USER, CONFIG_SYSTEM))
375     return False
376
377   try:
378     saveConfigFile(flowcell, options.url, cfg_filepath)
379     conf_info.config_filepath = cfg_filepath
380   except FlowCellNotFound as e:
381     LOGGER.error(e)
382     return False
383   except WebError404 as e:
384     LOGGER.error(e)
385     return False
386   except IOError as e:
387     LOGGER.error(e)
388     return False
389   except Exception as e:
390     LOGGER.error(e)
391     return False
392
393   f = open(cfg_filepath, 'r')
394   data = f.read()
395   f.close()
396
397   genome_dict = getAvailableGenomes(genome_dir)
398   mapper_dict = constructMapperDict(genome_dict)
399
400   LOGGER.debug(data)
401
402   f = open(cfg_filepath, 'w')
403   f.write(data % (mapper_dict))
404   f.close()
405
406   return True
407
408
409
410 def configure(conf_info):
411   """
412   Attempts to configure the GA pipeline using goat.
413
414   Uses logging module to store information about status.
415
416   returns True if configuration successful, otherwise False.
417   """
418   #ERROR Test:
419   #pipe = subprocess.Popen(['goat_pipeline.py',
420   #                         '--GERALD=config32bk.txt',
421   #                         '--make .',],
422   #                         #'.'],
423   #                        stdout=subprocess.PIPE,
424   #                        stderr=subprocess.PIPE)
425
426   #ERROR Test (2), causes goat_pipeline.py traceback
427   #pipe = subprocess.Popen(['goat_pipeline.py',
428   #                  '--GERALD=%s' % (conf_info.config_filepath),
429   #                         '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
430   #                         '--make',
431   #                         '.'],
432   #                        stdout=subprocess.PIPE,
433   #                        stderr=subprocess.PIPE)
434
435   ##########################
436   # Run configuration step
437   #   Not a test; actual configure attempt.
438   #pipe = subprocess.Popen(['goat_pipeline.py',
439   #                  '--GERALD=%s' % (conf_info.config_filepath),
440   #                         '--make',
441   #                         '.'],
442   #                        stdout=subprocess.PIPE,
443   #                        stderr=subprocess.PIPE)
444
445
446   stdout_filepath = os.path.join(conf_info.analysis_dir,
447                                  "pipeline_configure_stdout.txt")
448   stderr_filepath = os.path.join(conf_info.analysis_dir,
449                                  "pipeline_configure_stderr.txt")
450
451   fout = open(stdout_filepath, 'w')
452   ferr = open(stderr_filepath, 'w')
453
454   pipe = subprocess.Popen(['goat_pipeline.py',
455                            '--GERALD=%s' % (conf_info.config_filepath),
456                            '--make',
457                            conf_info.analysis_dir],
458                            stdout=fout,
459                            stderr=ferr)
460
461   print("Configuring pipeline: %s" % (time.ctime()))
462   error_code = pipe.wait()
463
464   # Clean up
465   fout.close()
466   ferr.close()
467
468
469   ##################
470   # Process stdout
471   fout = open(stdout_filepath, 'r')
472
473   stdout_line = fout.readline()
474
475   complete = False
476   while stdout_line != '':
477     # Handle stdout
478     if config_stdout_handler(stdout_line, conf_info):
479       complete = True
480     stdout_line = fout.readline()
481
482   fout.close()
483
484
485   #error_code = pipe.wait()
486   if error_code:
487     LOGGER.error('Recieved error_code: %s' % (error_code))
488   else:
489     LOGGER.info('We are go for launch!')
490
491   #Process stderr
492   ferr = open(stderr_filepath, 'r')
493   stderr_line = ferr.readline()
494
495   abort = 'NO!'
496   stderr_success = False
497   while stderr_line != '':
498     stderr_status = config_stderr_handler(stderr_line, conf_info)
499     if stderr_status == RUN_ABORT:
500       abort = RUN_ABORT
501     elif stderr_status is True:
502       stderr_success = True
503     stderr_line = ferr.readline()
504
505   ferr.close()
506
507
508   #Success requirements:
509   # 1) The stdout completed without error
510   # 2) The program exited with status 0
511   # 3) No errors found in stdout
512   print('#Expect: True, False, True, True')
513   print(complete, bool(error_code), abort != RUN_ABORT, stderr_success is True)
514   status = complete is True and \
515            bool(error_code) is False and \
516            abort != RUN_ABORT and \
517            stderr_success is True
518
519   # If everything was successful, but for some reason
520   #  we didn't retrieve the path info, log it.
521   if status is True:
522     if conf_info.bustard_path is None or conf_info.run_path is None:
523       LOGGER.error("Failed to retrieve run_path")
524       return False
525
526   return status
527
528
529 def run_pipeline(conf_info):
530   """
531   Run the pipeline and monitor status.
532   """
533   # Fail if the run_path doesn't actually exist
534   if not os.path.exists(conf_info.run_path):
535     LOGGER.error('Run path does not exist: %s' \
536               % (conf_info.run_path))
537     return False
538
539   # Change cwd to run_path
540   stdout_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stdout.txt')
541   stderr_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stderr.txt')
542
543   # Create status object
544   conf_info.createStatusObject()
545
546   # Monitor file creation
547   wm = WatchManager()
548   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
549   event = RunEvent(conf_info)
550   notifier = ThreadedNotifier(wm, event)
551   notifier.start()
552   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
553
554   # Log pipeline starting
555   LOGGER.info('STARTING PIPELINE @ %s' % (time.ctime()))
556
557   # Start the pipeline (and hide!)
558   #pipe = subprocess.Popen(['make',
559   #                         '-j8',
560   #                         'recursive'],
561   #                        stdout=subprocess.PIPE,
562   #                        stderr=subprocess.PIPE)
563
564   fout = open(stdout_filepath, 'w')
565   ferr = open(stderr_filepath, 'w')
566
567   pipe = subprocess.Popen(['make',
568                            '--directory=%s' % (conf_info.run_path),
569                            '-j8',
570                            'recursive'],
571                            stdout=fout,
572                            stderr=ferr)
573                            #shell=True)
574   # Wait for run to finish
575   retcode = pipe.wait()
576
577
578   # Clean up
579   notifier.stop()
580   fout.close()
581   ferr.close()
582
583   # Process stderr
584   ferr = open(stderr_filepath, 'r')
585
586   run_failed_stderr = False
587   for line in ferr:
588     err_status = pipeline_stderr_handler(line, conf_info)
589     if err_status == RUN_FAILED:
590       run_failed_stderr = True
591
592   ferr.close()
593
594   # Finished file check!
595   print('RUN SUCCESS CHECK:')
596   for key, value in list(event.run_status_dict.items()):
597     print('  %s: %s' % (key, value))
598
599   dstatus = event.run_status_dict
600
601   # Success or failure check
602   status = (retcode == 0) and \
603            run_failed_stderr is False and \
604            dstatus['firecrest'] is True and \
605            dstatus['bustard'] is True and \
606            dstatus['gerald'] is True
607
608   return status
609
610