8 from retrieve_eland_config import getCombinedOptions, saveConfigFile
9 from retrieve_eland_config import FlowCellNotFound, WebError404
10 from genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
12 from pyinotify import WatchManager, ThreadedNotifier
13 from pyinotify import EventsCodes, ProcessEvent
15 logging.basicConfig(level=logging.DEBUG,
16 format='%(asctime)s %(levelname)-8s %(message)s',
17 datefmt='%a, %d %b %Y %H:%M:%S',
18 filename='pipeline_main.log',
25 self.bustard_path = None
26 self.config_filepath = None
29 ####################################
30 # inotify event processor
32 s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
33 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
34 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
36 class RunEvent(ProcessEvent):
40 self.run_status_dict = {'firecrest': False,
44 ProcessEvent.__init__(self)
46 def process_IN_CREATE(self, event):
47 fullpath = os.path.join(event.path, event.name)
48 if s_finished.search(fullpath):
49 logging.info("File Found: %s" % (fullpath))
51 if s_firecrest_finished.search(fullpath):
52 self.run_status_dict['firecrest'] = True
53 elif s_bustard_finished.search(fullpath):
54 self.run_status_dict['bustard'] = True
55 elif s_gerald_finished.search(fullpath):
56 self.run_status_dict['gerald'] = True
58 print "Create: %s" % (os.path.join(event.path, event.name))
60 def process_IN_DELETE(self, event):
61 print "Remove %s" % (os.path.join(event.path, event.name))
70 #####################################
71 # Configure Step (goat_pipeline.py)
73 s_start = re.compile('Starting Genome Analyzer Pipeline')
74 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
75 s_generating = re.compile('Generating journals, Makefiles and parameter files')
76 s_seq_folder = re.compile('^Sequence folder: ')
77 s_seq_folder_sub = re.compile('want to make ')
78 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
81 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
82 s_species_dir_err = re.compile('Error: Lane [1-8]:')
83 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
86 ##Ignore - Example of output above each ignore regex.
87 #NOTE: Commenting out an ignore will cause it to be
88 # logged as DEBUG with the logging module.
89 #CF_STDERR_IGNORE_LIST = []
90 s_skip = re.compile('s_[0-8]_[0-9]+')
93 ##########################################
94 # Pipeline Run Step (make -j8 recursive)
97 s_finished = re.compile('finished')
100 s_make_error = re.compile('^make[\S\s]+Error')
101 s_no_gnuplot = re.compile('gnuplot: command not found')
102 s_no_convert = re.compile('^Can\'t exec "convert"')
103 s_no_ghostscript = re.compile('gs: command not found')
105 ##Ignore - Example of output above each ignore regex.
106 #NOTE: Commenting out an ignore will cause it to be
107 # logged as DEBUG with the logging module.
109 PL_STDERR_IGNORE_LIST = []
111 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
112 # About to analyse intensity file s_4_0101_sig2.txt
113 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
114 # Will send output to standard output
115 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
116 # Found 31877 clusters
117 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
118 # Will use quality criterion ((CHASTITY>=0.6)
119 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
120 # Quality criterion translated to (($F[5]>=0.6))
121 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
122 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
124 # opened s_4_0103_qhg.txt
125 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
126 # 81129 sequences out of 157651 passed filter criteria
127 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
130 def pl_stderr_ignore(line):
132 Checks a line against the patterns to ignore (i.e. lines not to log)
134 returns True if line should be ignored
135 returns False if line should NOT be ignored
137 for s in PL_STDERR_IGNORE_LIST:
143 def config_stdout_handler(line, conf_info):
145 Processes each line of output from GOAT
146 and stores useful information using the logging module
148 Loads useful information into conf_info as well, for future
149 use outside the function.
151 returns True if found condition that signifies success.
154 # Skip irrelevant line (without logging)
155 if s_skip.search(line):
158 # Detect invalid command-line arguments
159 elif s_invalid_cmdline.search(line):
160 logging.error("Invalid commandline options!")
162 # Detect starting of configuration
163 elif s_start.search(line):
164 logging.info('START: Configuring pipeline')
166 # Detect it made it past invalid arguments
167 elif s_gerald.search(line):
168 logging.info('Running make now')
170 # Detect that make files have been generated (based on output)
171 elif s_generating.search(line):
172 logging.info('Make files generted')
175 # Capture run directory
176 elif s_seq_folder.search(line):
177 mo = s_seq_folder_sub.search(line)
178 #Output changed when using --tiles=<tiles>
179 # at least in pipeline v0.3.0b2
181 firecrest_bustard_gerald_makefile = line[mo.end():]
182 firecrest_bustard_gerald, junk = \
183 os.path.split(firecrest_bustard_gerald_makefile)
184 firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
185 firecrest, junk = os.path.split(firecrest_bustard)
187 conf_info.bustard_path = firecrest_bustard
188 conf_info.run_path = firecrest
190 #Standard output handling
192 print 'Sequence line:', line
193 mo = s_seq_folder.search(line)
194 conf_info.bustard_path = line[mo.end():]
195 conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
197 # Log all other output for debugging purposes
199 logging.warning('CONF:?: %s' % (line))
205 def config_stderr_handler(line, conf_info):
207 Processes each line of output from GOAT
208 and stores useful information using the logging module
210 Loads useful information into conf_info as well, for future
211 use outside the function.
213 returns RUN_ABORT upon detecting failure;
214 True on success message;
215 False if neutral message
216 (i.e. doesn't signify failure or success)
219 # Detect invalid species directory error
220 if s_species_dir_err.search(line):
223 # Detect goat_pipeline.py traceback
224 elif s_goat_traceb.search(line):
225 logging.error("Goat config script died, traceback in debug output")
227 # Detect indication of successful configuration (from stderr; odd, but ok)
228 elif s_stderr_taskcomplete.search(line):
229 logging.info('Configure step successful (from: stderr)')
231 # Log all other output as debug output
233 logging.debug('CONF:STDERR:?: %s' % (line))
235 # Neutral (neither failure nor success)
239 #def pipeline_stdout_handler(line, conf_info):
241 # Processes each line of output from running the pipeline
242 # and stores useful information using the logging module
244 # Loads useful information into conf_info as well, for future
245 # use outside the function.
247 # returns True if found condition that signifies success.
250 # #f.write(line + '\n')
256 def pipeline_stderr_handler(line, conf_info):
258 Processes each line of stderr from pipeline run
259 and stores useful information using the logging module
261 ##FIXME: Future feature (doesn't actually do this yet)
262 #Loads useful information into conf_info as well, for future
263 #use outside the function.
265 returns RUN_FAILED upon detecting failure;
266 #True on success message; (no clear success state)
267 False if neutral message
268 (i.e. doesn't signify failure or success)
271 if pl_stderr_ignore(line):
273 elif s_make_error.search(line):
274 logging.error("make error detected; run failed")
276 elif s_no_gnuplot.search(line):
277 logging.error("gnuplot not found")
279 elif s_no_convert.search(line):
280 logging.error("imagemagick's convert command not found")
282 elif s_no_ghostscript.search(line):
283 logging.error("ghostscript not found")
286 logging.debug('PIPE:STDERR:?: %s' % (line))
291 def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir):
293 Gets the config file from server...
294 requires config file in:
295 /etc/ga_frontend/ga_frontend.conf
301 base_host_url: http://host:port
303 returns True if successful, False on failure
305 options = getCombinedOptions()
307 if options.url is None:
308 logging.error("~/.ga_frontend.conf or /etc/ga_frontend/ga_frontend.conf" \
309 " missing base_host_url option")
313 saveConfigFile(flowcell, options.url, cfg_filepath)
314 conf_info.config_filepath = cfg_filepath
315 except FlowCellNotFound, e:
318 except WebError404, e:
328 f = open(cfg_filepath, 'r')
332 genome_dict = getAvailableGenomes(genome_dir)
333 mapper_dict = constructMapperDict(genome_dict)
335 f = open(cfg_filepath, 'w')
336 f.write(data % (mapper_dict))
343 def configure(conf_info):
345 Attempts to configure the GA pipeline using goat.
347 Uses logging module to store information about status.
349 returns True if configuration successful, otherwise False.
352 #pipe = subprocess.Popen(['goat_pipeline.py',
353 # '--GERALD=config32bk.txt',
356 # stdout=subprocess.PIPE,
357 # stderr=subprocess.PIPE)
359 #ERROR Test (2), causes goat_pipeline.py traceback
360 #pipe = subprocess.Popen(['goat_pipeline.py',
361 # '--GERALD=%s' % (conf_info.config_filepath),
362 # '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
365 # stdout=subprocess.PIPE,
366 # stderr=subprocess.PIPE)
368 ##########################
369 # Run configuration step
370 # Not a test; actual configure attempt.
371 #pipe = subprocess.Popen(['goat_pipeline.py',
372 # '--GERALD=%s' % (conf_info.config_filepath),
375 # stdout=subprocess.PIPE,
376 # stderr=subprocess.PIPE)
379 #FIXME: this only does a run on 5 tiles on lane 4
380 pipe = subprocess.Popen(['goat_pipeline.py',
381 '--GERALD=%s' % (conf_info.config_filepath),
382 '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
385 stdout=subprocess.PIPE,
386 stderr=subprocess.PIPE)
389 stdout_line = pipe.stdout.readline()
392 while stdout_line != '':
394 if config_stdout_handler(stdout_line, conf_info):
396 stdout_line = pipe.stdout.readline()
399 error_code = pipe.wait()
401 logging.error('Recieved error_code: %s' % (error_code))
403 logging.info('We are go for launch!')
406 stderr_line = pipe.stderr.readline()
409 stderr_success = False
410 while stderr_line != '':
411 stderr_status = config_stderr_handler(stderr_line, conf_info)
412 if stderr_status == RUN_ABORT:
414 elif stderr_status is True:
415 stderr_success = True
416 stderr_line = pipe.stderr.readline()
419 #Success requirements:
420 # 1) The stdout completed without error
421 # 2) The program exited with status 0
422 # 3) No errors found in stderr
423 print '#Expect: True, False, True, True'
424 print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
425 status = complete is True and \
426 bool(error_code) is False and \
427 abort != RUN_ABORT and \
428 stderr_success is True
430 # If everything was successful, but for some reason
431 # we didn't retrieve the path info, log it.
433 if conf_info.bustard_path is None or conf_info.run_path is None:
434 logging.error("Failed to retrieve run_path")
440 def run_pipeline(conf_info):
442 Run the pipeline and monitor status.
444 # Fail if the run_path doesn't actually exist
445 if not os.path.exists(conf_info.run_path):
446 logging.error('Run path does not exist: %s' \
447 % (conf_info.run_path))
450 # Change cwd to run_path
451 os.chdir(conf_info.run_path)
452 stdout_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stdout.txt')
453 stderr_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stderr.txt')
455 # Monitor file creation
457 mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
459 notifier = ThreadedNotifier(wm, event)
461 wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
463 # Log pipeline starting
464 logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
466 # Start the pipeline (and hide!)
467 #pipe = subprocess.Popen(['make',
470 # stdout=subprocess.PIPE,
471 # stderr=subprocess.PIPE)
473 fout = open(stdout_filepath, 'w')
474 ferr = open(stderr_filepath, 'w')
476 pipe = subprocess.Popen(['make',
482 # Wait for run to finish
483 retcode = pipe.wait()
492 ferr = open(stderr_filepath, 'r')
494 run_failed_stderr = False
496 err_status = pipeline_stderr_handler(line, conf_info)
497 if err_status == RUN_FAILED:
498 run_failed_stderr = True
502 # Finished file check!
503 print 'RUN SUCCESS CHECK:'
504 for key, value in event.run_status_dict.items():
505 print ' %s: %s' % (key, value)
507 dstatus = event.run_status_dict
509 # Success or failure check
510 status = (retcode == 0) and \
511 run_failed_stderr is False and \
512 dstatus['firecrest'] is True and \
513 dstatus['bustard'] is True and \
514 dstatus['gerald'] is True
520 if __name__ == '__main__':
524 cfg_filepath = 'config32auto.txt'
525 genome_dir = '/home/king/trog_drive/'
527 status_retrieve_cfg = retrieve_config(ci, flowcell, cfg_filepath, genome_dir)
528 if status_retrieve_cfg:
529 print "Retrieve config file successful"
531 print "Failed to retrieve config file"
532 #ci.config_filepath = 'config32bk.txt'
534 if status_retrieve_cfg:
535 status = configure(ci)
537 print "Configure success"
539 print "Configure failed"
541 print 'Run Dir:', ci.run_path
542 print 'Bustard Dir:', ci.bustard_path
545 print 'Running pipeline now!'
546 run_status = run_pipeline(ci)
547 if run_status is True:
548 print 'Pipeline ran successfully.'
550 print 'Pipeline run failed.'