2 from __future__ import print_function
4 __docformat__ = "restructuredtext en"
12 from htsworkflow.pipelines.retrieve_config import \
13 CONFIG_SYSTEM, CONFIG_USER, \
14 FlowCellNotFound, getCombinedOptions, saveConfigFile, WebError404
15 from htsworkflow.pipelines.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
16 from htsworkflow.pipelines.run_status import GARunStatus
18 from pyinotify import WatchManager, ThreadedNotifier
19 from pyinotify import EventsCodes, ProcessEvent
21 LOGGER = logging.getLogger(__name__)
        # run_path = firecrest analysis directory to run analysis from
        # NOTE(review): this excerpt elides surrounding lines of the
        # enclosing __init__; the attribute list here may be incomplete.
        # Bustard output directory; filled in later by config_stdout_handler().
        self.bustard_path = None
        # Path to the retrieved GERALD config file (set by retrieve_config()).
        self.config_filepath = None

        #top level directory where all analyses are placed
        self.base_analysis_dir = None
        #analysis_dir, top level analysis dir...
        # base_analysis_dir + '/070924_USI-EAS44_0022_FC12150'
        self.analysis_dir = None
    def createStatusObject(self):
        """
        Creates a status object which can be queried for
        status of running the pipeline

        returns True if object created
        returns False if object cannot be created
        """
        if self.config_filepath is None:
            # NOTE(review): branch body elided in this excerpt — presumably
            # logs an error and returns False when no config file is set.
        # GARunStatus reads the GERALD config to know what to track.
        self.status = GARunStatus(self.config_filepath)
        # NOTE(review): trailing `return True` presumably follows — elided
        # in this excerpt.
####################################
# inotify event processor
####################################

# Each pipeline stage drops a "finished.txt" sentinel in its own directory
# when it completes; these match the per-stage sentinel paths.
# Raw strings avoid Python's invalid-escape-sequence warnings (e.g. '\.')
# while compiling to byte-identical patterns.
s_firecrest_finished = re.compile(r'Firecrest[0-9\._\-A-Za-z]+/finished.txt')
s_bustard_finished = re.compile(r'Bustard[0-9\._\-A-Za-z]+/finished.txt')
s_gerald_finished = re.compile(r'GERALD[0-9\._\-A-Za-z]+/finished.txt')

# Match any path inside a stage's output tree; used to attribute generic
# file-creation events to a stage for progress updates.  Order of use
# matters (see RunEvent.process_IN_CREATE): gerald, then bustard, then
# firecrest, because each broader pattern also matches the narrower ones.
s_gerald_all = re.compile(r'Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
s_bustard_all = re.compile(r'Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
s_firecrest_all = re.compile(r'Firecrest[0-9\._\-A-Za-z]+/')
class RunEvent(ProcessEvent):
    """
    pyinotify event handler: watches the analysis directory and updates
    the run status object (conf_info.status) as the pipeline stages
    create files.

    NOTE(review): several lines of this class are elided in this excerpt;
    the gaps are flagged inline below.
    """

    def __init__(self, conf_info):
        # Tracks which stages have dropped their finished.txt marker.
        self.run_status_dict = {'firecrest': False,
        # NOTE(review): the remaining dict entries (presumably 'bustard'
        # and 'gerald', both False) and the assignment of conf_info to
        # self._ci are elided in this excerpt; self._ci is used by
        # process_IN_CREATE below.
        ProcessEvent.__init__(self)

    def process_IN_CREATE(self, event):
        # Full path of the newly created file.
        fullpath = os.path.join(event.path, event.name)
        # A "finished" file marks completion of a pipeline stage.
        if s_finished.search(fullpath):
            LOGGER.info("File Found: %s" % (fullpath))

            if s_firecrest_finished.search(fullpath):
                self.run_status_dict['firecrest'] = True
                self._ci.status.updateFirecrest(event.name)
            elif s_bustard_finished.search(fullpath):
                self.run_status_dict['bustard'] = True
                self._ci.status.updateBustard(event.name)
            elif s_gerald_finished.search(fullpath):
                self.run_status_dict['gerald'] = True
                self._ci.status.updateGerald(event.name)

        #WARNING: The following order is important!!
        # Firecrest regex will catch all gerald, bustard, and firecrest
        # Bustard regex will catch all gerald and bustard
        # Gerald regex will catch all gerald
        # So, order needs to be Gerald, Bustard, Firecrest, or this
        # won't work properly.
        elif s_gerald_all.search(fullpath):
            self._ci.status.updateGerald(event.name)
        elif s_bustard_all.search(fullpath):
            self._ci.status.updateBustard(event.name)
        elif s_firecrest_all.search(fullpath):
            self._ci.status.updateFirecrest(event.name)

        #print "Create: %s" % (os.path.join(event.path, event.name))

    def process_IN_DELETE(self, event):
        #print "Remove %s" % (os.path.join(event.path, event.name))
        # NOTE(review): remainder of this method (presumably `pass`) is
        # elided in this excerpt.
RUN_FAILED = 'failed'
# NOTE(review): a RUN_ABORT sentinel is referenced by the stderr handlers
# below, but its definition is not visible in this excerpt.

#####################################
# Configure Step (goat_pipeline.py)
#####################################
# Raw strings are used for every pattern containing backslash escapes
# (e.g. \S, \(), avoiding Python's invalid-escape-sequence warnings while
# compiling to byte-identical patterns.

# stdout progress markers
s_start = re.compile('Starting Genome Analyzer Pipeline')
s_gerald = re.compile(r"[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
s_generating = re.compile('^Generating journals, Makefiles')
s_seq_folder = re.compile('^Sequence folder: ')
s_seq_folder_sub = re.compile('want to make ')
s_stderr_taskcomplete = re.compile('^Task complete, exiting')

# stderr failure markers
s_invalid_cmdline = re.compile(r'Usage:[\S\s]*goat_pipeline.py')
s_species_dir_err = re.compile('Error: Lane [1-8]:')
s_goat_traceb = re.compile(r"^Traceback \(most recent call last\):")
s_missing_cycles = re.compile('^Error: Tile s_[1-8]_[0-9]+: Different number of cycles: [0-9]+ instead of [0-9]+')

# Flipped to True by config_stderr_handler so the missing-cycles error is
# only reported once per run.
SUPPRESS_MISSING_CYCLES = False

##Ignore - Example of out above each ignore regex.
#NOTE: Commenting out an ignore will cause it to be
#      logged as DEBUG with the logging module.
#CF_STDERR_IGNORE_LIST = []
# Per-tile chatter such as "s_4_0101" is skipped without logging.
s_skip = re.compile('s_[0-8]_[0-9]+')
##########################################
# Pipeline Run Step (make -j8 recursive)
##########################################

# A stage drops a file whose path contains "finished" when it completes.
s_finished = re.compile('finished')

# stderr patterns indicating the make-driven run failed.
# Raw string avoids the invalid '\S' escape warning; pattern unchanged.
s_make_error = re.compile(r'^make[\S\s]+Error')
s_no_gnuplot = re.compile('gnuplot: command not found')
s_no_convert = re.compile("^Can't exec \"convert\"")
s_no_ghostscript = re.compile('gs: command not found')

##Ignore - Example of out above each ignore regex.
#NOTE: Commenting out an ignore will cause it to be
#      logged as DEBUG with the logging module.
# Known-benign stderr chatter; pl_stderr_ignore() drops lines matching
# any of these instead of logging them.  Built as one literal instead of
# repeated .append() calls; contents and order are unchanged.
PL_STDERR_IGNORE_LIST = [
    # Info: PF
    re.compile('^Info: PF'),
    # About to analyse intensity file s_4_0101_sig2.txt
    re.compile('^About to analyse intensity file'),
    # Will send output to standard output
    re.compile('^Will send output to standard output'),
    # Found 31877 clusters
    re.compile('^Found [0-9]+ clusters'),
    # Will use quality criterion ((CHASTITY>=0.6)
    re.compile('^Will use quality criterion'),
    # Quality criterion translated to (($F[5]>=0.6))
    re.compile('^Quality criterion translated to'),
    # opened /woldlab/.../Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
    # opened s_4_0103_qhg.txt
    re.compile(r'^opened[\S\s]+qhg.txt'),
    # 81129 sequences out of 157651 passed filter criteria
    re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria'),
]
def pl_stderr_ignore(line):
    """
    Searches lines for lines to ignore (i.e. not to log)

    returns True if line should be ignored
    returns False if line should NOT be ignored
    """
    # True as soon as any known-benign pattern matches; False otherwise.
    # NOTE(review): the original loop body was elided in this excerpt;
    # reconstructed per the docstring contract using any().
    return any(s.search(line) is not None for s in PL_STDERR_IGNORE_LIST)
def config_stdout_handler(line, conf_info):
    """
    Processes each line of output from GOAT
    and stores useful information using the logging module

    Loads useful information into conf_info as well, for future
    use outside the function.

    returns True if found condition that signifies success.

    NOTE(review): several branch bodies / return statements are elided in
    this excerpt; the gaps are flagged inline below.
    """
    # Skip irrelevant line (without logging)
    if s_skip.search(line):
        # NOTE(review): branch body elided — presumably `pass`.
    # Detect invalid command-line arguments
    elif s_invalid_cmdline.search(line):
        LOGGER.error("Invalid commandline options!")
    # Detect starting of configuration
    elif s_start.search(line):
        LOGGER.info('START: Configuring pipeline')
    # Detect it made it past invalid arguments
    elif s_gerald.search(line):
        LOGGER.info('Running make now')
    # Detect that make files have been generated (based on output)
    elif s_generating.search(line):
        # NOTE(review): "generted" typo lives in the runtime log string;
        # left untouched here (doc-only change).
        LOGGER.info('Make files generted')
        # NOTE(review): `return True` presumably follows — elided.
    # Capture run directory
    elif s_seq_folder.search(line):
        # When --tiles=<tiles> is used, the folder line carries a
        # "want to make " prefix before the makefile path.
        mo = s_seq_folder_sub.search(line)
        #Output changed when using --tiles=<tiles>
        # at least in pipeline v0.3.0b2
        # NOTE(review): `if mo is not None:` guard elided in this excerpt.
            # Walk up the path: .../Firecrest*/Bustard*/GERALD*/Makefile
            firecrest_bustard_gerald_makefile = line[mo.end():]
            firecrest_bustard_gerald, junk = \
                os.path.split(firecrest_bustard_gerald_makefile)
            firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
            firecrest, junk = os.path.split(firecrest_bustard)

            conf_info.bustard_path = firecrest_bustard
            conf_info.run_path = firecrest

        #Standard output handling
        # NOTE(review): `else:` header elided in this excerpt.
            print('Sequence line:', line)
            mo = s_seq_folder.search(line)
            conf_info.bustard_path = line[mo.end():]
            conf_info.run_path, temp = os.path.split(conf_info.bustard_path)

    # Log all other output for debugging purposes
    # NOTE(review): `else:` header elided in this excerpt.
        LOGGER.warning('CONF:?: %s' % (line))
    # NOTE(review): trailing `return False` presumably elided.
def config_stderr_handler(line, conf_info):
    """
    Processes each line of output from GOAT
    and stores useful information using the logging module

    Loads useful information into conf_info as well, for future
    use outside the function.

    returns RUN_ABORT upon detecting failure;
            True on success message;
            False if neutral message
            (i.e. doesn't signify failure or success)

    NOTE(review): several branch bodies / return statements are elided in
    this excerpt; the gaps are flagged inline below.
    """
    # Module-level flag so the missing-cycles error is only reported once.
    global SUPPRESS_MISSING_CYCLES

    # Detect invalid species directory error
    if s_species_dir_err.search(line):
        # NOTE(review): branch body elided — presumably logs the error
        # and returns RUN_ABORT.
    # Detect goat_pipeline.py traceback
    elif s_goat_traceb.search(line):
        LOGGER.error("Goat config script died, traceback in debug output")
        # NOTE(review): `return RUN_ABORT` presumably follows — elided.
    # Detect indication of successful configuration (from stderr; odd, but ok)
    elif s_stderr_taskcomplete.search(line):
        LOGGER.info('Configure step successful (from: stderr)')
        # NOTE(review): `return True` presumably follows — elided.
    # Detect missing cycles
    elif s_missing_cycles.search(line):

        # Only display error once
        if not SUPPRESS_MISSING_CYCLES:
            LOGGER.error("Missing cycles detected; Not all cycles copied?")
            LOGGER.debug("CONF:STDERR:MISSING_CYCLES: %s" % (line))
            SUPPRESS_MISSING_CYCLES = True
        # NOTE(review): `return RUN_ABORT` presumably follows — elided.

    # Log all other output as debug output
    # NOTE(review): `else:` header elided in this excerpt.
        LOGGER.debug('CONF:STDERR:?: %s' % (line))

    # Neutral (not failure; nor success)
    # NOTE(review): trailing `return False` presumably elided.
306 #def pipeline_stdout_handler(line, conf_info):
308 # Processes each line of output from running the pipeline
309 # and stores useful information using the logging module
311 # Loads useful information into conf_info as well, for future
312 # use outside the function.
314 # returns True if found condition that signifies success.
317 # #f.write(line + '\n')
def pipeline_stderr_handler(line, conf_info):
    """
    Processes each line of stderr from pipeline run
    and stores useful information using the logging module

    ##FIXME: Future feature (doesn't actually do this yet)
    #Loads useful information into conf_info as well, for future
    #use outside the function.

    returns RUN_FAILED upon detecting failure;
            #True on success message; (no clear success state)
            False if neutral message
            (i.e. doesn't signify failure or success)

    NOTE(review): several branch bodies / return statements are elided in
    this excerpt; the gaps are flagged inline below.
    """
    # Known-benign chatter: drop without logging.
    if pl_stderr_ignore(line):
        # NOTE(review): branch body elided — presumably `pass`.
    elif s_make_error.search(line):
        LOGGER.error("make error detected; run failed")
        # NOTE(review): `return RUN_FAILED` presumably follows — elided.
    elif s_no_gnuplot.search(line):
        LOGGER.error("gnuplot not found")
        # NOTE(review): `return RUN_FAILED` presumably follows — elided.
    elif s_no_convert.search(line):
        LOGGER.error("imagemagick's convert command not found")
        # NOTE(review): `return RUN_FAILED` presumably follows — elided.
    elif s_no_ghostscript.search(line):
        LOGGER.error("ghostscript not found")
        # NOTE(review): `return RUN_FAILED` presumably follows — elided.
    # NOTE(review): `else:` header elided in this excerpt.
        LOGGER.debug('PIPE:STDERR:?: %s' % (line))

    # NOTE(review): trailing `return False` presumably elided.
def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir):
    """
    Gets the config file from server...
    requires config file in:
      /etc/ga_frontend/ga_frontend.conf
    (additional docstring lines elided in this excerpt)
      base_host_url: http://host:port

    return True if successful, False is failure

    NOTE(review): try/except bodies, file reads, and return statements
    are elided in this excerpt; the gaps are flagged inline below.
    """
    # Merge per-user and system-wide frontend options.
    options = getCombinedOptions()

    if options.url is None:
        LOGGER.error("%s or %s missing base_host_url option" % \
                     (CONFIG_USER, CONFIG_SYSTEM))
        # NOTE(review): `return False` presumably follows — elided.

    # NOTE(review): `try:` header elided in this excerpt.
        # Download the GERALD config for this flowcell from the server.
        saveConfigFile(flowcell, options.url, cfg_filepath)
        conf_info.config_filepath = cfg_filepath
    except FlowCellNotFound as e:
        # NOTE(review): handler body elided — presumably logs e and
        # returns False.
    except WebError404 as e:
        # NOTE(review): handler body elided — presumably logs e and
        # returns False.
    except Exception as e:
        # NOTE(review): handler body elided — presumably logs e and
        # returns False.

    # Re-read the saved config so genome paths can be %-substituted below.
    f = open(cfg_filepath, 'r')
    # NOTE(review): `data = f.read()` / `f.close()` elided; `data` is used
    # in the f.write() call below.

    # Map species names to genome directories for the substitution.
    genome_dict = getAvailableGenomes(genome_dir)
    mapper_dict = constructMapperDict(genome_dict)

    # Rewrite the config file with genome paths filled in.
    f = open(cfg_filepath, 'w')
    f.write(data % (mapper_dict))
    # NOTE(review): `f.close()` / `return True` presumably follow — elided.
def configure(conf_info):
    """
    Attempts to configure the GA pipeline using goat.

    Uses logging module to store information about status.

    returns True if configuration successful, otherwise False.

    NOTE(review): many lines of this function (Popen keyword arguments,
    file closes, branch headers, returns, and the initialization of
    `complete`/`abort`) are elided in this excerpt; the gaps are flagged
    inline below.
    """
    # ERROR Test (1): invalid commandline options (kept for reference).
    #pipe = subprocess.Popen(['goat_pipeline.py',
    #                         '--GERALD=config32bk.txt',
    #                          stdout=subprocess.PIPE,
    #                          stderr=subprocess.PIPE)

    #ERROR Test (2), causes goat_pipeline.py traceback
    #pipe = subprocess.Popen(['goat_pipeline.py',
    #                  '--GERALD=%s' % (conf_info.config_filepath),
    #                  '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
    #                  stdout=subprocess.PIPE,
    #                  stderr=subprocess.PIPE)

    ##########################
    # Run configuration step
    #   Not a test; actual configure attempt.
    #pipe = subprocess.Popen(['goat_pipeline.py',
    #                  '--GERALD=%s' % (conf_info.config_filepath),
    #                  stdout=subprocess.PIPE,
    #                  stderr=subprocess.PIPE)

    # Capture goat's stdout/stderr to files inside the analysis dir so
    # they can be re-read and parsed line-by-line afterwards.
    stdout_filepath = os.path.join(conf_info.analysis_dir,
                                   "pipeline_configure_stdout.txt")
    stderr_filepath = os.path.join(conf_info.analysis_dir,
                                   "pipeline_configure_stderr.txt")

    fout = open(stdout_filepath, 'w')
    ferr = open(stderr_filepath, 'w')

    pipe = subprocess.Popen(['goat_pipeline.py',
                             '--GERALD=%s' % (conf_info.config_filepath),
                             conf_info.analysis_dir],
    # NOTE(review): remaining Popen keyword arguments (presumably
    # stdout=fout, stderr=ferr) elided in this excerpt.

    print("Configuring pipeline: %s" % (time.ctime()))
    error_code = pipe.wait()
    # NOTE(review): fout/ferr closes and initialization of `complete`
    # elided in this excerpt.

    # Parse the captured stdout for success/progress markers.
    fout = open(stdout_filepath, 'r')

    stdout_line = fout.readline()

    while stdout_line != '':
        # config_stdout_handler returns True on the success marker.
        if config_stdout_handler(stdout_line, conf_info):
            # NOTE(review): body elided — presumably `complete = True`.
        stdout_line = fout.readline()
    # NOTE(review): fout.close() elided in this excerpt.

    #error_code = pipe.wait()
    # NOTE(review): `if error_code:` header elided in this excerpt.
        # NOTE(review): "Recieved" typo lives in the runtime log string;
        # left untouched here (doc-only change).
        LOGGER.error('Recieved error_code: %s' % (error_code))
    # NOTE(review): `else:` header elided in this excerpt.
        LOGGER.info('We are go for launch!')

    # Parse the captured stderr for abort/success markers.
    ferr = open(stderr_filepath, 'r')
    stderr_line = ferr.readline()

    # NOTE(review): initialization of `abort` elided in this excerpt.
    stderr_success = False
    while stderr_line != '':
        stderr_status = config_stderr_handler(stderr_line, conf_info)
        if stderr_status == RUN_ABORT:
            # NOTE(review): body elided — presumably `abort = RUN_ABORT`.
        elif stderr_status is True:
            stderr_success = True
        stderr_line = ferr.readline()
    # NOTE(review): ferr.close() elided in this excerpt.

    #Success requirements:
    # 1) The stdout completed without error
    # 2) The program exited with status 0
    # 3) No errors found in stdout
    print('#Expect: True, False, True, True')
    print(complete, bool(error_code), abort != RUN_ABORT, stderr_success is True)
    status = complete is True and \
             bool(error_code) is False and \
             abort != RUN_ABORT and \
             stderr_success is True

    # If everything was successful, but for some reason
    #  we didn't retrieve the path info, log it.
    # NOTE(review): `if status is True:` guard presumably elided here.
    if conf_info.bustard_path is None or conf_info.run_path is None:
        LOGGER.error("Failed to retrieve run_path")
        # NOTE(review): `return False` / trailing `return status`
        # presumably elided in this excerpt.
def run_pipeline(conf_info):
    """
    Run the pipeline and monitor status.

    NOTE(review): several lines (WatchManager creation, notifier start,
    Popen keyword arguments, the stderr read loop header, file closes,
    and the trailing return) are elided in this excerpt; the gaps are
    flagged inline below.  The function may also continue past the last
    visible line.
    """
    # Fail if the run_path doesn't actually exist
    if not os.path.exists(conf_info.run_path):
        LOGGER.error('Run path does not exist: %s' \
              % (conf_info.run_path))
        # NOTE(review): `return False` presumably follows — elided.

    # Change cwd to run_path
    stdout_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stdout.txt')
    stderr_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stderr.txt')

    # Create status object
    conf_info.createStatusObject()

    # Monitor file creation
    # NOTE(review): `wm = WatchManager()` elided in this excerpt; `wm` is
    # used below.
    mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
    event = RunEvent(conf_info)
    notifier = ThreadedNotifier(wm, event)
    # NOTE(review): `notifier.start()` presumably elided in this excerpt.
    wdd = wm.add_watch(conf_info.run_path, mask, rec=True)

    # Log pipeline starting
    LOGGER.info('STARTING PIPELINE @ %s' % (time.ctime()))

    # Start the pipeline (and hide!)
    #pipe = subprocess.Popen(['make',
    #                         stdout=subprocess.PIPE,
    #                         stderr=subprocess.PIPE)

    fout = open(stdout_filepath, 'w')
    ferr = open(stderr_filepath, 'w')

    pipe = subprocess.Popen(['make',
                             '--directory=%s' % (conf_info.run_path),
    # NOTE(review): remaining Popen arguments (presumably '-j8' plus
    # stdout=fout, stderr=ferr) elided in this excerpt.

    # Wait for run to finish
    retcode = pipe.wait()
    # NOTE(review): notifier shutdown and fout/ferr closes presumably
    # elided in this excerpt.

    # Scan captured stderr for failure markers.
    ferr = open(stderr_filepath, 'r')

    run_failed_stderr = False
    # NOTE(review): loop header (presumably `for line in ferr:`) elided.
        err_status = pipeline_stderr_handler(line, conf_info)
        if err_status == RUN_FAILED:
            run_failed_stderr = True
    # NOTE(review): ferr.close() elided in this excerpt.

    # Finished file check!
    print('RUN SUCCESS CHECK:')
    for key, value in event.run_status_dict.items():
        print(' %s: %s' % (key, value))

    dstatus = event.run_status_dict

    # Success or failure check
    status = (retcode == 0) and \
             run_failed_stderr is False and \
             dstatus['firecrest'] is True and \
             dstatus['bustard'] is True and \
             dstatus['gerald'] is True
    # NOTE(review): trailing `return status` presumably follows — elided.