logging.basicConfig should only be in top level scripts.
[htsworkflow.git] / gaworkflow / pipeline / configure_run.py
1 #!/usr/bin/python
2 import subprocess
3 import logging
4 import time
5 import re
6 import os
7
8 from gaworkflow.pipeline.retrieve_config import getCombinedOptions, saveConfigFile
9 from gaworkflow.pipeline.retrieve_config import FlowCellNotFound, WebError404
10 from gaworkflow.pipeline.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
11 from gaworkflow.pipeline.run_status import GARunStatus
12
13 from pyinotify import WatchManager, ThreadedNotifier
14 from pyinotify import EventsCodes, ProcessEvent
15
16 class ConfigInfo:
17   
18   def __init__(self):
19     #run_path = firecrest analysis directory to run analysis from
20     self.run_path = None
21     self.bustard_path = None
22     self.config_filepath = None
23     self.status = None
24
25     #top level directory where all analyses are placed
26     self.base_analysis_dir = None
27     #analysis_dir, top level analysis dir...
28     # base_analysis_dir + '/070924_USI-EAS44_0022_FC12150'
29     self.analysis_dir = None
30
31
32   def createStatusObject(self):
33     """
34     Creates a status object which can be queried for
35     status of running the pipeline
36
37     returns True if object created
38     returns False if object cannot be created
39     """
40     if self.config_filepath is None:
41       return False
42
43     self.status = GARunStatus(self.config_filepath)
44     return True
45
46
47
48 ####################################
49 # inotify event processor
50
51 s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
52 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
53 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
54
55 s_gerald_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
56 s_bustard_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
57 s_firecrest_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/')
58
59 class RunEvent(ProcessEvent):
60
61   def __init__(self, conf_info):
62
63     self.run_status_dict = {'firecrest': False,
64                             'bustard': False,
65                             'gerald': False}
66
67     self._ci = conf_info
68
69     ProcessEvent.__init__(self)
70     
71
72   def process_IN_CREATE(self, event):
73     fullpath = os.path.join(event.path, event.name)
74     if s_finished.search(fullpath):
75       logging.info("File Found: %s" % (fullpath))
76
77       if s_firecrest_finished.search(fullpath):
78         self.run_status_dict['firecrest'] = True
79         self._ci.status.updateFirecrest(event.name)
80       elif s_bustard_finished.search(fullpath):
81         self.run_status_dict['bustard'] = True
82         self._ci.status.updateBustard(event.name)
83       elif s_gerald_finished.search(fullpath):
84         self.run_status_dict['gerald'] = True
85         self._ci.status.updateGerald(event.name)
86
87     #WARNING: The following order is important!!
88     # Firecrest regex will catch all gerald, bustard, and firecrest
89     # Bustard regex will catch all gerald and bustard
90     # Gerald regex will catch all gerald
91     # So, order needs to be Gerald, Bustard, Firecrest, or this
92     #  won't work properly.
93     elif s_gerald_all.search(fullpath):
94       self._ci.status.updateGerald(event.name)
95     elif s_bustard_all.search(fullpath):
96       self._ci.status.updateBustard(event.name)
97     elif s_firecrest_all.search(fullpath):
98       self._ci.status.updateFirecrest(event.name)
99       
100     #print "Create: %s" % (os.path.join(event.path, event.name))
101
102   def process_IN_DELETE(self, event):
103     #print "Remove %s" % (os.path.join(event.path, event.name))
104     pass
105
106
107
108
109 #FLAGS
110 # Config Step Error
111 RUN_ABORT = 'abort'
112 # Run Step Error
113 RUN_FAILED = 'failed'
114
115
116 #####################################
117 # Configure Step (goat_pipeline.py)
118 #Info
119 s_start = re.compile('Starting Genome Analyzer Pipeline')
120 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
121 s_generating = re.compile('^Generating journals, Makefiles')
122 s_seq_folder = re.compile('^Sequence folder: ')
123 s_seq_folder_sub = re.compile('want to make ')
124 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
125
126 #Errors
127 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
128 s_species_dir_err = re.compile('Error: Lane [1-8]:')
129 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
130 s_missing_cycles = re.compile('^Error: Tile s_[1-8]_[0-9]+: Different number of cycles: [0-9]+ instead of [0-9]+')
131
132 SUPPRESS_MISSING_CYCLES = False
133
134
135 ##Ignore - Example of out above each ignore regex.
136 #NOTE: Commenting out an ignore will cause it to be
137 # logged as DEBUG with the logging module.
138 #CF_STDERR_IGNORE_LIST = []
139 s_skip = re.compile('s_[0-8]_[0-9]+')
140
141
142 ##########################################
143 # Pipeline Run Step (make -j8 recursive)
144
145 ##Info
146 s_finished = re.compile('finished')
147
148 ##Errors
149 s_make_error = re.compile('^make[\S\s]+Error')
150 s_no_gnuplot = re.compile('gnuplot: command not found')
151 s_no_convert = re.compile('^Can\'t exec "convert"')
152 s_no_ghostscript = re.compile('gs: command not found')
153
154 ##Ignore - Example of out above each ignore regex.
155 #NOTE: Commenting out an ignore will cause it to be
156 # logged as DEBUG with the logging module.
157 #
158 PL_STDERR_IGNORE_LIST = []
159 # Info: PF 11802
160 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
161 # About to analyse intensity file s_4_0101_sig2.txt
162 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
163 # Will send output to standard output
164 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
165 # Found 31877 clusters
166 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
167 # Will use quality criterion ((CHASTITY>=0.6)
168 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
169 # Quality criterion translated to (($F[5]>=0.6))
170 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
171 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
172 #  AND
173 # opened s_4_0103_qhg.txt
174 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
175 # 81129 sequences out of 157651 passed filter criteria
176 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
177
178
179 def pl_stderr_ignore(line):
180   """
181   Searches lines for lines to ignore (i.e. not to log)
182
183   returns True if line should be ignored
184   returns False if line should NOT be ignored
185   """
186   for s in PL_STDERR_IGNORE_LIST:
187     if s.search(line):
188       return True
189   return False
190
191
192 def config_stdout_handler(line, conf_info):
193   """
194   Processes each line of output from GOAT
195   and stores useful information using the logging module
196
197   Loads useful information into conf_info as well, for future
198   use outside the function.
199
200   returns True if found condition that signifies success.
201   """
202
203   # Skip irrelevant line (without logging)
204   if s_skip.search(line):
205     pass
206
207   # Detect invalid command-line arguments
208   elif s_invalid_cmdline.search(line):
209     logging.error("Invalid commandline options!")
210
211   # Detect starting of configuration
212   elif s_start.search(line):
213     logging.info('START: Configuring pipeline')
214
215   # Detect it made it past invalid arguments
216   elif s_gerald.search(line):
217     logging.info('Running make now')
218
219   # Detect that make files have been generated (based on output)
220   elif s_generating.search(line):
221     logging.info('Make files generted')
222     return True
223
224   # Capture run directory
225   elif s_seq_folder.search(line):
226     mo = s_seq_folder_sub.search(line)
227     #Output changed when using --tiles=<tiles>
228     # at least in pipeline v0.3.0b2
229     if mo:
230       firecrest_bustard_gerald_makefile = line[mo.end():]
231       firecrest_bustard_gerald, junk = \
232                                 os.path.split(firecrest_bustard_gerald_makefile)
233       firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
234       firecrest, junk = os.path.split(firecrest_bustard)
235
236       conf_info.bustard_path = firecrest_bustard
237       conf_info.run_path = firecrest
238     
239     #Standard output handling
240     else:
241       print 'Sequence line:', line
242       mo = s_seq_folder.search(line)
243       conf_info.bustard_path = line[mo.end():]
244       conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
245
246   # Log all other output for debugging purposes
247   else:
248     logging.warning('CONF:?: %s' % (line))
249
250   return False
251
252
253
254 def config_stderr_handler(line, conf_info):
255   """
256   Processes each line of output from GOAT
257   and stores useful information using the logging module
258
259   Loads useful information into conf_info as well, for future
260   use outside the function.
261
262   returns RUN_ABORT upon detecting failure;
263           True on success message;
264           False if neutral message
265             (i.e. doesn't signify failure or success)
266   """
267   global SUPPRESS_MISSING_CYCLES
268
269   # Detect invalid species directory error
270   if s_species_dir_err.search(line):
271     logging.error(line)
272     return RUN_ABORT
273   # Detect goat_pipeline.py traceback
274   elif s_goat_traceb.search(line):
275     logging.error("Goat config script died, traceback in debug output")
276     return RUN_ABORT
277   # Detect indication of successful configuration (from stderr; odd, but ok)
278   elif s_stderr_taskcomplete.search(line):
279     logging.info('Configure step successful (from: stderr)')
280     return True
281   # Detect missing cycles
282   elif s_missing_cycles.search(line):
283
284     # Only display error once
285     if not SUPPRESS_MISSING_CYCLES:
286       logging.error("Missing cycles detected; Not all cycles copied?")
287       logging.debug("CONF:STDERR:MISSING_CYCLES: %s" % (line))
288       SUPPRESS_MISSING_CYCLES = True
289     return RUN_ABORT
290   
291   # Log all other output as debug output
292   else:
293     logging.debug('CONF:STDERR:?: %s' % (line))
294
295   # Neutral (not failure; nor success)
296   return False
297
298
299 #def pipeline_stdout_handler(line, conf_info):
300 #  """
301 #  Processes each line of output from running the pipeline
302 #  and stores useful information using the logging module
303 #
304 #  Loads useful information into conf_info as well, for future
305 #  use outside the function.
306 #
307 #  returns True if found condition that signifies success.
308 #  """
309 #
310 #  #f.write(line + '\n')
311 #
312 #  return True
313
314
315
316 def pipeline_stderr_handler(line, conf_info):
317   """
318   Processes each line of stderr from pipelien run
319   and stores useful information using the logging module
320
321   ##FIXME: Future feature (doesn't actually do this yet)
322   #Loads useful information into conf_info as well, for future
323   #use outside the function.
324
325   returns RUN_FAILED upon detecting failure;
326           #True on success message; (no clear success state)
327           False if neutral message
328             (i.e. doesn't signify failure or success)
329   """
330
331   if pl_stderr_ignore(line):
332     pass
333   elif s_make_error.search(line):
334     logging.error("make error detected; run failed")
335     return RUN_FAILED
336   elif s_no_gnuplot.search(line):
337     logging.error("gnuplot not found")
338     return RUN_FAILED
339   elif s_no_convert.search(line):
340     logging.error("imagemagick's convert command not found")
341     return RUN_FAILED
342   elif s_no_ghostscript.search(line):
343     logging.error("ghostscript not found")
344     return RUN_FAILED
345   else:
346     logging.debug('PIPE:STDERR:?: %s' % (line))
347
348   return False
349
350
351 def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir):
352   """
353   Gets the config file from server...
354   requires config file in:
355     /etc/ga_frontend/ga_frontend.conf
356    or
357     ~/.ga_frontend.conf
358
359   with:
360   [config_file_server]
361   base_host_url: http://host:port
362
363   return True if successful, False is failure
364   """
365   options = getCombinedOptions()
366
367   if options.url is None:
368     logging.error("~/.ga_frontend.conf or /etc/ga_frontend/ga_frontend.conf" \
369                   " missing base_host_url option")
370     return False
371
372   try:
373     saveConfigFile(flowcell, options.url, cfg_filepath)
374     conf_info.config_filepath = cfg_filepath
375   except FlowCellNotFound, e:
376     logging.error(e)
377     return False
378   except WebError404, e:
379     logging.error(e)
380     return False
381   except IOError, e:
382     logging.error(e)
383     return False
384   except Exception, e:
385     logging.error(e)
386     return False
387
388   f = open(cfg_filepath, 'r')
389   data = f.read()
390   f.close()
391
392   genome_dict = getAvailableGenomes(genome_dir)
393   mapper_dict = constructMapperDict(genome_dict)
394
395   f = open(cfg_filepath, 'w')
396   f.write(data % (mapper_dict))
397   f.close()
398   
399   return True  
400   
401
402
403 def configure(conf_info):
404   """
405   Attempts to configure the GA pipeline using goat.
406
407   Uses logging module to store information about status.
408
409   returns True if configuration successful, otherwise False.
410   """
411   #ERROR Test:
412   #pipe = subprocess.Popen(['goat_pipeline.py',
413   #                         '--GERALD=config32bk.txt',
414   #                         '--make .',],
415   #                         #'.'],
416   #                        stdout=subprocess.PIPE,
417   #                        stderr=subprocess.PIPE)
418
419   #ERROR Test (2), causes goat_pipeline.py traceback
420   #pipe = subprocess.Popen(['goat_pipeline.py',
421   #                  '--GERALD=%s' % (conf_info.config_filepath),
422   #                         '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
423   #                         '--make',
424   #                         '.'],
425   #                        stdout=subprocess.PIPE,
426   #                        stderr=subprocess.PIPE)
427
428   ##########################
429   # Run configuration step
430   #   Not a test; actual configure attempt.
431   #pipe = subprocess.Popen(['goat_pipeline.py',
432   #                  '--GERALD=%s' % (conf_info.config_filepath),
433   #                         '--make',
434   #                         '.'],
435   #                        stdout=subprocess.PIPE,
436   #                        stderr=subprocess.PIPE)
437
438
439   stdout_filepath = os.path.join(conf_info.analysis_dir,
440                                  "pipeline_configure_stdout.txt")
441   stderr_filepath = os.path.join(conf_info.analysis_dir,
442                                  "pipeline_configure_stderr.txt")
443
444   fout = open(stdout_filepath, 'w')
445   ferr = open(stderr_filepath, 'w')
446   
447   pipe = subprocess.Popen(['goat_pipeline.py',
448                     '--GERALD=%s' % (conf_info.config_filepath),
449                            #'--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
450                            '--make',
451                            conf_info.analysis_dir],
452                           stdout=fout,
453                           stderr=ferr)
454
455   print "Configuring pipeline: %s" % (time.ctime())
456   error_code = pipe.wait()
457
458   # Clean up
459   fout.close()
460   ferr.close()
461   
462   
463   ##################
464   # Process stdout
465   fout = open(stdout_filepath, 'r')
466   
467   stdout_line = fout.readline()
468
469   complete = False
470   while stdout_line != '':
471     # Handle stdout
472     if config_stdout_handler(stdout_line, conf_info):
473       complete = True
474     stdout_line = fout.readline()
475
476   fout.close()
477
478
479   #error_code = pipe.wait()
480   if error_code:
481     logging.error('Recieved error_code: %s' % (error_code))
482   else:
483     logging.info('We are go for launch!')
484
485   #Process stderr
486   ferr = open(stderr_filepath, 'r')
487   stderr_line = ferr.readline()
488
489   abort = 'NO!'
490   stderr_success = False
491   while stderr_line != '':
492     stderr_status = config_stderr_handler(stderr_line, conf_info)
493     if stderr_status == RUN_ABORT:
494       abort = RUN_ABORT
495     elif stderr_status is True:
496       stderr_success = True
497     stderr_line = ferr.readline()
498
499   ferr.close()
500
501
502   #Success requirements:
503   # 1) The stdout completed without error
504   # 2) The program exited with status 0
505   # 3) No errors found in stdout
506   print '#Expect: True, False, True, True'
507   print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
508   status = complete is True and \
509            bool(error_code) is False and \
510            abort != RUN_ABORT and \
511            stderr_success is True
512
513   # If everything was successful, but for some reason
514   #  we didn't retrieve the path info, log it.
515   if status is True:
516     if conf_info.bustard_path is None or conf_info.run_path is None:
517       logging.error("Failed to retrieve run_path")
518       return False
519   
520   return status
521
522
523 def run_pipeline(conf_info):
524   """
525   Run the pipeline and monitor status.
526   """
527   # Fail if the run_path doesn't actually exist
528   if not os.path.exists(conf_info.run_path):
529     logging.error('Run path does not exist: %s' \
530               % (conf_info.run_path))
531     return False
532
533   # Change cwd to run_path
534   stdout_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stdout.txt')
535   stderr_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stderr.txt')
536
537   # Create status object
538   conf_info.createStatusObject()
539
540   # Monitor file creation
541   wm = WatchManager()
542   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
543   event = RunEvent(conf_info)
544   notifier = ThreadedNotifier(wm, event)
545   notifier.start()
546   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
547
548   # Log pipeline starting
549   logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
550   
551   # Start the pipeline (and hide!)
552   #pipe = subprocess.Popen(['make',
553   #                         '-j8',
554   #                         'recursive'],
555   #                        stdout=subprocess.PIPE,
556   #                        stderr=subprocess.PIPE)
557
558   fout = open(stdout_filepath, 'w')
559   ferr = open(stderr_filepath, 'w')
560
561   pipe = subprocess.Popen(['make',
562                            '--directory=%s' % (conf_info.run_path),
563                            '-j8',
564                            'recursive'],
565                            stdout=fout,
566                            stderr=ferr)
567                            #shell=True)
568   # Wait for run to finish
569   retcode = pipe.wait()
570
571
572   # Clean up
573   notifier.stop()
574   fout.close()
575   ferr.close()
576
577   # Process stderr
578   ferr = open(stderr_filepath, 'r')
579
580   run_failed_stderr = False
581   for line in ferr:
582     err_status = pipeline_stderr_handler(line, conf_info)
583     if err_status == RUN_FAILED:
584       run_failed_stderr = True
585
586   ferr.close()
587
588   # Finished file check!
589   print 'RUN SUCCESS CHECK:'
590   for key, value in event.run_status_dict.items():
591     print '  %s: %s' % (key, value)
592
593   dstatus = event.run_status_dict
594
595   # Success or failure check
596   status = (retcode == 0) and \
597            run_failed_stderr is False and \
598            dstatus['firecrest'] is True and \
599            dstatus['bustard'] is True and \
600            dstatus['gerald'] is True
601
602   return status
603
604