8 from gaworkflow.pipeline.retrieve_config import getCombinedOptions, saveConfigFile
9 from gaworkflow.pipeline.retrieve_config import FlowCellNotFound, WebError404
10 from gaworkflow.pipeline.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
11 from gaworkflow.pipeline.run_status import GARunStatus
13 from pyinotify import WatchManager, ThreadedNotifier
14 from pyinotify import EventsCodes, ProcessEvent
16 logging.basicConfig(level=logging.DEBUG,
17 format='%(asctime)s %(levelname)-8s %(message)s',
18 datefmt='%a, %d %b %Y %H:%M:%S',
19 filename='pipeline_main.log',
26 self.bustard_path = None
27 self.config_filepath = None
31 def createStatusObject(self):
33 Creates a status object which can be queried for
34 status of running the pipeline
36 returns True if object created
37 returns False if object cannot be created
39 if self.config_filepath is None:
42 self.status = GARunStatus(self.config_filepath)
47 ####################################
48 # inotify event processor
# Patterns matched (with .search) against the full path of files created
# under the run directory.  Each stage directory name is the stage name
# followed by a suffix made of digits, letters, '.', '_' and '-'.
#
# Raw strings keep the regex escapes intact, and the '.' in "finished.txt"
# is escaped so it only matches a literal dot.

# "finished.txt" appearing directly inside a stage directory.
s_firecrest_finished = re.compile(r'Firecrest[0-9\._\-A-Za-z]+/finished\.txt')
s_bustard_finished = re.compile(r'Bustard[0-9\._\-A-Za-z]+/finished\.txt')
s_gerald_finished = re.compile(r'GERALD[0-9\._\-A-Za-z]+/finished\.txt')

# Any path below a stage directory.  These are unanchored, so
# s_firecrest_all also matches Bustard and GERALD paths and s_bustard_all
# also matches GERALD paths: test the most specific pattern first.
s_gerald_all = re.compile(r'Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
s_bustard_all = re.compile(r'Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
s_firecrest_all = re.compile(r'Firecrest[0-9\._\-A-Za-z]+/')
58 class RunEvent(ProcessEvent):
60 def __init__(self, conf_info):
62 self.run_status_dict = {'firecrest': False,
68 ProcessEvent.__init__(self)
71 def process_IN_CREATE(self, event):
72 fullpath = os.path.join(event.path, event.name)
73 if s_finished.search(fullpath):
74 logging.info("File Found: %s" % (fullpath))
76 if s_firecrest_finished.search(fullpath):
77 self.run_status_dict['firecrest'] = True
78 self._ci.status.updateFirecrest(event.name)
79 elif s_bustard_finished.search(fullpath):
80 self.run_status_dict['bustard'] = True
81 self._ci.status.updateBustard(event.name)
82 elif s_gerald_finished.search(fullpath):
83 self.run_status_dict['gerald'] = True
84 self._ci.status.updateGerald(event.name)
86 #WARNING: The following order is important!!
87 # Firecrest regex will catch all gerald, bustard, and firecrest
88 # Bustard regex will catch all gerald and bustard
89 # Gerald regex will catch all gerald
90 # So, order needs to be Gerald, Bustard, Firecrest, or this
91 # won't work properly.
92 elif s_gerald_all.search(fullpath):
93 self._ci.status.updateGerald(event.name)
94 elif s_bustard_all.search(fullpath):
95 self._ci.status.updateBustard(event.name)
96 elif s_firecrest_all.search(fullpath):
97 self._ci.status.updateFirecrest(event.name)
99 #print "Create: %s" % (os.path.join(event.path, event.name))
101 def process_IN_DELETE(self, event):
102 #print "Remove %s" % (os.path.join(event.path, event.name))
# Sentinel that pipeline_stderr_handler returns when it detects a failure;
# run_pipeline compares against it to flag a failed run.
RUN_FAILED = 'failed'


#####################################
# Configure Step (goat_pipeline.py)

# Progress markers searched for in the configure step's output.
s_start = re.compile(r'Starting Genome Analyzer Pipeline')
s_gerald = re.compile(r"[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
s_generating = re.compile(r'^Generating journals, Makefiles')
# Line announcing the sequence folder, and the phrase preceding the
# makefile path on that line.
s_seq_folder = re.compile(r'^Sequence folder: ')
s_seq_folder_sub = re.compile(r'want to make ')
s_stderr_taskcomplete = re.compile(r'^Task complete, exiting')

# Failure markers.  The '.' in "goat_pipeline.py" is escaped so that it
# only matches a literal dot.
s_invalid_cmdline = re.compile(r'Usage:[\S\s]*goat_pipeline\.py')
s_species_dir_err = re.compile(r'Error: Lane [1-8]:')
s_goat_traceb = re.compile(r"^Traceback \(most recent call last\):")
131 ##Ignore - Example of output above each ignore regex.
132 #NOTE: Commenting out an ignore will cause it to be
133 # logged as DEBUG with the logging module.
134 #CF_STDERR_IGNORE_LIST = []
# Lines mentioning a tile identifier (e.g. s_4_0101) are skipped by
# config_stdout_handler without being logged.
s_skip = re.compile(r's_[0-8]_[0-9]+')


##########################################
# Pipeline Run Step (make -j8 recursive)

# Any created path containing "finished" is logged by the inotify handler.
s_finished = re.compile(r'finished')

# Failure markers searched for in the pipeline run's stderr.  Raw strings
# keep the regex escapes (\S, \s) from being read as string escapes.
s_make_error = re.compile(r'^make[\S\s]+Error')
s_no_gnuplot = re.compile(r'gnuplot: command not found')
s_no_convert = re.compile(r'''^Can't exec "convert"''')
s_no_ghostscript = re.compile(r'gs: command not found')
150 ##Ignore - Example of output above each ignore regex.
151 #NOTE: Commenting out an ignore will cause it to be
152 # logged as DEBUG with the logging module.
# Patterns for pipeline stderr lines that carry no useful information and
# should not be logged.  An example of the matched output is shown above
# each entry where one is known.  Commenting out an entry causes matching
# lines to be logged as DEBUG instead.
PL_STDERR_IGNORE_LIST = [
    re.compile(r'^Info: PF'),
    # About to analyse intensity file s_4_0101_sig2.txt
    re.compile(r'^About to analyse intensity file'),
    # Will send output to standard output
    re.compile(r'^Will send output to standard output'),
    # Found 31877 clusters
    re.compile(r'^Found [0-9]+ clusters'),
    # Will use quality criterion ((CHASTITY>=0.6)
    re.compile(r'^Will use quality criterion'),
    # Quality criterion translated to (($F[5]>=0.6))
    re.compile(r'^Quality criterion translated to'),
    # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
    # opened s_4_0103_qhg.txt
    # (the '.' is escaped so that only a literal "qhg.txt" matches)
    re.compile(r'^opened[\S\s]+qhg\.txt'),
    # 81129 sequences out of 157651 passed filter criteria
    re.compile(r'^[0-9]+ sequences out of [0-9]+ passed filter criteria'),
]
175 def pl_stderr_ignore(line):
177 Searches lines for lines to ignore (i.e. not to log)
179 returns True if line should be ignored
180 returns False if line should NOT be ignored
182 for s in PL_STDERR_IGNORE_LIST:
188 def config_stdout_handler(line, conf_info):
190 Processes each line of output from GOAT
191 and stores useful information using the logging module
193 Loads useful information into conf_info as well, for future
194 use outside the function.
196 returns True if found condition that signifies success.
199 # Skip irrelevant line (without logging)
200 if s_skip.search(line):
203 # Detect invalid command-line arguments
204 elif s_invalid_cmdline.search(line):
205 logging.error("Invalid commandline options!")
207 # Detect starting of configuration
208 elif s_start.search(line):
209 logging.info('START: Configuring pipeline')
211 # Detect it made it past invalid arguments
212 elif s_gerald.search(line):
213 logging.info('Running make now')
215 # Detect that make files have been generated (based on output)
216 elif s_generating.search(line):
217 logging.info('Make files generted')
220 # Capture run directory
221 elif s_seq_folder.search(line):
222 mo = s_seq_folder_sub.search(line)
223 #Output changed when using --tiles=<tiles>
224 # at least in pipeline v0.3.0b2
226 firecrest_bustard_gerald_makefile = line[mo.end():]
227 firecrest_bustard_gerald, junk = \
228 os.path.split(firecrest_bustard_gerald_makefile)
229 firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
230 firecrest, junk = os.path.split(firecrest_bustard)
232 conf_info.bustard_path = firecrest_bustard
233 conf_info.run_path = firecrest
235 #Standard output handling
237 print 'Sequence line:', line
238 mo = s_seq_folder.search(line)
239 conf_info.bustard_path = line[mo.end():]
240 conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
242 # Log all other output for debugging purposes
244 logging.warning('CONF:?: %s' % (line))
250 def config_stderr_handler(line, conf_info):
252 Processes each line of output from GOAT
253 and stores useful information using the logging module
255 Loads useful information into conf_info as well, for future
256 use outside the function.
258 returns RUN_ABORT upon detecting failure;
259 True on success message;
260 False if neutral message
261 (i.e. doesn't signify failure or success)
264 # Detect invalid species directory error
265 if s_species_dir_err.search(line):
268 # Detect goat_pipeline.py traceback
269 elif s_goat_traceb.search(line):
270 logging.error("Goat config script died, traceback in debug output")
272 # Detect indication of successful configuration (from stderr; odd, but ok)
273 elif s_stderr_taskcomplete.search(line):
274 logging.info('Configure step successful (from: stderr)')
276 # Log all other output as debug output
278 logging.debug('CONF:STDERR:?: %s' % (line))
280 # Neutral (not failure; nor success)
284 #def pipeline_stdout_handler(line, conf_info):
286 # Processes each line of output from running the pipeline
287 # and stores useful information using the logging module
289 # Loads useful information into conf_info as well, for future
290 # use outside the function.
292 # returns True if found condition that signifies success.
295 # #f.write(line + '\n')
301 def pipeline_stderr_handler(line, conf_info):
303 Processes each line of stderr from pipeline run
304 and stores useful information using the logging module
306 ##FIXME: Future feature (doesn't actually do this yet)
307 #Loads useful information into conf_info as well, for future
308 #use outside the function.
310 returns RUN_FAILED upon detecting failure;
311 #True on success message; (no clear success state)
312 False if neutral message
313 (i.e. doesn't signify failure or success)
316 if pl_stderr_ignore(line):
318 elif s_make_error.search(line):
319 logging.error("make error detected; run failed")
321 elif s_no_gnuplot.search(line):
322 logging.error("gnuplot not found")
324 elif s_no_convert.search(line):
325 logging.error("imagemagick's convert command not found")
327 elif s_no_ghostscript.search(line):
328 logging.error("ghostscript not found")
331 logging.debug('PIPE:STDERR:?: %s' % (line))
336 def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir):
338 Gets the config file from server...
339 requires config file in:
340 /etc/ga_frontend/ga_frontend.conf
346 base_host_url: http://host:port
348 return True if successful, False on failure
350 options = getCombinedOptions()
352 if options.url is None:
353 logging.error("~/.ga_frontend.conf or /etc/ga_frontend/ga_frontend.conf" \
354 " missing base_host_url option")
358 saveConfigFile(flowcell, options.url, cfg_filepath)
359 conf_info.config_filepath = cfg_filepath
360 except FlowCellNotFound, e:
363 except WebError404, e:
373 f = open(cfg_filepath, 'r')
377 genome_dict = getAvailableGenomes(genome_dir)
378 mapper_dict = constructMapperDict(genome_dict)
380 f = open(cfg_filepath, 'w')
381 f.write(data % (mapper_dict))
388 def configure(conf_info):
390 Attempts to configure the GA pipeline using goat.
392 Uses logging module to store information about status.
394 returns True if configuration successful, otherwise False.
397 #pipe = subprocess.Popen(['goat_pipeline.py',
398 # '--GERALD=config32bk.txt',
401 # stdout=subprocess.PIPE,
402 # stderr=subprocess.PIPE)
404 #ERROR Test (2), causes goat_pipeline.py traceback
405 #pipe = subprocess.Popen(['goat_pipeline.py',
406 # '--GERALD=%s' % (conf_info.config_filepath),
407 # '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
410 # stdout=subprocess.PIPE,
411 # stderr=subprocess.PIPE)
413 ##########################
414 # Run configuration step
415 # Not a test; actual configure attempt.
416 #pipe = subprocess.Popen(['goat_pipeline.py',
417 # '--GERALD=%s' % (conf_info.config_filepath),
420 # stdout=subprocess.PIPE,
421 # stderr=subprocess.PIPE)
424 #FIXME: this only does a run on 5 tiles on lane 4
425 pipe = subprocess.Popen(['goat_pipeline.py',
426 '--GERALD=%s' % (conf_info.config_filepath),
427 '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
430 stdout=subprocess.PIPE,
431 stderr=subprocess.PIPE)
434 stdout_line = pipe.stdout.readline()
437 while stdout_line != '':
439 if config_stdout_handler(stdout_line, conf_info):
441 stdout_line = pipe.stdout.readline()
444 error_code = pipe.wait()
446 logging.error('Recieved error_code: %s' % (error_code))
448 logging.info('We are go for launch!')
451 stderr_line = pipe.stderr.readline()
454 stderr_success = False
455 while stderr_line != '':
456 stderr_status = config_stderr_handler(stderr_line, conf_info)
457 if stderr_status == RUN_ABORT:
459 elif stderr_status is True:
460 stderr_success = True
461 stderr_line = pipe.stderr.readline()
464 #Success requirements:
465 # 1) The stdout completed without error
466 # 2) The program exited with status 0
467 # 3) No errors found in stdout
468 print '#Expect: True, False, True, True'
469 print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
470 status = complete is True and \
471 bool(error_code) is False and \
472 abort != RUN_ABORT and \
473 stderr_success is True
475 # If everything was successful, but for some reason
476 # we didn't retrieve the path info, log it.
478 if conf_info.bustard_path is None or conf_info.run_path is None:
479 logging.error("Failed to retrieve run_path")
485 def run_pipeline(conf_info):
487 Run the pipeline and monitor status.
489 # Fail if the run_path doesn't actually exist
490 if not os.path.exists(conf_info.run_path):
491 logging.error('Run path does not exist: %s' \
492 % (conf_info.run_path))
495 # Change cwd to run_path
496 os.chdir(conf_info.run_path)
497 stdout_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stdout.txt')
498 stderr_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stderr.txt')
500 # Create status object
501 conf_info.createStatusObject()
503 # Monitor file creation
505 mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
506 event = RunEvent(conf_info)
507 notifier = ThreadedNotifier(wm, event)
509 wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
511 # Log pipeline starting
512 logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
514 # Start the pipeline (and hide!)
515 #pipe = subprocess.Popen(['make',
518 # stdout=subprocess.PIPE,
519 # stderr=subprocess.PIPE)
521 fout = open(stdout_filepath, 'w')
522 ferr = open(stderr_filepath, 'w')
524 pipe = subprocess.Popen(['make',
530 # Wait for run to finish
531 retcode = pipe.wait()
540 ferr = open(stderr_filepath, 'r')
542 run_failed_stderr = False
544 err_status = pipeline_stderr_handler(line, conf_info)
545 if err_status == RUN_FAILED:
546 run_failed_stderr = True
550 # Finished file check!
551 print 'RUN SUCCESS CHECK:'
552 for key, value in event.run_status_dict.items():
553 print ' %s: %s' % (key, value)
555 dstatus = event.run_status_dict
557 # Success or failure check
558 status = (retcode == 0) and \
559 run_failed_stderr is False and \
560 dstatus['firecrest'] is True and \
561 dstatus['bustard'] is True and \
562 dstatus['gerald'] is True