8 from htsworkflow.pipelines.retrieve_config import \
9 CONFIG_SYSTEM, CONFIG_USER, \
10 FlowCellNotFound, getCombinedOptions, saveConfigFile, WebError404
11 from htsworkflow.pipelines.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
12 from htsworkflow.pipelines.run_status import GARunStatus
14 from pyinotify import WatchManager, ThreadedNotifier
15 from pyinotify import EventsCodes, ProcessEvent
20 #run_path = firecrest analysis directory to run analysis from
22 self.bustard_path = None
23 self.config_filepath = None
26 #top level directory where all analyses are placed
27 self.base_analysis_dir = None
28 #analysis_dir, top level analysis dir...
29 # base_analysis_dir + '/070924_USI-EAS44_0022_FC12150'
30 self.analysis_dir = None
# Build the GARunStatus tracker for this run from the GERALD config file.
33 def createStatusObject(self):
35 Creates a status object which can be queried for
36 status of running the pipeline
38 returns True if object created
39 returns False if object cannot be created
# GARunStatus is constructed from config_filepath (set by retrieve_config),
# so nothing can be created before that path is known.
# NOTE(review): this guard's body is not present here; per the docstring it
# presumably returns False - confirm.
41 if self.config_filepath is None:
# Stored on self.status; RunEvent calls self._ci.status.update*() on it as
# stage files appear.
44 self.status = GARunStatus(self.config_filepath)
####################################
# inotify event processor
#
# Patterns applied to the full paths reported by the inotify watcher.
# Raw strings keep the regex escapes intact instead of relying on Python to
# pass unknown string escapes (\., \-) through unchanged (deprecated since
# Python 3.6).  The '.' in 'finished.txt' is escaped so it only matches a
# literal dot.

# A stage's finished.txt marker file inside that stage's output directory.
s_firecrest_finished = re.compile(r'Firecrest[0-9\._\-A-Za-z]+/finished\.txt')
s_bustard_finished = re.compile(r'Bustard[0-9\._\-A-Za-z]+/finished\.txt')
s_gerald_finished = re.compile(r'GERALD[0-9\._\-A-Za-z]+/finished\.txt')

# Any path below a stage directory.  These overlap: every path matching the
# GERALD pattern also matches the Bustard and Firecrest ones, and every
# Bustard match also matches Firecrest.  Callers must therefore test them in
# the order gerald -> bustard -> firecrest.
s_gerald_all = re.compile(
    r'Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
s_bustard_all = re.compile(
    r'Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
s_firecrest_all = re.compile(r'Firecrest[0-9\._\-A-Za-z]+/')
# pyinotify event handler for a pipeline run.  It records which stages
# (firecrest, bustard, gerald) have produced a finished.txt file.  It also
# forwards stage events to the status object reached through self._ci.status.
60 class RunEvent(ProcessEvent):
# run_pipeline constructs this with its conf_info.
# NOTE(review): the assignment of self._ci and the rest of the
# run_status_dict literal are not present here; self._ci is presumably
# conf_info - confirm.
62 def __init__(self, conf_info):
# Per-stage completion flags, set True when that stage's finished.txt
# appears; run_pipeline reads them after make exits.
64 self.run_status_dict = {'firecrest': False,
70 ProcessEvent.__init__(self)
# Called for each IN_CREATE event under the watched run path.
73 def process_IN_CREATE(self, event):
74 fullpath = os.path.join(event.path, event.name)
# s_finished matches any path containing 'finished', not only finished.txt.
75 if s_finished.search(fullpath):
76 logging.info("File Found: %s" % (fullpath))
78 if s_firecrest_finished.search(fullpath):
79 self.run_status_dict['firecrest'] = True
80 self._ci.status.updateFirecrest(event.name)
81 elif s_bustard_finished.search(fullpath):
82 self.run_status_dict['bustard'] = True
83 self._ci.status.updateBustard(event.name)
84 elif s_gerald_finished.search(fullpath):
85 self.run_status_dict['gerald'] = True
86 self._ci.status.updateGerald(event.name)
88 #WARNING: The following order is important!!
89 # Firecrest regex will catch all gerald, bustard, and firecrest
90 # Bustard regex will catch all gerald and bustard
91 # Gerald regex will catch all gerald
92 # So, order needs to be Gerald, Bustard, Firecrest, or this
93 # won't work properly.
# NOTE(review): indentation was lost.  It is not visible whether this elif
# continues the inner *_finished chain or the outer s_finished test - confirm.
94 elif s_gerald_all.search(fullpath):
95 self._ci.status.updateGerald(event.name)
96 elif s_bustard_all.search(fullpath):
97 self._ci.status.updateBustard(event.name)
98 elif s_firecrest_all.search(fullpath):
99 self._ci.status.updateFirecrest(event.name)
101 #print "Create: %s" % (os.path.join(event.path, event.name))
# In the lines visible here, deletion handling is only a commented-out print.
103 def process_IN_DELETE(self, event):
104 #print "Remove %s" % (os.path.join(event.path, event.name))
RUN_FAILED = 'failed'

#####################################
# Configure Step (goat_pipeline.py)
#
# Patterns matched against goat_pipeline.py output.  Raw strings keep the
# regex escapes (\S, \s, \() intact instead of relying on Python to pass
# unknown string escapes through unchanged.

s_start = re.compile(r'Starting Genome Analyzer Pipeline')
s_gerald = re.compile(r'[\S\s]+--GERALD[\S\s]+--make[\S\s]+')
s_generating = re.compile(r'^Generating journals, Makefiles')
s_seq_folder = re.compile(r'^Sequence folder: ')
s_seq_folder_sub = re.compile(r'want to make ')
s_stderr_taskcomplete = re.compile(r'^Task complete, exiting')

# Error conditions.  The '.' in 'goat_pipeline.py' is escaped so it only
# matches a literal dot.
s_invalid_cmdline = re.compile(r'Usage:[\S\s]*goat_pipeline\.py')
s_species_dir_err = re.compile(r'Error: Lane [1-8]:')
s_goat_traceb = re.compile(r'^Traceback \(most recent call last\):')
s_missing_cycles = re.compile(
    r'^Error: Tile s_[1-8]_[0-9]+: '
    r'Different number of cycles: [0-9]+ instead of [0-9]+')

# Set to True by config_stderr_handler once the missing-cycles error has
# been logged, so that error is reported only once.
SUPPRESS_MISSING_CYCLES = False

## Ignore - output lines matching these regexes are skipped without logging.
#NOTE: Commenting out an ignore will cause such lines to fall through to
# config_stdout_handler's other branches and be logged.
#CF_STDERR_IGNORE_LIST = []
# Tile identifiers such as s_4_0101.
s_skip = re.compile(r's_[0-8]_[0-9]+')
##########################################
# Pipeline Run Step (make -j8 recursive)

# Any path containing 'finished' (e.g. a stage's finished.txt marker).
s_finished = re.compile(r'finished')

# Error conditions reported on the pipeline's stderr.  Raw strings keep the
# regex escapes (\S, \s) intact.
s_make_error = re.compile(r'^make[\S\s]+Error')
s_no_gnuplot = re.compile(r'gnuplot: command not found')
s_no_convert = re.compile('^Can\'t exec "convert"')
s_no_ghostscript = re.compile(r'gs: command not found')

## Ignore - known-noise stderr lines; each pattern is preceded by an example
# of the output it matches.
#NOTE: Commenting out an ignore will cause that output to be
# logged as DEBUG with the logging module.
PL_STDERR_IGNORE_LIST = [
    re.compile(r'^Info: PF'),
    # About to analyse intensity file s_4_0101_sig2.txt
    re.compile(r'^About to analyse intensity file'),
    # Will send output to standard output
    re.compile(r'^Will send output to standard output'),
    # Found 31877 clusters
    re.compile(r'^Found [0-9]+ clusters'),
    # Will use quality criterion ((CHASTITY>=0.6)
    re.compile(r'^Will use quality criterion'),
    # Quality criterion translated to (($F[5]>=0.6))
    re.compile(r'^Quality criterion translated to'),
    # Both forms below are matched; the '.' before 'txt' is escaped so it
    # only matches a literal dot.
    # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
    # opened s_4_0103_qhg.txt
    re.compile(r'^opened[\S\s]+qhg\.txt'),
    # 81129 sequences out of 157651 passed filter criteria
    re.compile(r'^[0-9]+ sequences out of [0-9]+ passed filter criteria'),
]
180 def pl_stderr_ignore(line):
182 Searches lines for lines to ignore (i.e. not to log)
184 returns True if line should be ignored
185 returns False if line should NOT be ignored
# PL_STDERR_IGNORE_LIST holds compiled regexes for known-noise pipeline
# stderr output.
# NOTE(review): the loop body and final return are not present here.  Per
# the docstring it presumably returns True on the first matching pattern
# and False otherwise - confirm.
187 for s in PL_STDERR_IGNORE_LIST:
193 def config_stdout_handler(line, conf_info):
195 Processes each line of output from GOAT
196 and stores useful information using the logging module
198 Loads useful information into conf_info as well, for future
199 use outside the function.
201 returns True if found condition that signifies success.
# Branches form an if/elif chain: only the first matching pattern applies.
204 # Skip irrelevant line (without logging)
205 if s_skip.search(line):
208 # Detect invalid command-line arguments
209 elif s_invalid_cmdline.search(line):
210 logging.error("Invalid commandline options!")
212 # Detect starting of configuration
213 elif s_start.search(line):
214 logging.info('START: Configuring pipeline')
216 # Detect it made it past invalid arguments
217 elif s_gerald.search(line):
218 logging.info('Running make now')
220 # Detect that make files have been generated (based on output)
221 elif s_generating.search(line):
# NOTE(review): 'generted' in the message below is a typo; left unchanged in
# case anything matches on the log text.
222 logging.info('Make files generted')
225 # Capture run directory
226 elif s_seq_folder.search(line):
227 mo = s_seq_folder_sub.search(line)
228 #Output changed when using --tiles=<tiles>
229 # at least in pipeline v0.3.0b2
# The text after 'want to make ' is treated as a Makefile path.  Three
# os.path.split calls strip the file name and then two directory levels.
# The path is presumably laid out as .../Firecrest*/Bustard*/GERALD*/Makefile,
# giving the Bustard directory and its Firecrest parent.
# NOTE(review): the condition choosing between this branch and the
# "Standard output handling" one below is not present here.  It presumably
# tests whether `mo` matched; confirm mo is not None before mo.end() is used.
231 firecrest_bustard_gerald_makefile = line[mo.end():]
232 firecrest_bustard_gerald, junk = \
233 os.path.split(firecrest_bustard_gerald_makefile)
234 firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
235 firecrest, junk = os.path.split(firecrest_bustard)
237 conf_info.bustard_path = firecrest_bustard
238 conf_info.run_path = firecrest
240 #Standard output handling
# Here the Bustard directory follows 'Sequence folder: ', and run_path is
# its parent directory.
# NOTE(review): configure() reads these lines with readline().  Unless the
# trailing newline is stripped before this call, it ends up in these paths.
242 print 'Sequence line:', line
243 mo = s_seq_folder.search(line)
244 conf_info.bustard_path = line[mo.end():]
245 conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
247 # Log all other output for debugging purposes
# (logged at WARNING level, so unexpected configure output stays visible)
249 logging.warning('CONF:?: %s' % (line))
255 def config_stderr_handler(line, conf_info):
257 Processes each line of output from GOAT
258 and stores useful information using the logging module
260 Loads useful information into conf_info as well, for future
261 use outside the function.
263 returns RUN_ABORT upon detecting failure;
264 True on success message;
265 False if neutral message
266 (i.e. doesn't signify failure or success)
# Module-level flag (only ever set to True in the visible code).  It makes the
# missing-cycles error below log once rather than once per matching tile line.
268 global SUPPRESS_MISSING_CYCLES
270 # Detect invalid species directory error
# NOTE(review): this branch's body is not present here; per the docstring a
# failure presumably returns RUN_ABORT - confirm.
271 if s_species_dir_err.search(line):
274 # Detect goat_pipeline.py traceback
275 elif s_goat_traceb.search(line):
276 logging.error("Goat config script died, traceback in debug output")
278 # Detect indication of successful configuration (from stderr; odd, but ok)
279 elif s_stderr_taskcomplete.search(line):
280 logging.info('Configure step successful (from: stderr)')
282 # Detect missing cycles
283 elif s_missing_cycles.search(line):
285 # Only display error once
286 if not SUPPRESS_MISSING_CYCLES:
287 logging.error("Missing cycles detected; Not all cycles copied?")
288 logging.debug("CONF:STDERR:MISSING_CYCLES: %s" % (line))
289 SUPPRESS_MISSING_CYCLES = True
292 # Log all other output as debug output
294 logging.debug('CONF:STDERR:?: %s' % (line))
296 # Neutral (not failure; nor success)
300 #def pipeline_stdout_handler(line, conf_info):
302 # Processes each line of output from running the pipeline
303 # and stores useful information using the logging module
305 # Loads useful information into conf_info as well, for future
306 # use outside the function.
308 # returns True if found condition that signifies success.
311 # #f.write(line + '\n')
317 def pipeline_stderr_handler(line, conf_info):
319 Processes each line of stderr from pipelien run
320 and stores useful information using the logging module
322 ##FIXME: Future feature (doesn't actually do this yet)
323 #Loads useful information into conf_info as well, for future
324 #use outside the function.
326 returns RUN_FAILED upon detecting failure;
327 #True on success message; (no clear success state)
328 False if neutral message
329 (i.e. doesn't signify failure or success)
# conf_info is not used by any of the visible lines (see the FIXME above).
# Known-noise lines (PL_STDERR_IGNORE_LIST) are dropped without logging.
332 if pl_stderr_ignore(line):
334 elif s_make_error.search(line):
335 logging.error("make error detected; run failed")
# Missing external tools, detected from 'command not found' / "Can't exec"
# messages on stderr.
337 elif s_no_gnuplot.search(line):
338 logging.error("gnuplot not found")
340 elif s_no_convert.search(line):
341 logging.error("imagemagick's convert command not found")
343 elif s_no_ghostscript.search(line):
344 logging.error("ghostscript not found")
# Anything unrecognized is kept at DEBUG level for troubleshooting.
347 logging.debug('PIPE:STDERR:?: %s' % (line))
# NOTE(review): the rest of this parameter list is not present here.
# cfg_defaults (used below) is presumably one of the remaining parameters.
352 def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir,
355 Gets the config file from server...
356 requires config file in:
357 /etc/ga_frontend/ga_frontend.conf
363 base_host_url: http://host:port
365 return True if successful, False is failure
# Options presumably come from CONFIG_USER / CONFIG_SYSTEM (see the error
# message below); options.url holds the base_host_url setting.
367 options = getCombinedOptions(cfg_defaults)
369 if options.url is None:
370 logging.error("%s or %s missing base_host_url option" % \
371 (CONFIG_USER, CONFIG_SYSTEM))
# Download this flowcell's config to cfg_filepath and record its location
# on conf_info (configure() passes it to goat_pipeline.py via --GERALD).
375 saveConfigFile(flowcell, options.url, cfg_filepath)
376 conf_info.config_filepath = cfg_filepath
377 except FlowCellNotFound, e:
380 except WebError404, e:
# Re-open the downloaded config and %-format it with the genome mapper
# dictionary built from genome_dir.
# NOTE(review): `data` is presumably f.read() of the file opened here (that
# line is not present).  Any literal '%' in the downloaded config must
# appear as '%%', or the formatting raises.
# NOTE(review): open(cfg_filepath, 'w') truncates the file before
# `data % (mapper_dict)` is evaluated.  A formatting error therefore leaves
# an empty config file; format first, then open for writing.  No close()
# of either handle is visible; `with open(...)` would guarantee it.
390 f = open(cfg_filepath, 'r')
394 genome_dict = getAvailableGenomes(genome_dir)
395 mapper_dict = constructMapperDict(genome_dict)
399 f = open(cfg_filepath, 'w')
400 f.write(data % (mapper_dict))
407 def configure(conf_info):
409 Attempts to configure the GA pipeline using goat.
411 Uses logging module to store information about status.
413 returns True if configuration successful, otherwise False.
# The commented-out Popen calls below are earlier test invocations kept for
# reference; the live invocation follows the output file paths.
416 #pipe = subprocess.Popen(['goat_pipeline.py',
417 # '--GERALD=config32bk.txt',
420 # stdout=subprocess.PIPE,
421 # stderr=subprocess.PIPE)
423 #ERROR Test (2), causes goat_pipeline.py traceback
424 #pipe = subprocess.Popen(['goat_pipeline.py',
425 # '--GERALD=%s' % (conf_info.config_filepath),
426 # '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
429 # stdout=subprocess.PIPE,
430 # stderr=subprocess.PIPE)
432 ##########################
433 # Run configuration step
434 # Not a test; actual configure attempt.
435 #pipe = subprocess.Popen(['goat_pipeline.py',
436 # '--GERALD=%s' % (conf_info.config_filepath),
439 # stdout=subprocess.PIPE,
440 # stderr=subprocess.PIPE)
# goat_pipeline.py output is written to files in analysis_dir and parsed
# after the process exits (see the read loops below).
443 stdout_filepath = os.path.join(conf_info.analysis_dir,
444 "pipeline_configure_stdout.txt")
445 stderr_filepath = os.path.join(conf_info.analysis_dir,
446 "pipeline_configure_stderr.txt")
448 fout = open(stdout_filepath, 'w')
449 ferr = open(stderr_filepath, 'w')
# NOTE(review): the Popen stdout=/stderr= arguments are not present here;
# presumably they are fout/ferr - confirm.
451 pipe = subprocess.Popen(['goat_pipeline.py',
452 '--GERALD=%s' % (conf_info.config_filepath),
454 conf_info.analysis_dir],
458 print "Configuring pipeline: %s" % (time.ctime())
# Blocks until goat_pipeline.py exits; a non-zero code fails the success
# check below (bool(error_code) must be False).
459 error_code = pipe.wait()
# NOTE(review): fout/ferr are rebound to read handles with no visible close()
# of the write handles; make sure they are closed (or use `with`).
468 fout = open(stdout_filepath, 'r')
470 stdout_line = fout.readline()
# Feed each stdout line to config_stdout_handler.  The handler also fills in
# conf_info.bustard_path / conf_info.run_path.
473 while stdout_line != '':
475 if config_stdout_handler(stdout_line, conf_info):
477 stdout_line = fout.readline()
482 #error_code = pipe.wait()
484 logging.error('Recieved error_code: %s' % (error_code))
486 logging.info('We are go for launch!')
# Scan stderr.  A RUN_ABORT from config_stderr_handler marks failure; a True
# return (its success message) sets stderr_success.
489 ferr = open(stderr_filepath, 'r')
490 stderr_line = ferr.readline()
493 stderr_success = False
494 while stderr_line != '':
495 stderr_status = config_stderr_handler(stderr_line, conf_info)
496 if stderr_status == RUN_ABORT:
498 elif stderr_status is True:
499 stderr_success = True
500 stderr_line = ferr.readline()
505 #Success requirements:
506 # 1) The stdout completed without error
507 # 2) The program exited with status 0
508 # 3) No errors found in stdout
# NOTE(review): criterion 3 is actually checked on stderr (abort !=
# RUN_ABORT), and a 4th applies: stderr_success must be True.  The
# assignments of `complete` and `abort` are not present here.
509 print '#Expect: True, False, True, True'
510 print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
511 status = complete is True and \
512 bool(error_code) is False and \
513 abort != RUN_ABORT and \
514 stderr_success is True
516 # If everything was successful, but for some reason
517 # we didn't retrieve the path info, log it.
519 if conf_info.bustard_path is None or conf_info.run_path is None:
520 logging.error("Failed to retrieve run_path")
526 def run_pipeline(conf_info):
528 Run the pipeline and monitor status.
530 # Fail if the run_path doesn't actually exist
531 if not os.path.exists(conf_info.run_path):
532 logging.error('Run path does not exist: %s' \
533 % (conf_info.run_path))
536 # Change cwd to run_path
537 stdout_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stdout.txt')
538 stderr_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stderr.txt')
540 # Create status object
541 conf_info.createStatusObject()
543 # Monitor file creation
545 mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
546 event = RunEvent(conf_info)
547 notifier = ThreadedNotifier(wm, event)
549 wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
551 # Log pipeline starting
552 logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
554 # Start the pipeline (and hide!)
555 #pipe = subprocess.Popen(['make',
558 # stdout=subprocess.PIPE,
559 # stderr=subprocess.PIPE)
561 fout = open(stdout_filepath, 'w')
562 ferr = open(stderr_filepath, 'w')
564 pipe = subprocess.Popen(['make',
565 '--directory=%s' % (conf_info.run_path),
571 # Wait for run to finish
572 retcode = pipe.wait()
581 ferr = open(stderr_filepath, 'r')
583 run_failed_stderr = False
585 err_status = pipeline_stderr_handler(line, conf_info)
586 if err_status == RUN_FAILED:
587 run_failed_stderr = True
591 # Finished file check!
592 print 'RUN SUCCESS CHECK:'
593 for key, value in event.run_status_dict.items():
594 print ' %s: %s' % (key, value)
596 dstatus = event.run_status_dict
598 # Success or failure check
599 status = (retcode == 0) and \
600 run_failed_stderr is False and \
601 dstatus['firecrest'] is True and \
602 dstatus['bustard'] is True and \
603 dstatus['gerald'] is True