[project @ Method 2 for monitoring pipeline run (based on config_pipeline.py]
[htsworkflow.git] / bin / config_pipeline2.py
1 #!/usr/bin/python
2 import subprocess
3 import logging
4 import time
5 import re
6 import os
7
8 from pyinotify import WatchManager, ThreadedNotifier
9 from pyinotify import EventsCodes, ProcessEvent
10
11 logging.basicConfig(level=logging.DEBUG,
12                     format='%(asctime)s %(levelname)-8s %(message)s',
13                     datefmt='%a, %d %b %Y %H:%M:%S',
14                     filename='pipeline_main.log',
15                     filemode='w')
16
17 class ConfigInfo:
18   
19   def __init__(self):
20     self.run_path = None
21     self.bustard_path = None
22     self.config_filepath = None
23
24
25 class RunEvent(ProcessEvent):
26
27   def process_IN_CREATE(self, event):
28     fullpath = os.path.join(event.path, event.name)
29     if s_finished.search(fullpath):
30       logging.info("File Found: %s" % (fullpath))
31     print "Create: %s" % (os.path.join(event.path, event.name))
32
33   def process_IN_DELETE(self, event):
34     print "Remove %s" % (os.path.join(event.path, event.name))
35
36 #FLAGS
37 RUN_ABORT = 'abort'
38 RUN_FAILED = 'failed'
39
40
41 #####################################
42 # Configure Step (goat_pipeline.py)
43 #Info
44 s_start = re.compile('Starting Genome Analyzer Pipeline')
45 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
46 s_generating = re.compile('Generating journals, Makefiles and parameter files')
47 s_seq_folder = re.compile('^Sequence folder: ')
48 s_seq_folder_sub = re.compile('want to make ')
49 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
50
51 #Errors
52 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
53 s_species_dir_err = re.compile('Error: Lane [1-8]:')
54 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
55
56
57 ##Ignore - Example of out above each ignore regex.
58 #NOTE: Commenting out an ignore will cause it to be
59 # logged as DEBUG with the logging module.
60 #CF_STDERR_IGNORE_LIST = []
61 s_skip = re.compile('s_[0-8]_[0-9]+')
62
63
64 ##########################################
65 # Pipeline Run Step (make -j8 recursive)
66
67 ##Info
68 s_finished = re.compile('finished')
69
70 ##Errors
71 s_make_error = re.compile('^make[\S\s]+Error')
72 s_no_gnuplot = re.compile('gnuplot: command not found')
73 s_no_convert = re.compile('^Can\'t exec "convert"')
74 s_no_ghostscript = re.compile('gs: command not found')
75
76 ##Ignore - Example of out above each ignore regex.
77 #NOTE: Commenting out an ignore will cause it to be
78 # logged as DEBUG with the logging module.
79 #
80 PL_STDERR_IGNORE_LIST = []
81 # Info: PF 11802
82 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
83 # About to analyse intensity file s_4_0101_sig2.txt
84 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
85 # Will send output to standard output
86 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
87 # Found 31877 clusters
88 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
89 # Will use quality criterion ((CHASTITY>=0.6)
90 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
91 # Quality criterion translated to (($F[5]>=0.6))
92 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
93 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
94 #  AND
95 # opened s_4_0103_qhg.txt
96 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
97 # 81129 sequences out of 157651 passed filter criteria
98 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
99
100
101 def pl_stderr_ignore(line):
102   """
103   Searches lines for lines to ignore (i.e. not to log)
104
105   returns True if line should be ignored
106   returns False if line should NOT be ignored
107   """
108   for s in PL_STDERR_IGNORE_LIST:
109     if s.search(line):
110       return True
111   return False
112
113
114 def config_stdout_handler(line, conf_info):
115   """
116   Processes each line of output from GOAT
117   and stores useful information using the logging module
118
119   Loads useful information into conf_info as well, for future
120   use outside the function.
121
122   returns True if found condition that signifies success.
123   """
124
125   # Skip irrelevant line (without logging)
126   if s_skip.search(line):
127     pass
128
129   # Detect invalid command-line arguments
130   elif s_invalid_cmdline.search(line):
131     logging.error("Invalid commandline options!")
132
133   # Detect starting of configuration
134   elif s_start.search(line):
135     logging.info('START: Configuring pipeline')
136
137   # Detect it made it past invalid arguments
138   elif s_gerald.search(line):
139     logging.info('Running make now')
140
141   # Detect that make files have been generated (based on output)
142   elif s_generating.search(line):
143     logging.info('Make files generted')
144     return True
145
146   # Capture run directory
147   elif s_seq_folder.search(line):
148     mo = s_seq_folder_sub.search(line)
149     #Output changed when using --tiles=<tiles>
150     # at least in pipeline v0.3.0b2
151     if mo:
152       firecrest_bustard_gerald_makefile = line[mo.end():]
153       firecrest_bustard_gerald, junk = \
154                                 os.path.split(firecrest_bustard_gerald_makefile)
155       firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
156       firecrest, junk = os.path.split(firecrest_bustard)
157
158       conf_info.bustard_path = firecrest_bustard
159       conf_info.run_path = firecrest
160     
161     #Standard output handling
162     else:
163       print 'Sequence line:', line
164       mo = s_seq_folder.search(line)
165       conf_info.bustard_path = line[mo.end():]
166       conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
167
168   # Log all other output for debugging purposes
169   else:
170     logging.warning('CONF:?: %s' % (line))
171
172   return False
173
174
175
176 def config_stderr_handler(line, conf_info):
177   """
178   Processes each line of output from GOAT
179   and stores useful information using the logging module
180
181   Loads useful information into conf_info as well, for future
182   use outside the function.
183
184   returns RUN_ABORT upon detecting failure;
185           True on success message;
186           False if neutral message
187             (i.e. doesn't signify failure or success)
188   """
189
190   # Detect invalid species directory error
191   if s_species_dir_err.search(line):
192     logging.error(line)
193     return RUN_ABORT
194   # Detect goat_pipeline.py traceback
195   elif s_goat_traceb.search(line):
196     logging.error("Goat config script died, traceback in debug output")
197     return RUN_ABORT
198   # Detect indication of successful configuration (from stderr; odd, but ok)
199   elif s_stderr_taskcomplete.search(line):
200     logging.info('Configure step successful (from: stderr)')
201     return True
202   # Log all other output as debug output
203   else:
204     logging.debug('CONF:STDERR:?: %s' % (line))
205
206   # Neutral (not failure; nor success)
207   return False
208
209
210 #FIXME: Temperary hack
211 f = open('pipeline_run.log', 'w')
212 #ferr = open('pipeline_err.log', 'w')
213
214
215
216 def pipeline_stdout_handler(line, conf_info):
217   """
218   Processes each line of output from running the pipeline
219   and stores useful information using the logging module
220
221   Loads useful information into conf_info as well, for future
222   use outside the function.
223
224   returns True if found condition that signifies success.
225   """
226
227   f.write(line + '\n')
228
229   return True
230
231
232
233 def pipeline_stderr_handler(line, conf_info):
234   """
235   """
236
237   if pl_stderr_ignore(line):
238     pass
239   elif s_make_error.search(line):
240     logging.error("make error detected; run failed")
241     return RUN_FAILED
242   elif s_no_gnuplot.search(line):
243     logging.error("gnuplot not found")
244     return RUN_FAILED
245   elif s_no_convert.search(line):
246     logging.error("imagemagick's convert command not found")
247     return RUN_FAILED
248   elif s_no_ghostscript.search(line):
249     logging.error("ghostscript not found")
250     return RUN_FAILED
251   else:
252     logging.debug('PIPE:STDERR:?: %s' % (line))
253
254   return False
255
256
257 def configure(conf_info):
258   """
259   Attempts to configure the GA pipeline using goat.
260
261   Uses logging module to store information about status.
262
263   returns True if configuration successful, otherwise False.
264   """
265   #ERROR Test:
266   #pipe = subprocess.Popen(['goat_pipeline.py',
267   #                         '--GERALD=config32bk.txt',
268   #                         '--make .',],
269   #                         #'.'],
270   #                        stdout=subprocess.PIPE,
271   #                        stderr=subprocess.PIPE)
272
273   #ERROR Test (2), causes goat_pipeline.py traceback
274   #pipe = subprocess.Popen(['goat_pipeline.py',
275   #                  '--GERALD=%s' % (conf_info.config_filepath),
276   #                         '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
277   #                         '--make',
278   #                         '.'],
279   #                        stdout=subprocess.PIPE,
280   #                        stderr=subprocess.PIPE)
281
282   ##########################
283   # Run configuration step
284   #   Not a test; actual configure attempt.
285   #pipe = subprocess.Popen(['goat_pipeline.py',
286   #                  '--GERALD=%s' % (conf_info.config_filepath),
287   #                         '--make',
288   #                         '.'],
289   #                        stdout=subprocess.PIPE,
290   #                        stderr=subprocess.PIPE)
291
292   # CONTINUE HERE
293   #FIXME: this only does a run on 5 tiles on lane 4
294   pipe = subprocess.Popen(['goat_pipeline.py',
295                     '--GERALD=%s' % (conf_info.config_filepath),
296                            '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
297                            '--make',
298                            '.'],
299                           stdout=subprocess.PIPE,
300                           stderr=subprocess.PIPE)
301   ##################
302   # Process stdout
303   stdout_line = pipe.stdout.readline()
304
305   complete = False
306   while stdout_line != '':
307     # Handle stdout
308     if config_stdout_handler(stdout_line, conf_info):
309       complete = True
310     stdout_line = pipe.stdout.readline()
311
312
313   error_code = pipe.wait()
314   if error_code:
315     logging.error('Recieved error_code: %s' % (error_code))
316   else:
317     logging.info('We are go for launch!')
318
319   #Process stderr
320   stderr_line = pipe.stderr.readline()
321
322   abort = 'NO!'
323   stderr_success = False
324   while stderr_line != '':
325     stderr_status = config_stderr_handler(stderr_line, conf_info)
326     if stderr_status == RUN_ABORT:
327       abort = RUN_ABORT
328     elif stderr_status is True:
329       stderr_success = True
330     stderr_line = pipe.stderr.readline()
331
332
333   #Success requirements:
334   # 1) The stdout completed without error
335   # 2) The program exited with status 0
336   # 3) No errors found in stdout
337   print '#Expect: True, False, True, True'
338   print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
339   status = complete is True and \
340            bool(error_code) is False and \
341            abort != RUN_ABORT and \
342            stderr_success is True
343
344   # If everything was successful, but for some reason
345   #  we didn't retrieve the path info, log it.
346   if status is True:
347     if conf_info.bustard_path is None or conf_info.run_path is None:
348       logging.error("Failed to retrieve run_path")
349       return False
350   
351   return status
352
353
354 def run_pipeline(conf_info):
355   """
356   Run the pipeline and monitor status.
357   """
358   # Fail if the run_path doesn't actually exist
359   if not os.path.exists(conf_info.run_path):
360     logging.error('Run path does not exist: %s' \
361               % (conf_info.run_path))
362     return False
363
364   # Change cwd to run_path
365   os.chdir(conf_info.run_path)
366
367   # Monitor file creation
368   wm = WatchManager()
369   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
370   notifier = ThreadedNotifier(wm, RunEvent())
371   notifier.start()
372   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
373
374   # Log pipeline starting
375   logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
376   
377   # Start the pipeline (and hide!)
378   #pipe = subprocess.Popen(['make',
379   #                         '-j8',
380   #                         'recursive'],
381   #                        stdout=subprocess.PIPE,
382   #                        stderr=subprocess.PIPE)
383
384   fout = open('pipeline_run_stdout.txt', 'w')
385   ferr = open('pipeline_run_stderr.txt', 'w')
386
387   pipe = subprocess.Popen(['make',
388                              '-j8',
389                              'recursive'],
390                              stdout=fout,
391                              stderr=ferr)
392                              #shell=True)
393   retcode = pipe.wait()
394
395   notifier.stop()
396
397   fout.close()
398   ferr.close()
399
400   #print ': %s' % (sts)
401     
402   status = (retcode == 0)
403
404   return status
405
406
407
408 if __name__ == '__main__':
409   ci = ConfigInfo()
410   ci.config_filepath = 'config32bk.txt'
411   
412   status = configure(ci)
413   if status:
414     print "Configure success"
415   else:
416     print "Configure failed"
417
418   print 'Run Dir:', ci.run_path
419   print 'Bustard Dir:', ci.bustard_path
420   
421   if status:
422     print 'Running pipeline now!'
423     run_status = run_pipeline(ci)
424     if run_status is True:
425       print 'Pipeline ran successfully.'
426     else:
427       print 'Pipeline run failed.'
428
429   #FIXME: Temperary hack
430   f.close()