8 from pyinotify import WatchManager, ThreadedNotifier
9 from pyinotify import EventsCodes, ProcessEvent
11 logging.basicConfig(level=logging.DEBUG,
12 format='%(asctime)s %(levelname)-8s %(message)s',
13 datefmt='%a, %d %b %Y %H:%M:%S',
14 filename='pipeline_main.log',
21 self.bustard_path = None
22 self.config_filepath = None
class RunEvent(ProcessEvent):
    """
    pyinotify event handler that reports files being created in and
    removed from the watched directory tree.
    """

    def process_IN_CREATE(self, event):
        """
        Print the path of every newly created entry; additionally log it
        when the full path matches the s_finished regex.
        """
        created = os.path.join(event.path, event.name)
        if s_finished.search(created) is not None:
            logging.info("File Found: %s" % (created,))
        print("Create: %s" % (created,))

    def process_IN_DELETE(self, event):
        """Print the path of every removed entry."""
        removed = os.path.join(event.path, event.name)
        print("Remove %s" % (removed,))
41 #####################################
42 # Configure Step (goat_pipeline.py)
# Progress markers looked for in goat_pipeline.py output.
s_start = re.compile(r"Starting Genome Analyzer Pipeline")
s_gerald = re.compile(r"[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
s_generating = re.compile(r"Generating journals, Makefiles and parameter files")
# 'Sequence folder: ' line; s_seq_folder_sub locates the path that follows
# 'want to make ' on the same line.
s_seq_folder = re.compile(r"^Sequence folder: ")
s_seq_folder_sub = re.compile(r"want to make ")
s_stderr_taskcomplete = re.compile(r"^Task complete, exiting")

# Failure markers looked for in goat_pipeline.py output.
s_invalid_cmdline = re.compile(r"Usage:[\S\s]*goat_pipeline.py")
s_species_dir_err = re.compile(r"Error: Lane [1-8]:")
s_goat_traceb = re.compile(r"^Traceback \(most recent call last\):")
## Ignore - an example of the matching output appears above each ignore regex.
# NOTE: Commenting out an ignore will cause it to be
#       logged as DEBUG with the logging module.
#CF_STDERR_IGNORE_LIST = []
# Configure-step stdout lines containing a tile name (s_<lane>_<tile>) are
# skipped without being logged.
s_skip = re.compile(r"s_[0-8]_[0-9]+")
64 ##########################################
65 # Pipeline Run Step (make -j8 recursive)
# Matched against the paths of newly created files while the pipeline runs.
s_finished = re.compile(r"finished")

# Failure markers looked for in the stderr of the pipeline run.
s_make_error = re.compile(r"^make[\S\s]+Error")
s_no_gnuplot = re.compile(r"gnuplot: command not found")
s_no_convert = re.compile("^Can't exec \"convert\"")
s_no_ghostscript = re.compile(r"gs: command not found")
## Ignore - an example of the matching output appears above each ignore regex.
# NOTE: Commenting out an ignore will cause it to be
#       logged as DEBUG with the logging module.

# Compiled in the order listed; each pattern is anchored at the line start.
PL_STDERR_IGNORE_LIST = list(map(re.compile, (
    r"^Info: PF",
    # About to analyse intensity file s_4_0101_sig2.txt
    r"^About to analyse intensity file",
    # Will send output to standard output
    r"^Will send output to standard output",
    # Found 31877 clusters
    r"^Found [0-9]+ clusters",
    # Will use quality criterion ((CHASTITY>=0.6)
    r"^Will use quality criterion",
    # Quality criterion translated to (($F[5]>=0.6))
    r"^Quality criterion translated to",
    # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
    # opened s_4_0103_qhg.txt
    r"^opened[\S\s]+qhg.txt",
    # 81129 sequences out of 157651 passed filter criteria
    r"^[0-9]+ sequences out of [0-9]+ passed filter criteria",
)))
def pl_stderr_ignore(line, ignore_list=None):
    """
    Searches lines for lines to ignore (i.e. not to log)

    line: one line of stderr output from the pipeline run.
    ignore_list: optional iterable of compiled regexes to test against;
                 defaults to the module-level PL_STDERR_IGNORE_LIST.

    returns True if line should be ignored
    returns False if line should NOT be ignored
    """
    # Resolve the default at call time so later additions to the
    # module-level list are honoured.
    if ignore_list is None:
        ignore_list = PL_STDERR_IGNORE_LIST

    # A line is ignored as soon as any one pattern matches it.
    return any(s.search(line) for s in ignore_list)
def config_stdout_handler(line, conf_info):
    """
    Processes each line of output from GOAT
    and stores useful information using the logging module

    Loads useful information into conf_info as well, for future
    use outside the function.

    line: one line of goat_pipeline.py stdout; a trailing newline (as
          left by readline()) is tolerated.
    conf_info: object whose bustard_path and run_path attributes are set
               when the 'Sequence folder: ' line is seen.

    returns True if found condition that signifies success.
    returns False for every other line.
    """

    # Skip irrelevant line (without logging)
    if s_skip.search(line):
        pass

    # Detect invalid command-line arguments
    elif s_invalid_cmdline.search(line):
        logging.error("Invalid commandline options!")

    # Detect starting of configuration
    elif s_start.search(line):
        logging.info('START: Configuring pipeline')

    # Detect it made it past invalid arguments
    elif s_gerald.search(line):
        logging.info('Running make now')

    # Detect that make files have been generated (based on output)
    elif s_generating.search(line):
        logging.info('Make files generted')
        return True

    # Capture run directory
    elif s_seq_folder.search(line):
        mo = s_seq_folder_sub.search(line)
        #Output changed when using --tiles=<tiles>
        # at least in pipeline v0.3.0b2
        if mo:
            # The text after 'want to make ' is a Makefile path; strip one
            # path component to reach its directory, one more for
            # bustard_path, and one more for run_path.
            makefile_path = line[mo.end():].strip()
            gerald_dir = os.path.dirname(makefile_path)
            bustard_dir = os.path.dirname(gerald_dir)
            conf_info.bustard_path = bustard_dir
            conf_info.run_path = os.path.dirname(bustard_dir)

        #Standard output handling
        else:
            print('Sequence line: %s' % (line,))
            mo = s_seq_folder.search(line)
            # Strip the line terminator so the stored path is usable as-is.
            conf_info.bustard_path = line[mo.end():].strip()
            conf_info.run_path = os.path.dirname(conf_info.bustard_path)

    # Log all other output for debugging purposes
    else:
        logging.warning('CONF:?: %s', line.rstrip())

    return False
def config_stderr_handler(line, conf_info):
    """
    Processes each line of output from GOAT
    and stores useful information using the logging module

    line: one line of goat_pipeline.py stderr; a trailing newline (as
          left by readline()) is stripped before logging.
    conf_info: accepted for symmetry with config_stdout_handler;
               currently unused by this handler.

    returns RUN_ABORT upon detecting failure;
            True on success message;
            False if neutral message
              (i.e. doesn't signify failure or success)
    """
    message = line.rstrip()

    # Detect invalid species directory error
    if s_species_dir_err.search(line):
        logging.error(message)
        return RUN_ABORT

    # Detect goat_pipeline.py traceback
    elif s_goat_traceb.search(line):
        logging.error("Goat config script died, traceback in debug output")
        return RUN_ABORT

    # Detect indication of successful configuration (from stderr; odd, but ok)
    elif s_stderr_taskcomplete.search(line):
        logging.info('Configure step successful (from: stderr)')
        return True

    # Log all other output as debug output
    else:
        logging.debug('CONF:STDERR:?: %s', message)

    # Neutral (not failure; nor success)
    return False
#FIXME: Temporary hack
211 f = open('pipeline_run.log', 'w')
212 #ferr = open('pipeline_err.log', 'w')
216 def pipeline_stdout_handler(line, conf_info):
218 Processes each line of output from running the pipeline
219 and stores useful information using the logging module
221 Loads useful information into conf_info as well, for future
222 use outside the function.
224 returns True if found condition that signifies success.
233 def pipeline_stderr_handler(line, conf_info):
237 if pl_stderr_ignore(line):
239 elif s_make_error.search(line):
240 logging.error("make error detected; run failed")
242 elif s_no_gnuplot.search(line):
243 logging.error("gnuplot not found")
245 elif s_no_convert.search(line):
246 logging.error("imagemagick's convert command not found")
248 elif s_no_ghostscript.search(line):
249 logging.error("ghostscript not found")
252 logging.debug('PIPE:STDERR:?: %s' % (line))
257 def configure(conf_info):
259 Attempts to configure the GA pipeline using goat.
261 Uses logging module to store information about status.
263 returns True if configuration successful, otherwise False.
266 #pipe = subprocess.Popen(['goat_pipeline.py',
267 # '--GERALD=config32bk.txt',
270 # stdout=subprocess.PIPE,
271 # stderr=subprocess.PIPE)
273 #ERROR Test (2), causes goat_pipeline.py traceback
274 #pipe = subprocess.Popen(['goat_pipeline.py',
275 # '--GERALD=%s' % (conf_info.config_filepath),
276 # '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
279 # stdout=subprocess.PIPE,
280 # stderr=subprocess.PIPE)
282 ##########################
283 # Run configuration step
284 # Not a test; actual configure attempt.
285 #pipe = subprocess.Popen(['goat_pipeline.py',
286 # '--GERALD=%s' % (conf_info.config_filepath),
289 # stdout=subprocess.PIPE,
290 # stderr=subprocess.PIPE)
293 #FIXME: this only does a run on 5 tiles on lane 4
294 pipe = subprocess.Popen(['goat_pipeline.py',
295 '--GERALD=%s' % (conf_info.config_filepath),
296 '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
299 stdout=subprocess.PIPE,
300 stderr=subprocess.PIPE)
303 stdout_line = pipe.stdout.readline()
306 while stdout_line != '':
308 if config_stdout_handler(stdout_line, conf_info):
310 stdout_line = pipe.stdout.readline()
313 error_code = pipe.wait()
315 logging.error('Recieved error_code: %s' % (error_code))
317 logging.info('We are go for launch!')
320 stderr_line = pipe.stderr.readline()
323 stderr_success = False
324 while stderr_line != '':
325 stderr_status = config_stderr_handler(stderr_line, conf_info)
326 if stderr_status == RUN_ABORT:
328 elif stderr_status is True:
329 stderr_success = True
330 stderr_line = pipe.stderr.readline()
333 #Success requirements:
334 # 1) The stdout completed without error
335 # 2) The program exited with status 0
# 3) No abort-level errors found in stderr
# 4) The success message was seen on stderr
337 print '#Expect: True, False, True, True'
338 print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
339 status = complete is True and \
340 bool(error_code) is False and \
341 abort != RUN_ABORT and \
342 stderr_success is True
344 # If everything was successful, but for some reason
345 # we didn't retrieve the path info, log it.
347 if conf_info.bustard_path is None or conf_info.run_path is None:
348 logging.error("Failed to retrieve run_path")
354 def run_pipeline(conf_info):
356 Run the pipeline and monitor status.
358 # Fail if the run_path doesn't actually exist
359 if not os.path.exists(conf_info.run_path):
360 logging.error('Run path does not exist: %s' \
361 % (conf_info.run_path))
364 # Change cwd to run_path
365 os.chdir(conf_info.run_path)
367 # Monitor file creation
369 mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
370 notifier = ThreadedNotifier(wm, RunEvent())
372 wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
374 # Log pipeline starting
375 logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
377 # Start the pipeline (and hide!)
378 #pipe = subprocess.Popen(['make',
381 # stdout=subprocess.PIPE,
382 # stderr=subprocess.PIPE)
384 fout = open('pipeline_run_stdout.txt', 'w')
385 ferr = open('pipeline_run_stderr.txt', 'w')
387 pipe = subprocess.Popen(['make',
393 retcode = pipe.wait()
400 #print ': %s' % (sts)
402 status = (retcode == 0)
408 if __name__ == '__main__':
410 ci.config_filepath = 'config32bk.txt'
412 status = configure(ci)
414 print "Configure success"
416 print "Configure failed"
418 print 'Run Dir:', ci.run_path
419 print 'Bustard Dir:', ci.bustard_path
422 print 'Running pipeline now!'
423 run_status = run_pipeline(ci)
424 if run_status is True:
425 print 'Pipeline ran successfully.'
427 print 'Pipeline run failed.'
#FIXME: Temporary hack