19e242615b0de6b1b4f71fda2fad5389675011a9
[htsworkflow.git] / gaworkflow / pipeline / configure_run.py
1 #!/usr/bin/python
2 import subprocess
3 import logging
4 import time
5 import re
6 import os
7
8 from gaworkflow.pipeline.retrieve_config import getCombinedOptions, saveConfigFile
9 from gaworkflow.pipeline.retrieve_config import FlowCellNotFound, WebError404
10 from gaworkflow.pipeline.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
11 from gaworkflow.pipeline.run_status import GARunStatus
12
13 from pyinotify import WatchManager, ThreadedNotifier
14 from pyinotify import EventsCodes, ProcessEvent
15
16 logging.basicConfig(level=logging.DEBUG,
17                     format='%(asctime)s %(levelname)-8s %(message)s',
18                     datefmt='%a, %d %b %Y %H:%M:%S',
19                     filename='pipeline_main.log',
20                     filemode='w')
21
22 class ConfigInfo:
23   
24   def __init__(self):
25     self.run_path = None
26     self.bustard_path = None
27     self.config_filepath = None
28     self.status = None
29
30
31   def createStatusObject(self):
32     """
33     Creates a status object which can be queried for
34     status of running the pipeline
35
36     returns True if object created
37     returns False if object cannot be created
38     """
39     if self.config_filepath is None:
40       return False
41
42     self.status = GARunStatus(self.config_filepath)
43     return True
44
45
46
47 ####################################
48 # inotify event processor
49
50 s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
51 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
52 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
53
54 s_gerald_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
55 s_bustard_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
56 s_firecrest_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/')
57
58 class RunEvent(ProcessEvent):
59
60   def __init__(self, conf_info):
61
62     self.run_status_dict = {'firecrest': False,
63                             'bustard': False,
64                             'gerald': False}
65
66     self._ci = conf_info
67
68     ProcessEvent.__init__(self)
69     
70
71   def process_IN_CREATE(self, event):
72     fullpath = os.path.join(event.path, event.name)
73     if s_finished.search(fullpath):
74       logging.info("File Found: %s" % (fullpath))
75
76       if s_firecrest_finished.search(fullpath):
77         self.run_status_dict['firecrest'] = True
78         self._ci.status.updateFirecrest(event.name)
79       elif s_bustard_finished.search(fullpath):
80         self.run_status_dict['bustard'] = True
81         self._ci.status.updateBustard(event.name)
82       elif s_gerald_finished.search(fullpath):
83         self.run_status_dict['gerald'] = True
84         self._ci.status.updateGerald(event.name)
85
86     #WARNING: The following order is important!!
87     # Firecrest regex will catch all gerald, bustard, and firecrest
88     # Bustard regex will catch all gerald and bustard
89     # Gerald regex will catch all gerald
90     # So, order needs to be Gerald, Bustard, Firecrest, or this
91     #  won't work properly.
92     elif s_gerald_all.search(fullpath):
93       self._ci.status.updateGerald(event.name)
94     elif s_bustard_all.search(fullpath):
95       self._ci.status.updateBustard(event.name)
96     elif s_firecrest_all.search(fullpath):
97       self._ci.status.updateFirecrest(event.name)
98       
99     #print "Create: %s" % (os.path.join(event.path, event.name))
100
101   def process_IN_DELETE(self, event):
102     #print "Remove %s" % (os.path.join(event.path, event.name))
103     pass
104
105
106
107
108 #FLAGS
109 # Config Step Error
110 RUN_ABORT = 'abort'
111 # Run Step Error
112 RUN_FAILED = 'failed'
113
114
115 #####################################
116 # Configure Step (goat_pipeline.py)
117 #Info
118 s_start = re.compile('Starting Genome Analyzer Pipeline')
119 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
120 s_generating = re.compile('Generating journals, Makefiles and parameter files')
121 s_seq_folder = re.compile('^Sequence folder: ')
122 s_seq_folder_sub = re.compile('want to make ')
123 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
124
125 #Errors
126 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
127 s_species_dir_err = re.compile('Error: Lane [1-8]:')
128 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
129
130
131 ##Ignore - Example of out above each ignore regex.
132 #NOTE: Commenting out an ignore will cause it to be
133 # logged as DEBUG with the logging module.
134 #CF_STDERR_IGNORE_LIST = []
135 s_skip = re.compile('s_[0-8]_[0-9]+')
136
137
138 ##########################################
139 # Pipeline Run Step (make -j8 recursive)
140
141 ##Info
142 s_finished = re.compile('finished')
143
144 ##Errors
145 s_make_error = re.compile('^make[\S\s]+Error')
146 s_no_gnuplot = re.compile('gnuplot: command not found')
147 s_no_convert = re.compile('^Can\'t exec "convert"')
148 s_no_ghostscript = re.compile('gs: command not found')
149
150 ##Ignore - Example of out above each ignore regex.
151 #NOTE: Commenting out an ignore will cause it to be
152 # logged as DEBUG with the logging module.
153 #
154 PL_STDERR_IGNORE_LIST = []
155 # Info: PF 11802
156 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
157 # About to analyse intensity file s_4_0101_sig2.txt
158 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
159 # Will send output to standard output
160 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
161 # Found 31877 clusters
162 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
163 # Will use quality criterion ((CHASTITY>=0.6)
164 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
165 # Quality criterion translated to (($F[5]>=0.6))
166 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
167 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
168 #  AND
169 # opened s_4_0103_qhg.txt
170 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
171 # 81129 sequences out of 157651 passed filter criteria
172 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
173
174
175 def pl_stderr_ignore(line):
176   """
177   Searches lines for lines to ignore (i.e. not to log)
178
179   returns True if line should be ignored
180   returns False if line should NOT be ignored
181   """
182   for s in PL_STDERR_IGNORE_LIST:
183     if s.search(line):
184       return True
185   return False
186
187
188 def config_stdout_handler(line, conf_info):
189   """
190   Processes each line of output from GOAT
191   and stores useful information using the logging module
192
193   Loads useful information into conf_info as well, for future
194   use outside the function.
195
196   returns True if found condition that signifies success.
197   """
198
199   # Skip irrelevant line (without logging)
200   if s_skip.search(line):
201     pass
202
203   # Detect invalid command-line arguments
204   elif s_invalid_cmdline.search(line):
205     logging.error("Invalid commandline options!")
206
207   # Detect starting of configuration
208   elif s_start.search(line):
209     logging.info('START: Configuring pipeline')
210
211   # Detect it made it past invalid arguments
212   elif s_gerald.search(line):
213     logging.info('Running make now')
214
215   # Detect that make files have been generated (based on output)
216   elif s_generating.search(line):
217     logging.info('Make files generted')
218     return True
219
220   # Capture run directory
221   elif s_seq_folder.search(line):
222     mo = s_seq_folder_sub.search(line)
223     #Output changed when using --tiles=<tiles>
224     # at least in pipeline v0.3.0b2
225     if mo:
226       firecrest_bustard_gerald_makefile = line[mo.end():]
227       firecrest_bustard_gerald, junk = \
228                                 os.path.split(firecrest_bustard_gerald_makefile)
229       firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
230       firecrest, junk = os.path.split(firecrest_bustard)
231
232       conf_info.bustard_path = firecrest_bustard
233       conf_info.run_path = firecrest
234     
235     #Standard output handling
236     else:
237       print 'Sequence line:', line
238       mo = s_seq_folder.search(line)
239       conf_info.bustard_path = line[mo.end():]
240       conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
241
242   # Log all other output for debugging purposes
243   else:
244     logging.warning('CONF:?: %s' % (line))
245
246   return False
247
248
249
250 def config_stderr_handler(line, conf_info):
251   """
252   Processes each line of output from GOAT
253   and stores useful information using the logging module
254
255   Loads useful information into conf_info as well, for future
256   use outside the function.
257
258   returns RUN_ABORT upon detecting failure;
259           True on success message;
260           False if neutral message
261             (i.e. doesn't signify failure or success)
262   """
263
264   # Detect invalid species directory error
265   if s_species_dir_err.search(line):
266     logging.error(line)
267     return RUN_ABORT
268   # Detect goat_pipeline.py traceback
269   elif s_goat_traceb.search(line):
270     logging.error("Goat config script died, traceback in debug output")
271     return RUN_ABORT
272   # Detect indication of successful configuration (from stderr; odd, but ok)
273   elif s_stderr_taskcomplete.search(line):
274     logging.info('Configure step successful (from: stderr)')
275     return True
276   # Log all other output as debug output
277   else:
278     logging.debug('CONF:STDERR:?: %s' % (line))
279
280   # Neutral (not failure; nor success)
281   return False
282
283
284 #def pipeline_stdout_handler(line, conf_info):
285 #  """
286 #  Processes each line of output from running the pipeline
287 #  and stores useful information using the logging module
288 #
289 #  Loads useful information into conf_info as well, for future
290 #  use outside the function.
291 #
292 #  returns True if found condition that signifies success.
293 #  """
294 #
295 #  #f.write(line + '\n')
296 #
297 #  return True
298
299
300
301 def pipeline_stderr_handler(line, conf_info):
302   """
303   Processes each line of stderr from pipelien run
304   and stores useful information using the logging module
305
306   ##FIXME: Future feature (doesn't actually do this yet)
307   #Loads useful information into conf_info as well, for future
308   #use outside the function.
309
310   returns RUN_FAILED upon detecting failure;
311           #True on success message; (no clear success state)
312           False if neutral message
313             (i.e. doesn't signify failure or success)
314   """
315
316   if pl_stderr_ignore(line):
317     pass
318   elif s_make_error.search(line):
319     logging.error("make error detected; run failed")
320     return RUN_FAILED
321   elif s_no_gnuplot.search(line):
322     logging.error("gnuplot not found")
323     return RUN_FAILED
324   elif s_no_convert.search(line):
325     logging.error("imagemagick's convert command not found")
326     return RUN_FAILED
327   elif s_no_ghostscript.search(line):
328     logging.error("ghostscript not found")
329     return RUN_FAILED
330   else:
331     logging.debug('PIPE:STDERR:?: %s' % (line))
332
333   return False
334
335
336 def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir):
337   """
338   Gets the config file from server...
339   requires config file in:
340     /etc/ga_frontend/ga_frontend.conf
341    or
342     ~/.ga_frontend.conf
343
344   with:
345   [config_file_server]
346   base_host_url: http://host:port
347
348   return True if successful, False is failure
349   """
350   options = getCombinedOptions()
351
352   if options.url is None:
353     logging.error("~/.ga_frontend.conf or /etc/ga_frontend/ga_frontend.conf" \
354                   " missing base_host_url option")
355     return False
356
357   try:
358     saveConfigFile(flowcell, options.url, cfg_filepath)
359     conf_info.config_filepath = cfg_filepath
360   except FlowCellNotFound, e:
361     logging.error(e)
362     return False
363   except WebError404, e:
364     logging.error(e)
365     return False
366   except IOError, e:
367     logging.error(e)
368     return False
369   except Exception, e:
370     logging.error(e)
371     return False
372
373   f = open(cfg_filepath, 'r')
374   data = f.read()
375   f.close()
376
377   genome_dict = getAvailableGenomes(genome_dir)
378   mapper_dict = constructMapperDict(genome_dict)
379
380   f = open(cfg_filepath, 'w')
381   f.write(data % (mapper_dict))
382   f.close()
383   
384   return True  
385   
386
387
388 def configure(conf_info):
389   """
390   Attempts to configure the GA pipeline using goat.
391
392   Uses logging module to store information about status.
393
394   returns True if configuration successful, otherwise False.
395   """
396   #ERROR Test:
397   #pipe = subprocess.Popen(['goat_pipeline.py',
398   #                         '--GERALD=config32bk.txt',
399   #                         '--make .',],
400   #                         #'.'],
401   #                        stdout=subprocess.PIPE,
402   #                        stderr=subprocess.PIPE)
403
404   #ERROR Test (2), causes goat_pipeline.py traceback
405   #pipe = subprocess.Popen(['goat_pipeline.py',
406   #                  '--GERALD=%s' % (conf_info.config_filepath),
407   #                         '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
408   #                         '--make',
409   #                         '.'],
410   #                        stdout=subprocess.PIPE,
411   #                        stderr=subprocess.PIPE)
412
413   ##########################
414   # Run configuration step
415   #   Not a test; actual configure attempt.
416   #pipe = subprocess.Popen(['goat_pipeline.py',
417   #                  '--GERALD=%s' % (conf_info.config_filepath),
418   #                         '--make',
419   #                         '.'],
420   #                        stdout=subprocess.PIPE,
421   #                        stderr=subprocess.PIPE)
422
423   # CONTINUE HERE
424   #FIXME: this only does a run on 5 tiles on lane 4
425   pipe = subprocess.Popen(['goat_pipeline.py',
426                     '--GERALD=%s' % (conf_info.config_filepath),
427                            '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
428                            '--make',
429                            '.'],
430                           stdout=subprocess.PIPE,
431                           stderr=subprocess.PIPE)
432   ##################
433   # Process stdout
434   stdout_line = pipe.stdout.readline()
435
436   complete = False
437   while stdout_line != '':
438     # Handle stdout
439     if config_stdout_handler(stdout_line, conf_info):
440       complete = True
441     stdout_line = pipe.stdout.readline()
442
443
444   error_code = pipe.wait()
445   if error_code:
446     logging.error('Recieved error_code: %s' % (error_code))
447   else:
448     logging.info('We are go for launch!')
449
450   #Process stderr
451   stderr_line = pipe.stderr.readline()
452
453   abort = 'NO!'
454   stderr_success = False
455   while stderr_line != '':
456     stderr_status = config_stderr_handler(stderr_line, conf_info)
457     if stderr_status == RUN_ABORT:
458       abort = RUN_ABORT
459     elif stderr_status is True:
460       stderr_success = True
461     stderr_line = pipe.stderr.readline()
462
463
464   #Success requirements:
465   # 1) The stdout completed without error
466   # 2) The program exited with status 0
467   # 3) No errors found in stdout
468   print '#Expect: True, False, True, True'
469   print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
470   status = complete is True and \
471            bool(error_code) is False and \
472            abort != RUN_ABORT and \
473            stderr_success is True
474
475   # If everything was successful, but for some reason
476   #  we didn't retrieve the path info, log it.
477   if status is True:
478     if conf_info.bustard_path is None or conf_info.run_path is None:
479       logging.error("Failed to retrieve run_path")
480       return False
481   
482   return status
483
484
485 def run_pipeline(conf_info):
486   """
487   Run the pipeline and monitor status.
488   """
489   # Fail if the run_path doesn't actually exist
490   if not os.path.exists(conf_info.run_path):
491     logging.error('Run path does not exist: %s' \
492               % (conf_info.run_path))
493     return False
494
495   # Change cwd to run_path
496   os.chdir(conf_info.run_path)
497   stdout_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stdout.txt')
498   stderr_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stderr.txt')
499
500   # Create status object
501   conf_info.createStatusObject()
502
503   # Monitor file creation
504   wm = WatchManager()
505   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
506   event = RunEvent(conf_info)
507   notifier = ThreadedNotifier(wm, event)
508   notifier.start()
509   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
510
511   # Log pipeline starting
512   logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
513   
514   # Start the pipeline (and hide!)
515   #pipe = subprocess.Popen(['make',
516   #                         '-j8',
517   #                         'recursive'],
518   #                        stdout=subprocess.PIPE,
519   #                        stderr=subprocess.PIPE)
520
521   fout = open(stdout_filepath, 'w')
522   ferr = open(stderr_filepath, 'w')
523
524   pipe = subprocess.Popen(['make',
525                              '-j8',
526                              'recursive'],
527                              stdout=fout,
528                              stderr=ferr)
529                              #shell=True)
530   # Wait for run to finish
531   retcode = pipe.wait()
532
533
534   # Clean up
535   notifier.stop()
536   fout.close()
537   ferr.close()
538
539   # Process stderr
540   ferr = open(stderr_filepath, 'r')
541
542   run_failed_stderr = False
543   for line in ferr:
544     err_status = pipeline_stderr_handler(line, conf_info)
545     if err_status == RUN_FAILED:
546       run_failed_stderr = True
547
548   ferr.close()
549
550   # Finished file check!
551   print 'RUN SUCCESS CHECK:'
552   for key, value in event.run_status_dict.items():
553     print '  %s: %s' % (key, value)
554
555   dstatus = event.run_status_dict
556
557   # Success or failure check
558   status = (retcode == 0) and \
559            run_failed_stderr is False and \
560            dstatus['firecrest'] is True and \
561            dstatus['bustard'] is True and \
562            dstatus['gerald'] is True
563
564   return status
565
566