8 from gaworkflow.pipeline.retrieve_config import getCombinedOptions, saveConfigFile
9 from gaworkflow.pipeline.retrieve_config import FlowCellNotFound, WebError404
10 from gaworkflow.pipeline.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
11 from gaworkflow.pipeline.run_status import GARunStatus
13 from pyinotify import WatchManager, ThreadedNotifier
14 from pyinotify import EventsCodes, ProcessEvent
        # NOTE(review): this excerpt begins mid-body of ConfigInfo.__init__ —
        # the class/def lines and some assignments (e.g. 'self.run_path',
        # per the comment below) are elided; confirm against the full source.
        #run_path = firecrest analysis directory to run analysis from
        # Bustard output directory; filled in later by config_stdout_handler()
        # as a side effect of parsing goat_pipeline.py stdout.
        self.bustard_path = None
        # Local path of the GERALD config file; set by retrieve_config().
        self.config_filepath = None
        #top level directory where all analyses are placed
        self.base_analysis_dir = None
        #analysis_dir, top level analysis dir...
        # base_analysis_dir + '/070924_USI-EAS44_0022_FC12150'
        self.analysis_dir = None
    def createStatusObject(self):
        """
        Creates a status object which can be queried for
        status of running the pipeline

        returns True if object created
        returns False if object cannot be created
        """
        # A config file is required to build the status object.
        if self.config_filepath is None:
            # [elided in this excerpt: presumably logs and 'return False']

        self.status = GARunStatus(self.config_filepath)
        # [elided in this excerpt: presumably 'return True']
####################################
# inotify event processor
####################################

# Match the 'finished.txt' sentinel file each pipeline stage
# (Firecrest, Bustard, GERALD) drops into its output directory;
# used by RunEvent.process_IN_CREATE to flag stage completion.
s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')

# Match any path inside a stage's directory tree.  The directories nest
# (Firecrest/ contains Bustard/ which contains GERALD/), so these are
# deliberately tested most-specific-first in process_IN_CREATE.
s_gerald_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
s_bustard_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
s_firecrest_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/')
class RunEvent(ProcessEvent):
    """pyinotify event handler that tracks pipeline progress.

    For every file created under the watched run directory it updates a
    simple per-stage completion dict and forwards progress to the
    GARunStatus object held by the ConfigInfo passed to __init__.
    """

    def __init__(self, conf_info):
        # Per-stage "finished.txt seen" flags; read by run_pipeline()
        # after make exits to decide overall success.
        self.run_status_dict = {'firecrest': False,
        # NOTE(review): lines elided in this excerpt — presumably the
        # 'bustard'/'gerald' dict entries and 'self._ci = conf_info'
        # ('self._ci' is used below); confirm against the full source.

        ProcessEvent.__init__(self)

    def process_IN_CREATE(self, event):
        # pyinotify callback: a file/directory was created under the watch.
        fullpath = os.path.join(event.path, event.name)
        if s_finished.search(fullpath):
            logging.info("File Found: %s" % (fullpath))

            # Exact stage sentinel: mark that stage complete.
            if s_firecrest_finished.search(fullpath):
                self.run_status_dict['firecrest'] = True
                self._ci.status.updateFirecrest(event.name)
            elif s_bustard_finished.search(fullpath):
                self.run_status_dict['bustard'] = True
                self._ci.status.updateBustard(event.name)
            elif s_gerald_finished.search(fullpath):
                self.run_status_dict['gerald'] = True
                self._ci.status.updateGerald(event.name)

            #WARNING: The following order is important!!
            # Firecrest regex will catch all gerald, bustard, and firecrest
            # Bustard regex will catch all gerald and bustard
            # Gerald regex will catch all gerald
            # So, order needs to be Gerald, Bustard, Firecrest, or this
            # won't work properly.
            elif s_gerald_all.search(fullpath):
                self._ci.status.updateGerald(event.name)
            elif s_bustard_all.search(fullpath):
                self._ci.status.updateBustard(event.name)
            elif s_firecrest_all.search(fullpath):
                self._ci.status.updateFirecrest(event.name)

        #print "Create: %s" % (os.path.join(event.path, event.name))

    def process_IN_DELETE(self, event):
        #print "Remove %s" % (os.path.join(event.path, event.name))
        # NOTE(review): body elided in this excerpt (likely just 'pass');
        # deletions are in the watch mask but appear to be ignored.
# Sentinel returned by pipeline_stderr_handler and tested by run_pipeline
# to signal a fatal pipeline error.
# NOTE(review): a matching RUN_ABORT constant (used by config_stderr_handler
# and configure) is elided from this excerpt; confirm in full source.
RUN_FAILED = 'failed'

#####################################
# Configure Step (goat_pipeline.py)
#####################################

# stdout progress markers emitted by goat_pipeline.py during configuration
s_start = re.compile('Starting Genome Analyzer Pipeline')
s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
s_generating = re.compile('^Generating journals, Makefiles')
s_seq_folder = re.compile('^Sequence folder: ')
s_seq_folder_sub = re.compile('want to make ')
s_stderr_taskcomplete = re.compile('^Task complete, exiting')

# stderr failure markers from goat_pipeline.py
s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
s_species_dir_err = re.compile('Error: Lane [1-8]:')
s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
s_missing_cycles = re.compile('^Error: Tile s_[1-8]_[0-9]+: Different number of cycles: [0-9]+ instead of [0-9]+')

# One-shot latch set by config_stderr_handler so the (very repetitive)
# missing-cycles error is only logged once per run.
SUPPRESS_MISSING_CYCLES = False

##Ignore - Example of out above each ignore regex.
#NOTE: Commenting out an ignore will cause it to be
#      logged as DEBUG with the logging module.
#CF_STDERR_IGNORE_LIST = []
# Tile names (e.g. s_4_0101): skipped entirely, without logging,
# by config_stdout_handler.
s_skip = re.compile('s_[0-8]_[0-9]+')

##########################################
# Pipeline Run Step (make -j8 recursive)
##########################################

# Matches the 'finished' sentinel paths RunEvent watches for.
s_finished = re.compile('finished')

# stderr failure markers from the make-driven pipeline run
s_make_error = re.compile('^make[\S\s]+Error')
s_no_gnuplot = re.compile('gnuplot: command not found')
s_no_convert = re.compile('^Can\'t exec "convert"')
s_no_ghostscript = re.compile('gs: command not found')

##Ignore - Example of out above each ignore regex.
#NOTE: Commenting out an ignore will cause it to be
#      logged as DEBUG with the logging module.
# Known-noise stderr lines from the pipeline run; pl_stderr_ignore()
# drops anything matching one of these patterns.
PL_STDERR_IGNORE_LIST = []
# (elided example line, presumably "Info: PF <n>")
PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
# About to analyse intensity file s_4_0101_sig2.txt
PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
# Will send output to standard output
PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
# Found 31877 clusters
PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
# Will use quality criterion ((CHASTITY>=0.6)
PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
# Quality criterion translated to (($F[5]>=0.6))
PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
# opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
# opened s_4_0103_qhg.txt
PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
# 81129 sequences out of 157651 passed filter criteria
PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
def pl_stderr_ignore(line):
    """
    Searches lines for lines to ignore (i.e. not to log)

    returns True if line should be ignored
    returns False if line should NOT be ignored
    """
    for s in PL_STDERR_IGNORE_LIST:
        # NOTE(review): loop body and final return are elided in this
        # excerpt — presumably 'if s.search(line): return True' followed
        # by 'return False'; confirm against the full source.
def config_stdout_handler(line, conf_info):
    """
    Processes each line of output from GOAT
    and stores useful information using the logging module

    Loads useful information into conf_info as well, for future
    use outside the function.

    returns True if found condition that signifies success.
    """
    # NOTE(review): several branch bodies / return statements are elided
    # from this excerpt; the bracketed comments below mark the gaps.

    # Skip irrelevant line (without logging)
    if s_skip.search(line):
        # [elided: presumably 'pass']

    # Detect invalid command-line arguments
    elif s_invalid_cmdline.search(line):
        logging.error("Invalid commandline options!")

    # Detect starting of configuration
    elif s_start.search(line):
        logging.info('START: Configuring pipeline')

    # Detect it made it past invalid arguments
    elif s_gerald.search(line):
        logging.info('Running make now')

    # Detect that make files have been generated (based on output)
    elif s_generating.search(line):
        # NOTE(review): 'generted' is a typo ('generated') in the logged
        # message — left as-is since fixing it changes runtime output.
        logging.info('Make files generted')

    # Capture run directory
    elif s_seq_folder.search(line):
        mo = s_seq_folder_sub.search(line)
        #Output changed when using --tiles=<tiles>
        # at least in pipeline v0.3.0b2
        # [elided: a guard, presumably 'if mo:', for the --tiles format]
        # Strip everything up to and including 'want to make ', leaving the
        # Firecrest/Bustard/GERALD makefile path, then peel off components
        # to recover each stage directory.
        firecrest_bustard_gerald_makefile = line[mo.end():]
        firecrest_bustard_gerald, junk = \
            os.path.split(firecrest_bustard_gerald_makefile)
        firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
        firecrest, junk = os.path.split(firecrest_bustard)

        conf_info.bustard_path = firecrest_bustard
        conf_info.run_path = firecrest

        #Standard output handling
        # [elided: presumably 'else:' — the pre-'--tiles' output format]
        print 'Sequence line:', line
        mo = s_seq_folder.search(line)
        conf_info.bustard_path = line[mo.end():]
        conf_info.run_path, temp = os.path.split(conf_info.bustard_path)

    # Log all other output for debugging purposes
    # [elided: presumably 'else:']
    logging.warning('CONF:?: %s' % (line))
    # [elided: trailing return statement(s); per the docstring this
    #  function reports success/failure back to configure()]
def config_stderr_handler(line, conf_info):
    """
    Processes each line of output from GOAT
    and stores useful information using the logging module

    Loads useful information into conf_info as well, for future
    use outside the function.

    returns RUN_ABORT upon detecting failure;
            True on success message;
            False if neutral message
            (i.e. doesn't signify failure or success)
    """
    # Module-level one-shot latch, set below on the first missing-cycles hit.
    global SUPPRESS_MISSING_CYCLES

    # Detect invalid species directory error
    if s_species_dir_err.search(line):
        # [elided: presumably an error log and 'return RUN_ABORT']

    # Detect goat_pipeline.py traceback
    elif s_goat_traceb.search(line):
        logging.error("Goat config script died, traceback in debug output")
        # [elided: presumably 'return RUN_ABORT']

    # Detect indication of successful configuration (from stderr; odd, but ok)
    elif s_stderr_taskcomplete.search(line):
        logging.info('Configure step successful (from: stderr)')
        # [elided: presumably 'return True']

    # Detect missing cycles
    elif s_missing_cycles.search(line):

        # Only display error once
        if not SUPPRESS_MISSING_CYCLES:
            logging.error("Missing cycles detected; Not all cycles copied?")
            logging.debug("CONF:STDERR:MISSING_CYCLES: %s" % (line))
            SUPPRESS_MISSING_CYCLES = True
        # [elided: presumably 'return RUN_ABORT']

    # Log all other output as debug output
    # [elided: presumably 'else:']
    logging.debug('CONF:STDERR:?: %s' % (line))

    # Neutral (not failure; nor success)
    # [elided: presumably 'return False']
299 #def pipeline_stdout_handler(line, conf_info):
301 # Processes each line of output from running the pipeline
302 # and stores useful information using the logging module
304 # Loads useful information into conf_info as well, for future
305 # use outside the function.
307 # returns True if found condition that signifies success.
310 # #f.write(line + '\n')
def pipeline_stderr_handler(line, conf_info):
    """
    Processes each line of stderr from pipeline run
    and stores useful information using the logging module

    ##FIXME: Future feature (doesn't actually do this yet)
    #Loads useful information into conf_info as well, for future
    #use outside the function.

    returns RUN_FAILED upon detecting failure;
            #True on success message; (no clear success state)
            False if neutral message
            (i.e. doesn't signify failure or success)
    """
    # Known-noise lines are dropped without logging.
    if pl_stderr_ignore(line):
        # [elided: presumably 'pass']
    elif s_make_error.search(line):
        logging.error("make error detected; run failed")
        # [elided: presumably 'return RUN_FAILED']
    elif s_no_gnuplot.search(line):
        logging.error("gnuplot not found")
        # [elided: presumably 'return RUN_FAILED']
    elif s_no_convert.search(line):
        logging.error("imagemagick's convert command not found")
        # [elided: presumably 'return RUN_FAILED']
    elif s_no_ghostscript.search(line):
        logging.error("ghostscript not found")
        # [elided: presumably 'return RUN_FAILED']
    # [elided: presumably 'else:']
    logging.debug('PIPE:STDERR:?: %s' % (line))
    # [elided: presumably 'return False' — the neutral case]
def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir):
    """
    Gets the config file from server...
    requires config file in:
      /etc/ga_frontend/ga_frontend.conf

      base_host_url: http://host:port

    return True if successful, False is failure
    """
    options = getCombinedOptions()

    # Without a base_host_url there is no server to fetch the config from.
    if options.url is None:
        logging.error("~/.ga_frontend.conf or /etc/ga_frontend/ga_frontend.conf" \
                      " missing base_host_url option")
        # [elided: presumably 'return False']

    # [elided: presumably 'try:' — the 'except' clauses below need one]
    # Download the GERALD config for this flowcell and remember where it
    # landed so configure()/createStatusObject() can find it later.
    saveConfigFile(flowcell, options.url, cfg_filepath)
    conf_info.config_filepath = cfg_filepath

    # Python 2 exception syntax ('except E, e').
    except FlowCellNotFound, e:
        # [elided: error handling, presumably log + 'return False']
    except WebError404, e:
        # [elided: error handling, presumably log + 'return False']

    # Re-read the saved config so genome paths can be substituted below.
    # NOTE(review): no 'with'/'close' visible for these handles in this
    # excerpt — TODO confirm the full source closes them.
    f = open(cfg_filepath, 'r')
    # [elided: presumably 'data = f.read()' and 'f.close()' —
    #  'data' is used in the write below]

    genome_dict = getAvailableGenomes(genome_dir)
    mapper_dict = constructMapperDict(genome_dict)

    # Substitute genome paths into the config template and write it back.
    f = open(cfg_filepath, 'w')
    f.write(data % (mapper_dict))
    # [elided: presumably 'f.close()' and 'return True']
def configure(conf_info):
    """
    Attempts to configure the GA pipeline using goat.

    Uses logging module to store information about status.

    returns True if configuration successful, otherwise False.
    """
    # Development leftovers: commented-out test invocations of
    # goat_pipeline.py kept for reference.
    #pipe = subprocess.Popen(['goat_pipeline.py',
    #                         '--GERALD=config32bk.txt',
    #                        stdout=subprocess.PIPE,
    #                        stderr=subprocess.PIPE)

    #ERROR Test (2), causes goat_pipeline.py traceback
    #pipe = subprocess.Popen(['goat_pipeline.py',
    #                 '--GERALD=%s' % (conf_info.config_filepath),
    #                 '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
    #                 stdout=subprocess.PIPE,
    #                 stderr=subprocess.PIPE)

    ##########################
    # Run configuration step
    #   Not a test; actual configure attempt.
    #pipe = subprocess.Popen(['goat_pipeline.py',
    #                 '--GERALD=%s' % (conf_info.config_filepath),
    #                 stdout=subprocess.PIPE,
    #                 stderr=subprocess.PIPE)

    # goat output is captured to files (not PIPE) so it can be re-read
    # and parsed line-by-line after the process exits.
    stdout_filepath = os.path.join(conf_info.analysis_dir,
                                   "pipeline_configure_stdout.txt")
    stderr_filepath = os.path.join(conf_info.analysis_dir,
                                   "pipeline_configure_stderr.txt")

    fout = open(stdout_filepath, 'w')
    ferr = open(stderr_filepath, 'w')

    pipe = subprocess.Popen(['goat_pipeline.py',
                             '--GERALD=%s' % (conf_info.config_filepath),
                             #'--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
                             conf_info.analysis_dir],
                            # [elided: presumably stdout=fout, stderr=ferr]

    print "Configuring pipeline: %s" % (time.ctime())
    error_code = pipe.wait()
    # [elided: presumably closing fout/ferr and initializing the
    #  'complete'/'abort' flags used below — confirm in full source]

    # Parse the captured stdout for progress / success markers.
    fout = open(stdout_filepath, 'r')

    stdout_line = fout.readline()

    while stdout_line != '':
        # config_stdout_handler also fills in conf_info.bustard_path /
        # run_path as a side effect.
        if config_stdout_handler(stdout_line, conf_info):
            # [elided: presumably sets 'complete = True']
        stdout_line = fout.readline()
    # [elided: presumably 'fout.close()']

    #error_code = pipe.wait()
    # [elided: presumably 'if error_code:']
    # NOTE(review): 'Recieved' is a typo ('Received') in the logged
    # message — left unchanged to preserve runtime output.
    logging.error('Recieved error_code: %s' % (error_code))
    # [elided: presumably 'else:']
    logging.info('We are go for launch!')

    # Parse the captured stderr for abort / success markers.
    ferr = open(stderr_filepath, 'r')
    stderr_line = ferr.readline()

    stderr_success = False
    while stderr_line != '':
        stderr_status = config_stderr_handler(stderr_line, conf_info)
        if stderr_status == RUN_ABORT:
            # [elided: presumably sets 'abort = RUN_ABORT']
        elif stderr_status is True:
            stderr_success = True
        stderr_line = ferr.readline()
    # [elided: presumably 'ferr.close()']

    #Success requirements:
    # 1) The stdout completed without error
    # 2) The program exited with status 0
    # 3) No errors found in stdout
    print '#Expect: True, False, True, True'
    print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
    status = complete is True and \
             bool(error_code) is False and \
             abort != RUN_ABORT and \
             stderr_success is True

    # If everything was successful, but for some reason
    #  we didn't retrieve the path info, log it.
    # [elided: presumably 'if status is True:' guarding this check]
    if conf_info.bustard_path is None or conf_info.run_path is None:
        logging.error("Failed to retrieve run_path")
        # [elided: presumably 'return False', then final 'return status']
def run_pipeline(conf_info):
    """
    Run the pipeline and monitor status.
    """
    # Fail if the run_path doesn't actually exist
    if not os.path.exists(conf_info.run_path):
        logging.error('Run path does not exist: %s' \
                      % (conf_info.run_path))
        # [elided: presumably 'return False']

    # Change cwd to run_path
    # NOTE(review): despite the comment above, no chdir is visible here;
    # make is pointed at run_path via --directory below — confirm in
    # the full source.
    stdout_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stdout.txt')
    stderr_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stderr.txt')

    # Create status object
    conf_info.createStatusObject()

    # Monitor file creation with pyinotify.
    # [elided: presumably 'wm = WatchManager()' — 'wm' is used below]
    mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
    event = RunEvent(conf_info)
    notifier = ThreadedNotifier(wm, event)
    # [elided: presumably 'notifier.start()']
    wdd = wm.add_watch(conf_info.run_path, mask, rec=True)

    # Log pipeline starting
    logging.info('STARTING PIPELINE @ %s' % (time.ctime()))

    # Start the pipeline (and hide!)
    #pipe = subprocess.Popen(['make',
    #                        stdout=subprocess.PIPE,
    #                        stderr=subprocess.PIPE)

    # Capture make's output to files for post-run parsing.
    fout = open(stdout_filepath, 'w')
    ferr = open(stderr_filepath, 'w')

    pipe = subprocess.Popen(['make',
                             '--directory=%s' % (conf_info.run_path),
                             # [elided: remaining args, presumably '-j8',
                             #  and stdout=fout, stderr=ferr]

    # Wait for run to finish
    retcode = pipe.wait()
    # [elided: presumably stopping the notifier and closing fout/ferr]

    # Parse the captured stderr for failure markers.
    ferr = open(stderr_filepath, 'r')

    run_failed_stderr = False
    # [elided: presumably 'for line in ferr:' — 'line' is used below]
    err_status = pipeline_stderr_handler(line, conf_info)
    if err_status == RUN_FAILED:
        run_failed_stderr = True
    # [elided: presumably 'ferr.close()']

    # Finished file check!
    print 'RUN SUCCESS CHECK:'
    for key, value in event.run_status_dict.items():
        print ' %s: %s' % (key, value)

    dstatus = event.run_status_dict

    # Success or failure check
    status = (retcode == 0) and \
             run_failed_stderr is False and \
             dstatus['firecrest'] is True and \
             dstatus['bustard'] is True and \
             dstatus['gerald'] is True
    # NOTE(review): the excerpt ends here; the final 'return status' (and
    # any remaining cleanup) lies past the visible range.