Runner seems to work with running the pipeline when launch from within
[htsworkflow.git] / gaworkflow / pipeline / configure_run.py
1 #!/usr/bin/python
2 import subprocess
3 import logging
4 import time
5 import re
6 import os
7
8 from gaworkflow.pipeline.retrieve_config import getCombinedOptions, saveConfigFile
9 from gaworkflow.pipeline.retrieve_config import FlowCellNotFound, WebError404
10 from gaworkflow.pipeline.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
11 from gaworkflow.pipeline.run_status import GARunStatus
12
13 from pyinotify import WatchManager, ThreadedNotifier
14 from pyinotify import EventsCodes, ProcessEvent
15
16 logging.basicConfig(level=logging.DEBUG,
17                     format='%(asctime)s %(levelname)-8s %(message)s',
18                     datefmt='%a, %d %b %Y %H:%M:%S',
19                     filename='pipeline_main.log',
20                     filemode='w')
21
22 class ConfigInfo:
23   
24   def __init__(self):
25     #run_path = firecrest analysis directory to run analysis from
26     self.run_path = None
27     self.bustard_path = None
28     self.config_filepath = None
29     self.status = None
30
31     #top level directory where all analyses are placed
32     self.base_analysis_dir = None
33     #analysis_dir, top level analysis dir...
34     # base_analysis_dir + '/070924_USI-EAS44_0022_FC12150'
35     self.analysis_dir = None
36
37
38   def createStatusObject(self):
39     """
40     Creates a status object which can be queried for
41     status of running the pipeline
42
43     returns True if object created
44     returns False if object cannot be created
45     """
46     if self.config_filepath is None:
47       return False
48
49     self.status = GARunStatus(self.config_filepath)
50     return True
51
52
53
54 ####################################
55 # inotify event processor
56
57 s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
58 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
59 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
60
61 s_gerald_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
62 s_bustard_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
63 s_firecrest_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/')
64
65 class RunEvent(ProcessEvent):
66
67   def __init__(self, conf_info):
68
69     self.run_status_dict = {'firecrest': False,
70                             'bustard': False,
71                             'gerald': False}
72
73     self._ci = conf_info
74
75     ProcessEvent.__init__(self)
76     
77
78   def process_IN_CREATE(self, event):
79     fullpath = os.path.join(event.path, event.name)
80     if s_finished.search(fullpath):
81       logging.info("File Found: %s" % (fullpath))
82
83       if s_firecrest_finished.search(fullpath):
84         self.run_status_dict['firecrest'] = True
85         self._ci.status.updateFirecrest(event.name)
86       elif s_bustard_finished.search(fullpath):
87         self.run_status_dict['bustard'] = True
88         self._ci.status.updateBustard(event.name)
89       elif s_gerald_finished.search(fullpath):
90         self.run_status_dict['gerald'] = True
91         self._ci.status.updateGerald(event.name)
92
93     #WARNING: The following order is important!!
94     # Firecrest regex will catch all gerald, bustard, and firecrest
95     # Bustard regex will catch all gerald and bustard
96     # Gerald regex will catch all gerald
97     # So, order needs to be Gerald, Bustard, Firecrest, or this
98     #  won't work properly.
99     elif s_gerald_all.search(fullpath):
100       self._ci.status.updateGerald(event.name)
101     elif s_bustard_all.search(fullpath):
102       self._ci.status.updateBustard(event.name)
103     elif s_firecrest_all.search(fullpath):
104       self._ci.status.updateFirecrest(event.name)
105       
106     #print "Create: %s" % (os.path.join(event.path, event.name))
107
108   def process_IN_DELETE(self, event):
109     #print "Remove %s" % (os.path.join(event.path, event.name))
110     pass
111
112
113
114
115 #FLAGS
116 # Config Step Error
117 RUN_ABORT = 'abort'
118 # Run Step Error
119 RUN_FAILED = 'failed'
120
121
122 #####################################
123 # Configure Step (goat_pipeline.py)
124 #Info
125 s_start = re.compile('Starting Genome Analyzer Pipeline')
126 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
127 s_generating = re.compile('^Generating journals, Makefiles')
128 s_seq_folder = re.compile('^Sequence folder: ')
129 s_seq_folder_sub = re.compile('want to make ')
130 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
131
132 #Errors
133 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
134 s_species_dir_err = re.compile('Error: Lane [1-8]:')
135 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
136 s_missing_cycles = re.compile('^Error: Tile s_[1-8]_[0-9]+: Different number of cycles: [0-9]+ instead of [0-9]+')
137
138 SUPPRESS_MISSING_CYCLES = False
139
140
141 ##Ignore - Example of out above each ignore regex.
142 #NOTE: Commenting out an ignore will cause it to be
143 # logged as DEBUG with the logging module.
144 #CF_STDERR_IGNORE_LIST = []
145 s_skip = re.compile('s_[0-8]_[0-9]+')
146
147
148 ##########################################
149 # Pipeline Run Step (make -j8 recursive)
150
151 ##Info
152 s_finished = re.compile('finished')
153
154 ##Errors
155 s_make_error = re.compile('^make[\S\s]+Error')
156 s_no_gnuplot = re.compile('gnuplot: command not found')
157 s_no_convert = re.compile('^Can\'t exec "convert"')
158 s_no_ghostscript = re.compile('gs: command not found')
159
160 ##Ignore - Example of out above each ignore regex.
161 #NOTE: Commenting out an ignore will cause it to be
162 # logged as DEBUG with the logging module.
163 #
164 PL_STDERR_IGNORE_LIST = []
165 # Info: PF 11802
166 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
167 # About to analyse intensity file s_4_0101_sig2.txt
168 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
169 # Will send output to standard output
170 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
171 # Found 31877 clusters
172 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
173 # Will use quality criterion ((CHASTITY>=0.6)
174 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
175 # Quality criterion translated to (($F[5]>=0.6))
176 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
177 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
178 #  AND
179 # opened s_4_0103_qhg.txt
180 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
181 # 81129 sequences out of 157651 passed filter criteria
182 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
183
184
185 def pl_stderr_ignore(line):
186   """
187   Searches lines for lines to ignore (i.e. not to log)
188
189   returns True if line should be ignored
190   returns False if line should NOT be ignored
191   """
192   for s in PL_STDERR_IGNORE_LIST:
193     if s.search(line):
194       return True
195   return False
196
197
198 def config_stdout_handler(line, conf_info):
199   """
200   Processes each line of output from GOAT
201   and stores useful information using the logging module
202
203   Loads useful information into conf_info as well, for future
204   use outside the function.
205
206   returns True if found condition that signifies success.
207   """
208
209   # Skip irrelevant line (without logging)
210   if s_skip.search(line):
211     pass
212
213   # Detect invalid command-line arguments
214   elif s_invalid_cmdline.search(line):
215     logging.error("Invalid commandline options!")
216
217   # Detect starting of configuration
218   elif s_start.search(line):
219     logging.info('START: Configuring pipeline')
220
221   # Detect it made it past invalid arguments
222   elif s_gerald.search(line):
223     logging.info('Running make now')
224
225   # Detect that make files have been generated (based on output)
226   elif s_generating.search(line):
227     logging.info('Make files generted')
228     return True
229
230   # Capture run directory
231   elif s_seq_folder.search(line):
232     mo = s_seq_folder_sub.search(line)
233     #Output changed when using --tiles=<tiles>
234     # at least in pipeline v0.3.0b2
235     if mo:
236       firecrest_bustard_gerald_makefile = line[mo.end():]
237       firecrest_bustard_gerald, junk = \
238                                 os.path.split(firecrest_bustard_gerald_makefile)
239       firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
240       firecrest, junk = os.path.split(firecrest_bustard)
241
242       conf_info.bustard_path = firecrest_bustard
243       conf_info.run_path = firecrest
244     
245     #Standard output handling
246     else:
247       print 'Sequence line:', line
248       mo = s_seq_folder.search(line)
249       conf_info.bustard_path = line[mo.end():]
250       conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
251
252   # Log all other output for debugging purposes
253   else:
254     logging.warning('CONF:?: %s' % (line))
255
256   return False
257
258
259
260 def config_stderr_handler(line, conf_info):
261   """
262   Processes each line of output from GOAT
263   and stores useful information using the logging module
264
265   Loads useful information into conf_info as well, for future
266   use outside the function.
267
268   returns RUN_ABORT upon detecting failure;
269           True on success message;
270           False if neutral message
271             (i.e. doesn't signify failure or success)
272   """
273   global SUPPRESS_MISSING_CYCLES
274
275   # Detect invalid species directory error
276   if s_species_dir_err.search(line):
277     logging.error(line)
278     return RUN_ABORT
279   # Detect goat_pipeline.py traceback
280   elif s_goat_traceb.search(line):
281     logging.error("Goat config script died, traceback in debug output")
282     return RUN_ABORT
283   # Detect indication of successful configuration (from stderr; odd, but ok)
284   elif s_stderr_taskcomplete.search(line):
285     logging.info('Configure step successful (from: stderr)')
286     return True
287   # Detect missing cycles
288   elif s_missing_cycles.search(line):
289
290     # Only display error once
291     if not SUPPRESS_MISSING_CYCLES:
292       logging.error("Missing cycles detected; Not all cycles copied?")
293       logging.debug("CONF:STDERR:MISSING_CYCLES: %s" % (line))
294       SUPPRESS_MISSING_CYCLES = True
295     return RUN_ABORT
296   
297   # Log all other output as debug output
298   else:
299     logging.debug('CONF:STDERR:?: %s' % (line))
300
301   # Neutral (not failure; nor success)
302   return False
303
304
305 #def pipeline_stdout_handler(line, conf_info):
306 #  """
307 #  Processes each line of output from running the pipeline
308 #  and stores useful information using the logging module
309 #
310 #  Loads useful information into conf_info as well, for future
311 #  use outside the function.
312 #
313 #  returns True if found condition that signifies success.
314 #  """
315 #
316 #  #f.write(line + '\n')
317 #
318 #  return True
319
320
321
322 def pipeline_stderr_handler(line, conf_info):
323   """
324   Processes each line of stderr from pipelien run
325   and stores useful information using the logging module
326
327   ##FIXME: Future feature (doesn't actually do this yet)
328   #Loads useful information into conf_info as well, for future
329   #use outside the function.
330
331   returns RUN_FAILED upon detecting failure;
332           #True on success message; (no clear success state)
333           False if neutral message
334             (i.e. doesn't signify failure or success)
335   """
336
337   if pl_stderr_ignore(line):
338     pass
339   elif s_make_error.search(line):
340     logging.error("make error detected; run failed")
341     return RUN_FAILED
342   elif s_no_gnuplot.search(line):
343     logging.error("gnuplot not found")
344     return RUN_FAILED
345   elif s_no_convert.search(line):
346     logging.error("imagemagick's convert command not found")
347     return RUN_FAILED
348   elif s_no_ghostscript.search(line):
349     logging.error("ghostscript not found")
350     return RUN_FAILED
351   else:
352     logging.debug('PIPE:STDERR:?: %s' % (line))
353
354   return False
355
356
357 def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir):
358   """
359   Gets the config file from server...
360   requires config file in:
361     /etc/ga_frontend/ga_frontend.conf
362    or
363     ~/.ga_frontend.conf
364
365   with:
366   [config_file_server]
367   base_host_url: http://host:port
368
369   return True if successful, False is failure
370   """
371   options = getCombinedOptions()
372
373   if options.url is None:
374     logging.error("~/.ga_frontend.conf or /etc/ga_frontend/ga_frontend.conf" \
375                   " missing base_host_url option")
376     return False
377
378   try:
379     saveConfigFile(flowcell, options.url, cfg_filepath)
380     conf_info.config_filepath = cfg_filepath
381   except FlowCellNotFound, e:
382     logging.error(e)
383     return False
384   except WebError404, e:
385     logging.error(e)
386     return False
387   except IOError, e:
388     logging.error(e)
389     return False
390   except Exception, e:
391     logging.error(e)
392     return False
393
394   f = open(cfg_filepath, 'r')
395   data = f.read()
396   f.close()
397
398   genome_dict = getAvailableGenomes(genome_dir)
399   mapper_dict = constructMapperDict(genome_dict)
400
401   f = open(cfg_filepath, 'w')
402   f.write(data % (mapper_dict))
403   f.close()
404   
405   return True  
406   
407
408
409 def configure(conf_info):
410   """
411   Attempts to configure the GA pipeline using goat.
412
413   Uses logging module to store information about status.
414
415   returns True if configuration successful, otherwise False.
416   """
417   #ERROR Test:
418   #pipe = subprocess.Popen(['goat_pipeline.py',
419   #                         '--GERALD=config32bk.txt',
420   #                         '--make .',],
421   #                         #'.'],
422   #                        stdout=subprocess.PIPE,
423   #                        stderr=subprocess.PIPE)
424
425   #ERROR Test (2), causes goat_pipeline.py traceback
426   #pipe = subprocess.Popen(['goat_pipeline.py',
427   #                  '--GERALD=%s' % (conf_info.config_filepath),
428   #                         '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
429   #                         '--make',
430   #                         '.'],
431   #                        stdout=subprocess.PIPE,
432   #                        stderr=subprocess.PIPE)
433
434   ##########################
435   # Run configuration step
436   #   Not a test; actual configure attempt.
437   #pipe = subprocess.Popen(['goat_pipeline.py',
438   #                  '--GERALD=%s' % (conf_info.config_filepath),
439   #                         '--make',
440   #                         '.'],
441   #                        stdout=subprocess.PIPE,
442   #                        stderr=subprocess.PIPE)
443
444
445   stdout_filepath = os.path.join(conf_info.analysis_dir,
446                                  "pipeline_configure_stdout.txt")
447   stderr_filepath = os.path.join(conf_info.analysis_dir,
448                                  "pipeline_configure_stderr.txt")
449
450   fout = open(stdout_filepath, 'w')
451   ferr = open(stderr_filepath, 'w')
452   
453   pipe = subprocess.Popen(['goat_pipeline.py',
454                     '--GERALD=%s' % (conf_info.config_filepath),
455                            #'--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
456                            '--make',
457                            conf_info.analysis_dir],
458                           stdout=fout,
459                           stderr=ferr)
460
461   print "Configuring pipeline: %s" % (time.ctime())
462   error_code = pipe.wait()
463
464   # Clean up
465   fout.close()
466   ferr.close()
467   
468   
469   ##################
470   # Process stdout
471   fout = open(stdout_filepath, 'r')
472   
473   stdout_line = fout.readline()
474
475   complete = False
476   while stdout_line != '':
477     # Handle stdout
478     if config_stdout_handler(stdout_line, conf_info):
479       complete = True
480     stdout_line = fout.readline()
481
482   fout.close()
483
484
485   #error_code = pipe.wait()
486   if error_code:
487     logging.error('Recieved error_code: %s' % (error_code))
488   else:
489     logging.info('We are go for launch!')
490
491   #Process stderr
492   ferr = open(stderr_filepath, 'r')
493   stderr_line = ferr.readline()
494
495   abort = 'NO!'
496   stderr_success = False
497   while stderr_line != '':
498     stderr_status = config_stderr_handler(stderr_line, conf_info)
499     if stderr_status == RUN_ABORT:
500       abort = RUN_ABORT
501     elif stderr_status is True:
502       stderr_success = True
503     stderr_line = ferr.readline()
504
505   ferr.close()
506
507
508   #Success requirements:
509   # 1) The stdout completed without error
510   # 2) The program exited with status 0
511   # 3) No errors found in stdout
512   print '#Expect: True, False, True, True'
513   print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
514   status = complete is True and \
515            bool(error_code) is False and \
516            abort != RUN_ABORT and \
517            stderr_success is True
518
519   # If everything was successful, but for some reason
520   #  we didn't retrieve the path info, log it.
521   if status is True:
522     if conf_info.bustard_path is None or conf_info.run_path is None:
523       logging.error("Failed to retrieve run_path")
524       return False
525   
526   return status
527
528
529 def run_pipeline(conf_info):
530   """
531   Run the pipeline and monitor status.
532   """
533   # Fail if the run_path doesn't actually exist
534   if not os.path.exists(conf_info.run_path):
535     logging.error('Run path does not exist: %s' \
536               % (conf_info.run_path))
537     return False
538
539   # Change cwd to run_path
540   stdout_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stdout.txt')
541   stderr_filepath = os.path.join(conf_info.analysis_dir, 'pipeline_run_stderr.txt')
542
543   # Create status object
544   conf_info.createStatusObject()
545
546   # Monitor file creation
547   wm = WatchManager()
548   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
549   event = RunEvent(conf_info)
550   notifier = ThreadedNotifier(wm, event)
551   notifier.start()
552   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
553
554   # Log pipeline starting
555   logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
556   
557   # Start the pipeline (and hide!)
558   #pipe = subprocess.Popen(['make',
559   #                         '-j8',
560   #                         'recursive'],
561   #                        stdout=subprocess.PIPE,
562   #                        stderr=subprocess.PIPE)
563
564   fout = open(stdout_filepath, 'w')
565   ferr = open(stderr_filepath, 'w')
566
567   pipe = subprocess.Popen(['make',
568                            '--directory=%s' % (conf_info.run_path),
569                            '-j8',
570                            'recursive'],
571                            stdout=fout,
572                            stderr=ferr)
573                            #shell=True)
574   # Wait for run to finish
575   retcode = pipe.wait()
576
577
578   # Clean up
579   notifier.stop()
580   fout.close()
581   ferr.close()
582
583   # Process stderr
584   ferr = open(stderr_filepath, 'r')
585
586   run_failed_stderr = False
587   for line in ferr:
588     err_status = pipeline_stderr_handler(line, conf_info)
589     if err_status == RUN_FAILED:
590       run_failed_stderr = True
591
592   ferr.close()
593
594   # Finished file check!
595   print 'RUN SUCCESS CHECK:'
596   for key, value in event.run_status_dict.items():
597     print '  %s: %s' % (key, value)
598
599   dstatus = event.run_status_dict
600
601   # Success or failure check
602   status = (retcode == 0) and \
603            run_failed_stderr is False and \
604            dstatus['firecrest'] is True and \
605            dstatus['bustard'] is True and \
606            dstatus['gerald'] is True
607
608   return status
609
610