[project @ Minor cleanup patch]
[htsworkflow.git] / bin / config_pipeline2.py
1 #!/usr/bin/python
2 import subprocess
3 import logging
4 import time
5 import re
6 import os
7
8 from pyinotify import WatchManager, ThreadedNotifier
9 from pyinotify import EventsCodes, ProcessEvent
10
11 logging.basicConfig(level=logging.DEBUG,
12                     format='%(asctime)s %(levelname)-8s %(message)s',
13                     datefmt='%a, %d %b %Y %H:%M:%S',
14                     filename='pipeline_main.log',
15                     filemode='w')
16
17 class ConfigInfo:
18   
19   def __init__(self):
20     self.run_path = None
21     self.bustard_path = None
22     self.config_filepath = None
23
24
25 ####################################
26 # inotify event processor
27
28 s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
29 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
30 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
31
32 class RunEvent(ProcessEvent):
33
34   def __init__(self):
35
36     self.run_status_dict = {'firecrest': False,
37                             'bustard': False,
38                             'gerald': False}
39
40     ProcessEvent.__init__(self)
41
42   def process_IN_CREATE(self, event):
43     fullpath = os.path.join(event.path, event.name)
44     if s_finished.search(fullpath):
45       logging.info("File Found: %s" % (fullpath))
46
47       if s_firecrest_finished.search(fullpath):
48         self.run_status_dict['firecrest'] = True
49       elif s_bustard_finished.search(fullpath):
50         self.run_status_dict['bustard'] = True
51       elif s_gerald_finished.search(fullpath):
52         self.run_status_dict['gerald'] = True
53       
54     print "Create: %s" % (os.path.join(event.path, event.name))
55
56   def process_IN_DELETE(self, event):
57     print "Remove %s" % (os.path.join(event.path, event.name))
58
59 #FLAGS
60 # Config Step Error
61 RUN_ABORT = 'abort'
62 # Run Step Error
63 RUN_FAILED = 'failed'
64
65
66 #####################################
67 # Configure Step (goat_pipeline.py)
68 #Info
69 s_start = re.compile('Starting Genome Analyzer Pipeline')
70 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
71 s_generating = re.compile('Generating journals, Makefiles and parameter files')
72 s_seq_folder = re.compile('^Sequence folder: ')
73 s_seq_folder_sub = re.compile('want to make ')
74 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
75
76 #Errors
77 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
78 s_species_dir_err = re.compile('Error: Lane [1-8]:')
79 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
80
81
82 ##Ignore - Example of out above each ignore regex.
83 #NOTE: Commenting out an ignore will cause it to be
84 # logged as DEBUG with the logging module.
85 #CF_STDERR_IGNORE_LIST = []
86 s_skip = re.compile('s_[0-8]_[0-9]+')
87
88
89 ##########################################
90 # Pipeline Run Step (make -j8 recursive)
91
92 ##Info
93 s_finished = re.compile('finished')
94
95 ##Errors
96 s_make_error = re.compile('^make[\S\s]+Error')
97 s_no_gnuplot = re.compile('gnuplot: command not found')
98 s_no_convert = re.compile('^Can\'t exec "convert"')
99 s_no_ghostscript = re.compile('gs: command not found')
100
101 ##Ignore - Example of out above each ignore regex.
102 #NOTE: Commenting out an ignore will cause it to be
103 # logged as DEBUG with the logging module.
104 #
105 PL_STDERR_IGNORE_LIST = []
106 # Info: PF 11802
107 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
108 # About to analyse intensity file s_4_0101_sig2.txt
109 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
110 # Will send output to standard output
111 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
112 # Found 31877 clusters
113 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
114 # Will use quality criterion ((CHASTITY>=0.6)
115 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
116 # Quality criterion translated to (($F[5]>=0.6))
117 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
118 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
119 #  AND
120 # opened s_4_0103_qhg.txt
121 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
122 # 81129 sequences out of 157651 passed filter criteria
123 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
124
125
126 def pl_stderr_ignore(line):
127   """
128   Searches lines for lines to ignore (i.e. not to log)
129
130   returns True if line should be ignored
131   returns False if line should NOT be ignored
132   """
133   for s in PL_STDERR_IGNORE_LIST:
134     if s.search(line):
135       return True
136   return False
137
138
139 def config_stdout_handler(line, conf_info):
140   """
141   Processes each line of output from GOAT
142   and stores useful information using the logging module
143
144   Loads useful information into conf_info as well, for future
145   use outside the function.
146
147   returns True if found condition that signifies success.
148   """
149
150   # Skip irrelevant line (without logging)
151   if s_skip.search(line):
152     pass
153
154   # Detect invalid command-line arguments
155   elif s_invalid_cmdline.search(line):
156     logging.error("Invalid commandline options!")
157
158   # Detect starting of configuration
159   elif s_start.search(line):
160     logging.info('START: Configuring pipeline')
161
162   # Detect it made it past invalid arguments
163   elif s_gerald.search(line):
164     logging.info('Running make now')
165
166   # Detect that make files have been generated (based on output)
167   elif s_generating.search(line):
168     logging.info('Make files generted')
169     return True
170
171   # Capture run directory
172   elif s_seq_folder.search(line):
173     mo = s_seq_folder_sub.search(line)
174     #Output changed when using --tiles=<tiles>
175     # at least in pipeline v0.3.0b2
176     if mo:
177       firecrest_bustard_gerald_makefile = line[mo.end():]
178       firecrest_bustard_gerald, junk = \
179                                 os.path.split(firecrest_bustard_gerald_makefile)
180       firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
181       firecrest, junk = os.path.split(firecrest_bustard)
182
183       conf_info.bustard_path = firecrest_bustard
184       conf_info.run_path = firecrest
185     
186     #Standard output handling
187     else:
188       print 'Sequence line:', line
189       mo = s_seq_folder.search(line)
190       conf_info.bustard_path = line[mo.end():]
191       conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
192
193   # Log all other output for debugging purposes
194   else:
195     logging.warning('CONF:?: %s' % (line))
196
197   return False
198
199
200
201 def config_stderr_handler(line, conf_info):
202   """
203   Processes each line of output from GOAT
204   and stores useful information using the logging module
205
206   Loads useful information into conf_info as well, for future
207   use outside the function.
208
209   returns RUN_ABORT upon detecting failure;
210           True on success message;
211           False if neutral message
212             (i.e. doesn't signify failure or success)
213   """
214
215   # Detect invalid species directory error
216   if s_species_dir_err.search(line):
217     logging.error(line)
218     return RUN_ABORT
219   # Detect goat_pipeline.py traceback
220   elif s_goat_traceb.search(line):
221     logging.error("Goat config script died, traceback in debug output")
222     return RUN_ABORT
223   # Detect indication of successful configuration (from stderr; odd, but ok)
224   elif s_stderr_taskcomplete.search(line):
225     logging.info('Configure step successful (from: stderr)')
226     return True
227   # Log all other output as debug output
228   else:
229     logging.debug('CONF:STDERR:?: %s' % (line))
230
231   # Neutral (not failure; nor success)
232   return False
233
234
235 #def pipeline_stdout_handler(line, conf_info):
236 #  """
237 #  Processes each line of output from running the pipeline
238 #  and stores useful information using the logging module
239 #
240 #  Loads useful information into conf_info as well, for future
241 #  use outside the function.
242 #
243 #  returns True if found condition that signifies success.
244 #  """
245 #
246 #  #f.write(line + '\n')
247 #
248 #  return True
249
250
251
252 def pipeline_stderr_handler(line, conf_info):
253   """
254   Processes each line of stderr from pipelien run
255   and stores useful information using the logging module
256
257   ##FIXME: Future feature (doesn't actually do this yet)
258   #Loads useful information into conf_info as well, for future
259   #use outside the function.
260
261   returns RUN_FAILED upon detecting failure;
262           #True on success message; (no clear success state)
263           False if neutral message
264             (i.e. doesn't signify failure or success)
265   """
266
267   if pl_stderr_ignore(line):
268     pass
269   elif s_make_error.search(line):
270     logging.error("make error detected; run failed")
271     return RUN_FAILED
272   elif s_no_gnuplot.search(line):
273     logging.error("gnuplot not found")
274     return RUN_FAILED
275   elif s_no_convert.search(line):
276     logging.error("imagemagick's convert command not found")
277     return RUN_FAILED
278   elif s_no_ghostscript.search(line):
279     logging.error("ghostscript not found")
280     return RUN_FAILED
281   else:
282     logging.debug('PIPE:STDERR:?: %s' % (line))
283
284   return False
285
286
287 def configure(conf_info):
288   """
289   Attempts to configure the GA pipeline using goat.
290
291   Uses logging module to store information about status.
292
293   returns True if configuration successful, otherwise False.
294   """
295   #ERROR Test:
296   #pipe = subprocess.Popen(['goat_pipeline.py',
297   #                         '--GERALD=config32bk.txt',
298   #                         '--make .',],
299   #                         #'.'],
300   #                        stdout=subprocess.PIPE,
301   #                        stderr=subprocess.PIPE)
302
303   #ERROR Test (2), causes goat_pipeline.py traceback
304   #pipe = subprocess.Popen(['goat_pipeline.py',
305   #                  '--GERALD=%s' % (conf_info.config_filepath),
306   #                         '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
307   #                         '--make',
308   #                         '.'],
309   #                        stdout=subprocess.PIPE,
310   #                        stderr=subprocess.PIPE)
311
312   ##########################
313   # Run configuration step
314   #   Not a test; actual configure attempt.
315   #pipe = subprocess.Popen(['goat_pipeline.py',
316   #                  '--GERALD=%s' % (conf_info.config_filepath),
317   #                         '--make',
318   #                         '.'],
319   #                        stdout=subprocess.PIPE,
320   #                        stderr=subprocess.PIPE)
321
322   # CONTINUE HERE
323   #FIXME: this only does a run on 5 tiles on lane 4
324   pipe = subprocess.Popen(['goat_pipeline.py',
325                     '--GERALD=%s' % (conf_info.config_filepath),
326                            '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
327                            '--make',
328                            '.'],
329                           stdout=subprocess.PIPE,
330                           stderr=subprocess.PIPE)
331   ##################
332   # Process stdout
333   stdout_line = pipe.stdout.readline()
334
335   complete = False
336   while stdout_line != '':
337     # Handle stdout
338     if config_stdout_handler(stdout_line, conf_info):
339       complete = True
340     stdout_line = pipe.stdout.readline()
341
342
343   error_code = pipe.wait()
344   if error_code:
345     logging.error('Recieved error_code: %s' % (error_code))
346   else:
347     logging.info('We are go for launch!')
348
349   #Process stderr
350   stderr_line = pipe.stderr.readline()
351
352   abort = 'NO!'
353   stderr_success = False
354   while stderr_line != '':
355     stderr_status = config_stderr_handler(stderr_line, conf_info)
356     if stderr_status == RUN_ABORT:
357       abort = RUN_ABORT
358     elif stderr_status is True:
359       stderr_success = True
360     stderr_line = pipe.stderr.readline()
361
362
363   #Success requirements:
364   # 1) The stdout completed without error
365   # 2) The program exited with status 0
366   # 3) No errors found in stdout
367   print '#Expect: True, False, True, True'
368   print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
369   status = complete is True and \
370            bool(error_code) is False and \
371            abort != RUN_ABORT and \
372            stderr_success is True
373
374   # If everything was successful, but for some reason
375   #  we didn't retrieve the path info, log it.
376   if status is True:
377     if conf_info.bustard_path is None or conf_info.run_path is None:
378       logging.error("Failed to retrieve run_path")
379       return False
380   
381   return status
382
383
384 def run_pipeline(conf_info):
385   """
386   Run the pipeline and monitor status.
387   """
388   # Fail if the run_path doesn't actually exist
389   if not os.path.exists(conf_info.run_path):
390     logging.error('Run path does not exist: %s' \
391               % (conf_info.run_path))
392     return False
393
394   # Change cwd to run_path
395   os.chdir(conf_info.run_path)
396   stdout_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stdout.txt')
397   stderr_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stderr.txt')
398
399   # Monitor file creation
400   wm = WatchManager()
401   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
402   event = RunEvent()
403   notifier = ThreadedNotifier(wm, event)
404   notifier.start()
405   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
406
407   # Log pipeline starting
408   logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
409   
410   # Start the pipeline (and hide!)
411   #pipe = subprocess.Popen(['make',
412   #                         '-j8',
413   #                         'recursive'],
414   #                        stdout=subprocess.PIPE,
415   #                        stderr=subprocess.PIPE)
416
417   fout = open(stdout_filepath, 'w')
418   ferr = open(stderr_filepath, 'w')
419
420   pipe = subprocess.Popen(['make',
421                              '-j8',
422                              'recursive'],
423                              stdout=fout,
424                              stderr=ferr)
425                              #shell=True)
426   # Wait for run to finish
427   retcode = pipe.wait()
428
429
430   # Clean up
431   notifier.stop()
432   fout.close()
433   ferr.close()
434
435   # Process stderr
436   ferr = open(stderr_filepath, 'r')
437
438   run_failed_stderr = False
439   for line in ferr:
440     err_status = pipeline_stderr_handler(line, conf_info)
441     if err_status == RUN_FAILED:
442       run_failed_stderr = True
443
444   ferr.close()
445
446   # Finished file check!
447   print 'RUN SUCCESS CHECK:'
448   for key, value in event.run_status_dict.items():
449     print '  %s: %s' % (key, value)
450
451   dstatus = event.run_status_dict
452
453   # Success or failure check
454   status = (retcode == 0) and \
455            run_failed_stderr is False and \
456            dstatus['firecrest'] is True and \
457            dstatus['bustard'] is True and \
458            dstatus['gerald'] is True
459
460   return status
461
462
463
464 if __name__ == '__main__':
465   ci = ConfigInfo()
466   ci.config_filepath = 'config32bk.txt'
467   
468   status = configure(ci)
469   if status:
470     print "Configure success"
471   else:
472     print "Configure failed"
473
474   print 'Run Dir:', ci.run_path
475   print 'Bustard Dir:', ci.bustard_path
476   
477   if status:
478     print 'Running pipeline now!'
479     run_status = run_pipeline(ci)
480     if run_status is True:
481       print 'Pipeline ran successfully.'
482     else:
483       print 'Pipeline run failed.'
484