[project @ Proof of concept!!!]
[htsworkflow.git] / bin / config_pipeline2.py
1 #!/usr/bin/python
2 import subprocess
3 import logging
4 import time
5 import re
6 import os
7
8 from pyinotify import WatchManager, ThreadedNotifier
9 from pyinotify import EventsCodes, ProcessEvent
10
11 logging.basicConfig(level=logging.DEBUG,
12                     format='%(asctime)s %(levelname)-8s %(message)s',
13                     datefmt='%a, %d %b %Y %H:%M:%S',
14                     filename='pipeline_main.log',
15                     filemode='w')
16
17 class ConfigInfo:
18   
19   def __init__(self):
20     self.run_path = None
21     self.bustard_path = None
22     self.config_filepath = None
23
24
25 ####################################
26 # inotify event processor
27
28 s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
29 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
30 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
31
32 class RunEvent(ProcessEvent):
33
34   def __init__(self):
35
36     self.run_status_dict = {'firecrest': False,
37                             'bustard': False,
38                             'gerald': False}
39
40     ProcessEvent.__init__(self)
41
42   def process_IN_CREATE(self, event):
43     fullpath = os.path.join(event.path, event.name)
44     if s_finished.search(fullpath):
45       logging.info("File Found: %s" % (fullpath))
46
47       if s_firecrest_finished.search(fullpath):
48         self.run_status_dict['firecrest'] = True
49       elif s_bustard_finished.search(fullpath):
50         self.run_status_dict['bustard'] = True
51       elif s_gerald_finished.search(fullpath):
52         self.run_status_dict['gerald'] = True
53       
54     print "Create: %s" % (os.path.join(event.path, event.name))
55
56   def process_IN_DELETE(self, event):
57     print "Remove %s" % (os.path.join(event.path, event.name))
58
59 #FLAGS
60 # Config Step Error
61 RUN_ABORT = 'abort'
62 # Run Step Error
63 RUN_FAILED = 'failed'
64
65
66 #####################################
67 # Configure Step (goat_pipeline.py)
68 #Info
69 s_start = re.compile('Starting Genome Analyzer Pipeline')
70 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
71 s_generating = re.compile('Generating journals, Makefiles and parameter files')
72 s_seq_folder = re.compile('^Sequence folder: ')
73 s_seq_folder_sub = re.compile('want to make ')
74 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
75
76 #Errors
77 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
78 s_species_dir_err = re.compile('Error: Lane [1-8]:')
79 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
80
81
82 ##Ignore - Example of out above each ignore regex.
83 #NOTE: Commenting out an ignore will cause it to be
84 # logged as DEBUG with the logging module.
85 #CF_STDERR_IGNORE_LIST = []
86 s_skip = re.compile('s_[0-8]_[0-9]+')
87
88
89 ##########################################
90 # Pipeline Run Step (make -j8 recursive)
91
92 ##Info
93 s_finished = re.compile('finished')
94
95 ##Errors
96 s_make_error = re.compile('^make[\S\s]+Error')
97 s_no_gnuplot = re.compile('gnuplot: command not found')
98 s_no_convert = re.compile('^Can\'t exec "convert"')
99 s_no_ghostscript = re.compile('gs: command not found')
100
101 ##Ignore - Example of out above each ignore regex.
102 #NOTE: Commenting out an ignore will cause it to be
103 # logged as DEBUG with the logging module.
104 #
105 PL_STDERR_IGNORE_LIST = []
106 # Info: PF 11802
107 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
108 # About to analyse intensity file s_4_0101_sig2.txt
109 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
110 # Will send output to standard output
111 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
112 # Found 31877 clusters
113 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
114 # Will use quality criterion ((CHASTITY>=0.6)
115 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
116 # Quality criterion translated to (($F[5]>=0.6))
117 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
118 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
119 #  AND
120 # opened s_4_0103_qhg.txt
121 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
122 # 81129 sequences out of 157651 passed filter criteria
123 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
124
125
126 def pl_stderr_ignore(line):
127   """
128   Searches lines for lines to ignore (i.e. not to log)
129
130   returns True if line should be ignored
131   returns False if line should NOT be ignored
132   """
133   for s in PL_STDERR_IGNORE_LIST:
134     if s.search(line):
135       return True
136   return False
137
138
139 def config_stdout_handler(line, conf_info):
140   """
141   Processes each line of output from GOAT
142   and stores useful information using the logging module
143
144   Loads useful information into conf_info as well, for future
145   use outside the function.
146
147   returns True if found condition that signifies success.
148   """
149
150   # Skip irrelevant line (without logging)
151   if s_skip.search(line):
152     pass
153
154   # Detect invalid command-line arguments
155   elif s_invalid_cmdline.search(line):
156     logging.error("Invalid commandline options!")
157
158   # Detect starting of configuration
159   elif s_start.search(line):
160     logging.info('START: Configuring pipeline')
161
162   # Detect it made it past invalid arguments
163   elif s_gerald.search(line):
164     logging.info('Running make now')
165
166   # Detect that make files have been generated (based on output)
167   elif s_generating.search(line):
168     logging.info('Make files generted')
169     return True
170
171   # Capture run directory
172   elif s_seq_folder.search(line):
173     mo = s_seq_folder_sub.search(line)
174     #Output changed when using --tiles=<tiles>
175     # at least in pipeline v0.3.0b2
176     if mo:
177       firecrest_bustard_gerald_makefile = line[mo.end():]
178       firecrest_bustard_gerald, junk = \
179                                 os.path.split(firecrest_bustard_gerald_makefile)
180       firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
181       firecrest, junk = os.path.split(firecrest_bustard)
182
183       conf_info.bustard_path = firecrest_bustard
184       conf_info.run_path = firecrest
185     
186     #Standard output handling
187     else:
188       print 'Sequence line:', line
189       mo = s_seq_folder.search(line)
190       conf_info.bustard_path = line[mo.end():]
191       conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
192
193   # Log all other output for debugging purposes
194   else:
195     logging.warning('CONF:?: %s' % (line))
196
197   return False
198
199
200
201 def config_stderr_handler(line, conf_info):
202   """
203   Processes each line of output from GOAT
204   and stores useful information using the logging module
205
206   Loads useful information into conf_info as well, for future
207   use outside the function.
208
209   returns RUN_ABORT upon detecting failure;
210           True on success message;
211           False if neutral message
212             (i.e. doesn't signify failure or success)
213   """
214
215   # Detect invalid species directory error
216   if s_species_dir_err.search(line):
217     logging.error(line)
218     return RUN_ABORT
219   # Detect goat_pipeline.py traceback
220   elif s_goat_traceb.search(line):
221     logging.error("Goat config script died, traceback in debug output")
222     return RUN_ABORT
223   # Detect indication of successful configuration (from stderr; odd, but ok)
224   elif s_stderr_taskcomplete.search(line):
225     logging.info('Configure step successful (from: stderr)')
226     return True
227   # Log all other output as debug output
228   else:
229     logging.debug('CONF:STDERR:?: %s' % (line))
230
231   # Neutral (not failure; nor success)
232   return False
233
234
235 #FIXME: Temperary hack
236 #f = open('pipeline_run.log', 'w')
237 #ferr = open('pipeline_err.log', 'w')
238
239
240
241 #def pipeline_stdout_handler(line, conf_info):
242 #  """
243 #  Processes each line of output from running the pipeline
244 #  and stores useful information using the logging module
245 #
246 #  Loads useful information into conf_info as well, for future
247 #  use outside the function.
248 #
249 #  returns True if found condition that signifies success.
250 #  """
251 #
252 #  f.write(line + '\n')
253 #
254 #  return True
255
256
257
258 def pipeline_stderr_handler(line, conf_info):
259   """
260   Processes each line of stderr from pipelien run
261   and stores useful information using the logging module
262
263   ##FIXME: Future feature (doesn't actually do this yet)
264   #Loads useful information into conf_info as well, for future
265   #use outside the function.
266
267   returns RUN_FAILED upon detecting failure;
268           #True on success message; (no clear success state)
269           False if neutral message
270             (i.e. doesn't signify failure or success)
271   """
272
273   if pl_stderr_ignore(line):
274     pass
275   elif s_make_error.search(line):
276     logging.error("make error detected; run failed")
277     return RUN_FAILED
278   elif s_no_gnuplot.search(line):
279     logging.error("gnuplot not found")
280     return RUN_FAILED
281   elif s_no_convert.search(line):
282     logging.error("imagemagick's convert command not found")
283     return RUN_FAILED
284   elif s_no_ghostscript.search(line):
285     logging.error("ghostscript not found")
286     return RUN_FAILED
287   else:
288     logging.debug('PIPE:STDERR:?: %s' % (line))
289
290   return False
291
292
293 def configure(conf_info):
294   """
295   Attempts to configure the GA pipeline using goat.
296
297   Uses logging module to store information about status.
298
299   returns True if configuration successful, otherwise False.
300   """
301   #ERROR Test:
302   #pipe = subprocess.Popen(['goat_pipeline.py',
303   #                         '--GERALD=config32bk.txt',
304   #                         '--make .',],
305   #                         #'.'],
306   #                        stdout=subprocess.PIPE,
307   #                        stderr=subprocess.PIPE)
308
309   #ERROR Test (2), causes goat_pipeline.py traceback
310   #pipe = subprocess.Popen(['goat_pipeline.py',
311   #                  '--GERALD=%s' % (conf_info.config_filepath),
312   #                         '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
313   #                         '--make',
314   #                         '.'],
315   #                        stdout=subprocess.PIPE,
316   #                        stderr=subprocess.PIPE)
317
318   ##########################
319   # Run configuration step
320   #   Not a test; actual configure attempt.
321   #pipe = subprocess.Popen(['goat_pipeline.py',
322   #                  '--GERALD=%s' % (conf_info.config_filepath),
323   #                         '--make',
324   #                         '.'],
325   #                        stdout=subprocess.PIPE,
326   #                        stderr=subprocess.PIPE)
327
328   # CONTINUE HERE
329   #FIXME: this only does a run on 5 tiles on lane 4
330   pipe = subprocess.Popen(['goat_pipeline.py',
331                     '--GERALD=%s' % (conf_info.config_filepath),
332                            '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
333                            '--make',
334                            '.'],
335                           stdout=subprocess.PIPE,
336                           stderr=subprocess.PIPE)
337   ##################
338   # Process stdout
339   stdout_line = pipe.stdout.readline()
340
341   complete = False
342   while stdout_line != '':
343     # Handle stdout
344     if config_stdout_handler(stdout_line, conf_info):
345       complete = True
346     stdout_line = pipe.stdout.readline()
347
348
349   error_code = pipe.wait()
350   if error_code:
351     logging.error('Recieved error_code: %s' % (error_code))
352   else:
353     logging.info('We are go for launch!')
354
355   #Process stderr
356   stderr_line = pipe.stderr.readline()
357
358   abort = 'NO!'
359   stderr_success = False
360   while stderr_line != '':
361     stderr_status = config_stderr_handler(stderr_line, conf_info)
362     if stderr_status == RUN_ABORT:
363       abort = RUN_ABORT
364     elif stderr_status is True:
365       stderr_success = True
366     stderr_line = pipe.stderr.readline()
367
368
369   #Success requirements:
370   # 1) The stdout completed without error
371   # 2) The program exited with status 0
372   # 3) No errors found in stdout
373   print '#Expect: True, False, True, True'
374   print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
375   status = complete is True and \
376            bool(error_code) is False and \
377            abort != RUN_ABORT and \
378            stderr_success is True
379
380   # If everything was successful, but for some reason
381   #  we didn't retrieve the path info, log it.
382   if status is True:
383     if conf_info.bustard_path is None or conf_info.run_path is None:
384       logging.error("Failed to retrieve run_path")
385       return False
386   
387   return status
388
389
390 def run_pipeline(conf_info):
391   """
392   Run the pipeline and monitor status.
393   """
394   # Fail if the run_path doesn't actually exist
395   if not os.path.exists(conf_info.run_path):
396     logging.error('Run path does not exist: %s' \
397               % (conf_info.run_path))
398     return False
399
400   # Change cwd to run_path
401   os.chdir(conf_info.run_path)
402   stdout_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stdout.txt')
403   stderr_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stderr.txt')
404
405   # Monitor file creation
406   wm = WatchManager()
407   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
408   event = RunEvent()
409   notifier = ThreadedNotifier(wm, event)
410   notifier.start()
411   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
412
413   # Log pipeline starting
414   logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
415   
416   # Start the pipeline (and hide!)
417   #pipe = subprocess.Popen(['make',
418   #                         '-j8',
419   #                         'recursive'],
420   #                        stdout=subprocess.PIPE,
421   #                        stderr=subprocess.PIPE)
422
423   fout = open(stdout_filepath, 'w')
424   ferr = open(stderr_filepath, 'w')
425
426   pipe = subprocess.Popen(['make',
427                              '-j8',
428                              'recursive'],
429                              stdout=fout,
430                              stderr=ferr)
431                              #shell=True)
432   # Wait for run to finish
433   retcode = pipe.wait()
434
435
436   # Clean up
437   notifier.stop()
438   fout.close()
439   ferr.close()
440
441   # Process stderr
442   ferr = open(stderr_filepath, 'r')
443
444   run_failed_stderr = False
445   for line in ferr:
446     err_status = pipeline_stderr_handler(line, conf_info)
447     if err_status == RUN_FAILED:
448       run_failed_stderr = True
449
450   ferr.close()
451
452   # Finished file check!
453   print 'RUN SUCCESS CHECK:'
454   for key, value in event.run_status_dict.items():
455     print '  %s: %s' % (key, value)
456
457   dstatus = event.run_status_dict
458
459   # Success or failure check
460   status = (retcode == 0) and \
461            run_failed_stderr is False and \
462            dstatus['firecrest'] is True and \
463            dstatus['bustard'] is True and \
464            dstatus['gerald'] is True
465
466   return status
467
468
469
470 if __name__ == '__main__':
471   ci = ConfigInfo()
472   ci.config_filepath = 'config32bk.txt'
473   
474   status = configure(ci)
475   if status:
476     print "Configure success"
477   else:
478     print "Configure failed"
479
480   print 'Run Dir:', ci.run_path
481   print 'Bustard Dir:', ci.bustard_path
482   
483   if status:
484     print 'Running pipeline now!'
485     run_status = run_pipeline(ci)
486     if run_status is True:
487       print 'Pipeline ran successfully.'
488     else:
489       print 'Pipeline run failed.'
490
491   #FIXME: Temperary hack
492   #f.close()