8 from pyinotify import WatchManager, ThreadedNotifier
9 from pyinotify import EventsCodes, ProcessEvent
# Configure the logging module for the whole script: records at DEBUG level
# and above are written to 'pipeline_main.log', each formatted as
# "<timestamp> <LEVEL, padded to 8 chars> <message>".
# NOTE(review): the closing line(s) of this call are not visible in this
# extract, so any further keyword arguments are unknown here.
11 logging.basicConfig(level=logging.DEBUG,
12 format='%(asctime)s %(levelname)-8s %(message)s',
13 datefmt='%a, %d %b %Y %H:%M:%S',
14 filename='pipeline_main.log',
21 self.bustard_path = None
22 self.config_filepath = None
25 ####################################
26 # inotify event processor
# Per-stage completion-marker patterns used by RunEvent.process_IN_CREATE.
# Each matches a path containing the stage name (Firecrest / Bustard / GERALD)
# followed by one or more of the characters 0-9 . _ - A-Z a-z, then
# "/finished", any single character, and "txt".
# NOTE(review): the '.' before "txt" is not escaped, so it matches any
# character rather than only a literal dot; the patterns are also not raw
# strings.
28 s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
29 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
30 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
# pyinotify ProcessEvent subclass that tracks pipeline progress through
# file-system events. self.run_status_dict holds one boolean per stage
# ('firecrest', 'bustard', 'gerald'); run_pipeline() reads it after the run to
# decide whether every stage produced its finished.txt marker.
# NOTE(review): several original lines of this class are not visible in this
# extract (including the constructor header and the rest of the dict literal).
32 class RunEvent(ProcessEvent):
36 self.run_status_dict = {'firecrest': False,
40 ProcessEvent.__init__(self)
# Handler for created entries (presumably dispatched by pyinotify for
# IN_CREATE events -- confirm against the pyinotify version in use).
# Builds the full path from event.path and event.name, logs it via
# logging.info when it matches s_finished (a module-level pattern defined
# later in the file), sets the flag of the first stage pattern that matches to
# True, and prints the created path to stdout.
42 def process_IN_CREATE(self, event):
43 fullpath = os.path.join(event.path, event.name)
44 if s_finished.search(fullpath):
45 logging.info("File Found: %s" % (fullpath))
47 if s_firecrest_finished.search(fullpath):
48 self.run_status_dict['firecrest'] = True
49 elif s_bustard_finished.search(fullpath):
50 self.run_status_dict['bustard'] = True
51 elif s_gerald_finished.search(fullpath):
52 self.run_status_dict['gerald'] = True
54 print "Create: %s" % (os.path.join(event.path, event.name))
# Handler for removed entries: only prints the removed path to stdout; no
# status flag is changed in the visible lines.
56 def process_IN_DELETE(self, event):
57 print "Remove %s" % (os.path.join(event.path, event.name))
66 #####################################
67 # Configure Step (goat_pipeline.py)
# Patterns matched against goat_pipeline.py output during the configure step.
# config_stdout_handler uses s_start, s_gerald, s_generating, s_seq_folder,
# s_seq_folder_sub, s_invalid_cmdline and s_skip; config_stderr_handler uses
# s_stderr_taskcomplete, s_species_dir_err and s_goat_traceb.
69 s_start = re.compile('Starting Genome Analyzer Pipeline')
70 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
71 s_generating = re.compile('Generating journals, Makefiles and parameter files')
72 s_seq_folder = re.compile('^Sequence folder: ')
73 s_seq_folder_sub = re.compile('want to make ')
74 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
77 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
78 s_species_dir_err = re.compile('Error: Lane [1-8]:')
79 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
82 ##Ignore - Example of output above each ignore regex.
83 #NOTE: Commenting out an ignore will cause it to be
84 # logged as DEBUG with the logging module.
85 #CF_STDERR_IGNORE_LIST = []
# Lines containing a token of the form s_<digit 0-8>_<digits> (e.g. s_4_0100)
# are skipped by config_stdout_handler without being logged.
86 s_skip = re.compile('s_[0-8]_[0-9]+')
89 ##########################################
90 # Pipeline Run Step (make -j8 recursive)
# Matches any path containing the text 'finished'; RunEvent.process_IN_CREATE
# uses it to decide which created files to log.
93 s_finished = re.compile('finished')
# stderr patterns that pipeline_stderr_handler reports through logging.error:
# a make error, and missing gnuplot / ImageMagick convert / ghostscript.
96 s_make_error = re.compile('^make[\S\s]+Error')
97 s_no_gnuplot = re.compile('gnuplot: command not found')
98 s_no_convert = re.compile('^Can\'t exec "convert"')
99 s_no_ghostscript = re.compile('gs: command not found')
101 ##Ignore - Example of output above each ignore regex.
102 #NOTE: Commenting out an ignore will cause it to be
103 # logged as DEBUG with the logging module.
# Compiled patterns for pipeline stderr lines that are known noise;
# pl_stderr_ignore() iterates over this list. The comment above each append
# shows a sample line that the pattern is meant to match.
105 PL_STDERR_IGNORE_LIST = []
107 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
108 # About to analyse intensity file s_4_0101_sig2.txt
109 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
110 # Will send output to standard output
111 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
112 # Found 31877 clusters
113 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
114 # Will use quality criterion ((CHASTITY>=0.6)
115 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
116 # Quality criterion translated to (($F[5]>=0.6))
117 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
118 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
120 # opened s_4_0103_qhg.txt
121 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
122 # 81129 sequences out of 157651 passed filter criteria
123 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
# Decides whether a pipeline stderr line is known noise by checking it against
# the compiled patterns in PL_STDERR_IGNORE_LIST. Per the original docstring
# text below, the result is True for "ignore" and False otherwise.
# NOTE(review): the docstring delimiters, the loop body and the return
# statements are not visible in this extract.
126 def pl_stderr_ignore(line):
128 Searches lines for lines to ignore (i.e. not to log)
130 returns True if line should be ignored
131 returns False if line should NOT be ignored
133 for s in PL_STDERR_IGNORE_LIST:
# Classifies one stdout line from goat_pipeline.py (called per line by
# configure()) and logs what it recognises:
#   * s_skip match            -> skipped, nothing logged
#   * s_invalid_cmdline match -> logging.error
#   * s_start / s_gerald / s_generating match -> logging.info progress messages
#   * s_seq_folder match      -> stores paths on conf_info (see below)
#   * anything else           -> logging.warning with a 'CONF:?:' prefix
# For a "Sequence folder: " line two layouts are handled. When the line also
# contains 'want to make ', the text after it is treated as a makefile path
# and os.path.split is applied three times; conf_info.bustard_path is the
# result of the second split and conf_info.run_path the result of the third.
# Otherwise the text after 'Sequence folder: ' becomes conf_info.bustard_path
# and its parent directory becomes conf_info.run_path.
# NOTE(review): `line` comes from readline() in configure(), so it presumably
# still ends with a newline; in the standard branch bustard_path would keep
# that newline -- confirm.
# NOTE(review): the docstring delimiters, the return statements and the
# if/else lines selecting between the two layouts are not visible in this
# extract.
139 def config_stdout_handler(line, conf_info):
141 Processes each line of output from GOAT
142 and stores useful information using the logging module
144 Loads useful information into conf_info as well, for future
145 use outside the function.
147 returns True if found condition that signifies success.
150 # Skip irrelevant line (without logging)
151 if s_skip.search(line):
154 # Detect invalid command-line arguments
155 elif s_invalid_cmdline.search(line):
156 logging.error("Invalid commandline options!")
158 # Detect starting of configuration
159 elif s_start.search(line):
160 logging.info('START: Configuring pipeline')
162 # Detect it made it past invalid arguments
163 elif s_gerald.search(line):
164 logging.info('Running make now')
166 # Detect that make files have been generated (based on output)
167 elif s_generating.search(line):
168 logging.info('Make files generted')
171 # Capture run directory
172 elif s_seq_folder.search(line):
173 mo = s_seq_folder_sub.search(line)
174 #Output changed when using --tiles=<tiles>
175 # at least in pipeline v0.3.0b2
177 firecrest_bustard_gerald_makefile = line[mo.end():]
178 firecrest_bustard_gerald, junk = \
179 os.path.split(firecrest_bustard_gerald_makefile)
180 firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
181 firecrest, junk = os.path.split(firecrest_bustard)
183 conf_info.bustard_path = firecrest_bustard
184 conf_info.run_path = firecrest
186 #Standard output handling
188 print 'Sequence line:', line
189 mo = s_seq_folder.search(line)
190 conf_info.bustard_path = line[mo.end():]
191 conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
193 # Log all other output for debugging purposes
195 logging.warning('CONF:?: %s' % (line))
# Classifies one stderr line from goat_pipeline.py (called per line by
# configure()):
#   * s_species_dir_err match     -> invalid species directory (handling lines
#                                    not visible in this extract)
#   * s_goat_traceb match         -> logging.error; goat_pipeline.py printed a
#                                    Python traceback
#   * s_stderr_taskcomplete match -> logging.info; configuration succeeded
#   * anything else               -> logging.debug with 'CONF:STDERR:?:' prefix
# configure() compares the return value with RUN_ABORT and with True, which
# agrees with the contract described in the original docstring text below.
# NOTE(review): the docstring delimiters and all return statements are not
# visible in this extract.
201 def config_stderr_handler(line, conf_info):
203 Processes each line of output from GOAT
204 and stores useful information using the logging module
206 Loads useful information into conf_info as well, for future
207 use outside the function.
209 returns RUN_ABORT upon detecting failure;
210 True on success message;
211 False if neutral message
212 (i.e. doesn't signify failure or success)
215 # Detect invalid species directory error
216 if s_species_dir_err.search(line):
219 # Detect goat_pipeline.py traceback
220 elif s_goat_traceb.search(line):
221 logging.error("Goat config script died, traceback in debug output")
223 # Detect indication of successful configuration (from stderr; odd, but ok)
224 elif s_stderr_taskcomplete.search(line):
225 logging.info('Configure step successful (from: stderr)')
227 # Log all other output as debug output
229 logging.debug('CONF:STDERR:?: %s' % (line))
231 # Neutral (not failure; nor success)
235 #FIXME: Temporary hack
236 #f = open('pipeline_run.log', 'w')
237 #ferr = open('pipeline_err.log', 'w')
241 #def pipeline_stdout_handler(line, conf_info):
243 # Processes each line of output from running the pipeline
244 # and stores useful information using the logging module
246 # Loads useful information into conf_info as well, for future
247 # use outside the function.
249 # returns True if found condition that signifies success.
252 # f.write(line + '\n')
# Classifies one line of the pipeline run's stderr (called per line by
# run_pipeline(), which compares the result with RUN_FAILED):
#   * lines accepted by pl_stderr_ignore() are treated as known noise
#   * s_make_error match                    -> logging.error, run failed
#   * s_no_gnuplot / s_no_convert / s_no_ghostscript match
#                                           -> logging.error naming the
#                                              missing external tool
#   * anything else -> logging.debug with a 'PIPE:STDERR:?:' prefix
# conf_info is not used in any of the visible lines.
# NOTE(review): the docstring delimiters and all return statements are not
# visible in this extract, so which branches yield RUN_FAILED cannot be
# confirmed from here.
258 def pipeline_stderr_handler(line, conf_info):
260 Processes each line of stderr from pipelien run
261 and stores useful information using the logging module
263 ##FIXME: Future feature (doesn't actually do this yet)
264 #Loads useful information into conf_info as well, for future
265 #use outside the function.
267 returns RUN_FAILED upon detecting failure;
268 #True on success message; (no clear success state)
269 False if neutral message
270 (i.e. doesn't signify failure or success)
273 if pl_stderr_ignore(line):
275 elif s_make_error.search(line):
276 logging.error("make error detected; run failed")
278 elif s_no_gnuplot.search(line):
279 logging.error("gnuplot not found")
281 elif s_no_convert.search(line):
282 logging.error("imagemagick's convert command not found")
284 elif s_no_ghostscript.search(line):
285 logging.error("ghostscript not found")
288 logging.debug('PIPE:STDERR:?: %s' % (line))
# Runs the configure step by launching goat_pipeline.py as a subprocess with
# --GERALD set to conf_info.config_filepath and a fixed five-tile --tiles
# list, then:
#   1. feeds every stdout line to config_stdout_handler (which may fill in
#      conf_info.bustard_path / conf_info.run_path),
#   2. waits for the process and keeps its exit status in error_code,
#   3. feeds every stderr line to config_stderr_handler, recording a RUN_ABORT
#      result or a True (success) result.
# The final status requires all four of: `complete` is True, a zero exit
# status, no RUN_ABORT, and a success message seen on stderr. The in-code
# "Success requirements" list names only three of these. If the paths are
# still None afterwards, an error is logged.
# NOTE(review): stdout is drained to EOF and pipe.wait() is called before
# stderr is read, while both streams use subprocess.PIPE. The subprocess
# documentation warns that this can deadlock if the child fills the stderr
# pipe buffer -- confirm goat_pipeline.py's stderr volume or switch to
# communicate().
# NOTE(review): the docstring delimiters, the remaining Popen arguments, the
# initialisation of `complete` / `abort`, and the return statements are not
# visible in this extract.
293 def configure(conf_info):
295 Attempts to configure the GA pipeline using goat.
297 Uses logging module to store information about status.
299 returns True if configuration successful, otherwise False.
302 #pipe = subprocess.Popen(['goat_pipeline.py',
303 # '--GERALD=config32bk.txt',
306 # stdout=subprocess.PIPE,
307 # stderr=subprocess.PIPE)
309 #ERROR Test (2), causes goat_pipeline.py traceback
310 #pipe = subprocess.Popen(['goat_pipeline.py',
311 # '--GERALD=%s' % (conf_info.config_filepath),
312 # '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
315 # stdout=subprocess.PIPE,
316 # stderr=subprocess.PIPE)
318 ##########################
319 # Run configuration step
320 # Not a test; actual configure attempt.
321 #pipe = subprocess.Popen(['goat_pipeline.py',
322 # '--GERALD=%s' % (conf_info.config_filepath),
325 # stdout=subprocess.PIPE,
326 # stderr=subprocess.PIPE)
329 #FIXME: this only does a run on 5 tiles on lane 4
330 pipe = subprocess.Popen(['goat_pipeline.py',
331 '--GERALD=%s' % (conf_info.config_filepath),
332 '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
335 stdout=subprocess.PIPE,
336 stderr=subprocess.PIPE)
339 stdout_line = pipe.stdout.readline()
342 while stdout_line != '':
344 if config_stdout_handler(stdout_line, conf_info):
346 stdout_line = pipe.stdout.readline()
349 error_code = pipe.wait()
351 logging.error('Recieved error_code: %s' % (error_code))
353 logging.info('We are go for launch!')
356 stderr_line = pipe.stderr.readline()
359 stderr_success = False
360 while stderr_line != '':
361 stderr_status = config_stderr_handler(stderr_line, conf_info)
362 if stderr_status == RUN_ABORT:
364 elif stderr_status is True:
365 stderr_success = True
366 stderr_line = pipe.stderr.readline()
369 #Success requirements:
370 # 1) The stdout completed without error
371 # 2) The program exited with status 0
372 # 3) No errors found in stdout
373 print '#Expect: True, False, True, True'
374 print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
375 status = complete is True and \
376 bool(error_code) is False and \
377 abort != RUN_ABORT and \
378 stderr_success is True
380 # If everything was successful, but for some reason
381 # we didn't retrieve the path info, log it.
383 if conf_info.bustard_path is None or conf_info.run_path is None:
384 logging.error("Failed to retrieve run_path")
# Runs the pipeline with `make` inside conf_info.run_path and decides whether
# the run succeeded. Visible steps:
#   1. log an error if conf_info.run_path does not exist,
#   2. chdir into run_path; stdout/stderr capture files are
#      'pipeline_run_stdout.txt' and 'pipeline_run_stderr.txt' in that folder,
#   3. watch run_path recursively (rec=True) for IN_CREATE / IN_DELETE events
#      through a pyinotify ThreadedNotifier bound to `event`, whose
#      run_status_dict is read in step 6,
#   4. start `make` and block until it exits (retcode),
#   5. re-open the stderr capture file for reading and pass lines to
#      pipeline_stderr_handler; a RUN_FAILED result sets run_failed_stderr,
#   6. print each stage flag from event.run_status_dict.
# The final status requires retcode == 0, no RUN_FAILED from stderr, and all
# of the 'firecrest', 'bustard' and 'gerald' flags being True.
# NOTE(review): os.chdir changes the working directory of the whole process.
# NOTE(review): the creation of `wm` and `event`, the notifier start/stop
# calls, the remaining Popen arguments, the closing of the fout/ferr write
# handles, the stderr read loop header and the return statements are not
# visible in this extract -- confirm the write handles are closed before the
# stderr file is re-opened for reading.
390 def run_pipeline(conf_info):
392 Run the pipeline and monitor status.
394 # Fail if the run_path doesn't actually exist
395 if not os.path.exists(conf_info.run_path):
396 logging.error('Run path does not exist: %s' \
397 % (conf_info.run_path))
400 # Change cwd to run_path
401 os.chdir(conf_info.run_path)
402 stdout_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stdout.txt')
403 stderr_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stderr.txt')
405 # Monitor file creation
407 mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
409 notifier = ThreadedNotifier(wm, event)
411 wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
413 # Log pipeline starting
414 logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
416 # Start the pipeline (and hide!)
417 #pipe = subprocess.Popen(['make',
420 # stdout=subprocess.PIPE,
421 # stderr=subprocess.PIPE)
423 fout = open(stdout_filepath, 'w')
424 ferr = open(stderr_filepath, 'w')
426 pipe = subprocess.Popen(['make',
432 # Wait for run to finish
433 retcode = pipe.wait()
442 ferr = open(stderr_filepath, 'r')
444 run_failed_stderr = False
446 err_status = pipeline_stderr_handler(line, conf_info)
447 if err_status == RUN_FAILED:
448 run_failed_stderr = True
452 # Finished file check!
453 print 'RUN SUCCESS CHECK:'
454 for key, value in event.run_status_dict.items():
455 print ' %s: %s' % (key, value)
457 dstatus = event.run_status_dict
459 # Success or failure check
460 status = (retcode == 0) and \
461 run_failed_stderr is False and \
462 dstatus['firecrest'] is True and \
463 dstatus['bustard'] is True and \
464 dstatus['gerald'] is True
# Script entry point: points the config object `ci` at 'config32bk.txt', runs
# the configure step, prints its outcome and the discovered run / Bustard
# directories, then runs the pipeline and prints whether it succeeded.
# NOTE(review): the creation of `ci` and the if/else lines around the
# configure result are not visible in this extract, so it cannot be confirmed
# here whether run_pipeline() is skipped when configure() fails.
470 if __name__ == '__main__':
472 ci.config_filepath = 'config32bk.txt'
474 status = configure(ci)
476 print "Configure success"
478 print "Configure failed"
480 print 'Run Dir:', ci.run_path
481 print 'Bustard Dir:', ci.bustard_path
484 print 'Running pipeline now!'
485 run_status = run_pipeline(ci)
486 if run_status is True:
487 print 'Pipeline ran successfully.'
489 print 'Pipeline run failed.'
491 #FIXME: Temporary hack