a0d8c33285fda74d617a48984f509a5147bbcd7b
[htsworkflow.git] / gaworkflow / pipeline / configure_run.py
1 #!/usr/bin/python
2 import subprocess
3 import logging
4 import time
5 import re
6 import os
7
8 from gaworkflow.pipeline.retrieve_config import getCombinedOptions, saveConfigFile
9 from gaworkflow.pipeline.retrieve_config import FlowCellNotFound, WebError404
10 from gaworkflow.pipeline.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
11 from gaworkflow.pipeline.run_status import GARunStatus
12
13 from pyinotify import WatchManager, ThreadedNotifier
14 from pyinotify import EventsCodes, ProcessEvent
15
16 logging.basicConfig(level=logging.DEBUG,
17                     format='%(asctime)s %(levelname)-8s %(message)s',
18                     datefmt='%a, %d %b %Y %H:%M:%S',
19                     filename='pipeline_main.log',
20                     filemode='w')
21
22 class ConfigInfo:
23   
24   def __init__(self):
25     self.run_path = None
26     self.bustard_path = None
27     self.config_filepath = None
28     self.status = None
29
30
31   def createStatusObject(self):
32     """
33     Creates a status object which can be queried for
34     status of running the pipeline
35
36     returns True if object created
37     returns False if object cannot be created
38     """
39     if self.config_filepath is None:
40       return False
41
42     self.status = GARunStatus(self.config_filepath)
43     return True
44
45
46
47 ####################################
48 # inotify event processor
49
50 s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
51 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
52 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
53
54 s_gerald_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
55 s_bustard_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
56 s_firecrest_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/')
57
58 class RunEvent(ProcessEvent):
59
60   def __init__(self, conf_info):
61
62     self.run_status_dict = {'firecrest': False,
63                             'bustard': False,
64                             'gerald': False}
65
66     self._ci = conf_info
67
68     ProcessEvent.__init__(self)
69     
70
71   def process_IN_CREATE(self, event):
72     fullpath = os.path.join(event.path, event.name)
73     if s_finished.search(fullpath):
74       logging.info("File Found: %s" % (fullpath))
75
76       if s_firecrest_finished.search(fullpath):
77         self.run_status_dict['firecrest'] = True
78         self._ci.status.updateFirecrest(event.name)
79       elif s_bustard_finished.search(fullpath):
80         self.run_status_dict['bustard'] = True
81         self._ci.status.updateBustard(event.name)
82       elif s_gerald_finished.search(fullpath):
83         self.run_status_dict['gerald'] = True
84         self._ci.status.updateGerald(event.name)
85
86     #WARNING: The following order is important!!
87     # Firecrest regex will catch all gerald, bustard, and firecrest
88     # Bustard regex will catch all gerald and bustard
89     # Gerald regex will catch all gerald
90     # So, order needs to be Gerald, Bustard, Firecrest, or this
91     #  won't work properly.
92     elif s_gerald_all.search(fullpath):
93       self._ci.status.updateGerald(event.name)
94     elif s_bustard_all.search(fullpath):
95       self._ci.status.updateBustard(event.name)
96     elif s_firecrest_all.search(fullpath):
97       self._ci.status.updateFirecrest(event.name)
98       
99     #print "Create: %s" % (os.path.join(event.path, event.name))
100
101   def process_IN_DELETE(self, event):
102     #print "Remove %s" % (os.path.join(event.path, event.name))
103     pass
104
105
106
107
108 #FLAGS
109 # Config Step Error
110 RUN_ABORT = 'abort'
111 # Run Step Error
112 RUN_FAILED = 'failed'
113
114
115 #####################################
116 # Configure Step (goat_pipeline.py)
117 #Info
118 s_start = re.compile('Starting Genome Analyzer Pipeline')
119 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
120 s_generating = re.compile('^Generating journals, Makefiles')
121 s_seq_folder = re.compile('^Sequence folder: ')
122 s_seq_folder_sub = re.compile('want to make ')
123 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
124
125 #Errors
126 s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py')
127 s_species_dir_err = re.compile('Error: Lane [1-8]:')
128 s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
129 s_missing_cycles = re.compile('^Error: Tile s_[1-8]_[0-9]+: Different number of cycles: [0-9]+ instead of [0-9]+')
130
131 SUPPRESS_MISSING_CYCLES = False
132
133
134 ##Ignore - Example of out above each ignore regex.
135 #NOTE: Commenting out an ignore will cause it to be
136 # logged as DEBUG with the logging module.
137 #CF_STDERR_IGNORE_LIST = []
138 s_skip = re.compile('s_[0-8]_[0-9]+')
139
140
141 ##########################################
142 # Pipeline Run Step (make -j8 recursive)
143
144 ##Info
145 s_finished = re.compile('finished')
146
147 ##Errors
148 s_make_error = re.compile('^make[\S\s]+Error')
149 s_no_gnuplot = re.compile('gnuplot: command not found')
150 s_no_convert = re.compile('^Can\'t exec "convert"')
151 s_no_ghostscript = re.compile('gs: command not found')
152
153 ##Ignore - Example of out above each ignore regex.
154 #NOTE: Commenting out an ignore will cause it to be
155 # logged as DEBUG with the logging module.
156 #
157 PL_STDERR_IGNORE_LIST = []
158 # Info: PF 11802
159 PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
160 # About to analyse intensity file s_4_0101_sig2.txt
161 PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
162 # Will send output to standard output
163 PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
164 # Found 31877 clusters
165 PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
166 # Will use quality criterion ((CHASTITY>=0.6)
167 PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
168 # Quality criterion translated to (($F[5]>=0.6))
169 PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
170 # opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
171 #  AND
172 # opened s_4_0103_qhg.txt
173 PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
174 # 81129 sequences out of 157651 passed filter criteria
175 PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') )
176
177
178 def pl_stderr_ignore(line):
179   """
180   Searches lines for lines to ignore (i.e. not to log)
181
182   returns True if line should be ignored
183   returns False if line should NOT be ignored
184   """
185   for s in PL_STDERR_IGNORE_LIST:
186     if s.search(line):
187       return True
188   return False
189
190
191 def config_stdout_handler(line, conf_info):
192   """
193   Processes each line of output from GOAT
194   and stores useful information using the logging module
195
196   Loads useful information into conf_info as well, for future
197   use outside the function.
198
199   returns True if found condition that signifies success.
200   """
201
202   # Skip irrelevant line (without logging)
203   if s_skip.search(line):
204     pass
205
206   # Detect invalid command-line arguments
207   elif s_invalid_cmdline.search(line):
208     logging.error("Invalid commandline options!")
209
210   # Detect starting of configuration
211   elif s_start.search(line):
212     logging.info('START: Configuring pipeline')
213
214   # Detect it made it past invalid arguments
215   elif s_gerald.search(line):
216     logging.info('Running make now')
217
218   # Detect that make files have been generated (based on output)
219   elif s_generating.search(line):
220     logging.info('Make files generted')
221     return True
222
223   # Capture run directory
224   elif s_seq_folder.search(line):
225     mo = s_seq_folder_sub.search(line)
226     #Output changed when using --tiles=<tiles>
227     # at least in pipeline v0.3.0b2
228     if mo:
229       firecrest_bustard_gerald_makefile = line[mo.end():]
230       firecrest_bustard_gerald, junk = \
231                                 os.path.split(firecrest_bustard_gerald_makefile)
232       firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
233       firecrest, junk = os.path.split(firecrest_bustard)
234
235       conf_info.bustard_path = firecrest_bustard
236       conf_info.run_path = firecrest
237     
238     #Standard output handling
239     else:
240       print 'Sequence line:', line
241       mo = s_seq_folder.search(line)
242       conf_info.bustard_path = line[mo.end():]
243       conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
244
245   # Log all other output for debugging purposes
246   else:
247     logging.warning('CONF:?: %s' % (line))
248
249   return False
250
251
252
253 def config_stderr_handler(line, conf_info):
254   """
255   Processes each line of output from GOAT
256   and stores useful information using the logging module
257
258   Loads useful information into conf_info as well, for future
259   use outside the function.
260
261   returns RUN_ABORT upon detecting failure;
262           True on success message;
263           False if neutral message
264             (i.e. doesn't signify failure or success)
265   """
266   global SUPPRESS_MISSING_CYCLES
267
268   # Detect invalid species directory error
269   if s_species_dir_err.search(line):
270     logging.error(line)
271     return RUN_ABORT
272   # Detect goat_pipeline.py traceback
273   elif s_goat_traceb.search(line):
274     logging.error("Goat config script died, traceback in debug output")
275     return RUN_ABORT
276   # Detect indication of successful configuration (from stderr; odd, but ok)
277   elif s_stderr_taskcomplete.search(line):
278     logging.info('Configure step successful (from: stderr)')
279     return True
280   # Detect missing cycles
281   elif s_missing_cycles.search(line):
282
283     # Only display error once
284     if not SUPPRESS_MISSING_CYCLES:
285       logging.error("Missing cycles detected; Not all cycles copied?")
286       logging.debug("CONF:STDERR:MISSING_CYCLES: %s" % (line))
287       SUPPRESS_MISSING_CYCLES = True
288     return RUN_ABORT
289   
290   # Log all other output as debug output
291   else:
292     logging.debug('CONF:STDERR:?: %s' % (line))
293
294   # Neutral (not failure; nor success)
295   return False
296
297
298 #def pipeline_stdout_handler(line, conf_info):
299 #  """
300 #  Processes each line of output from running the pipeline
301 #  and stores useful information using the logging module
302 #
303 #  Loads useful information into conf_info as well, for future
304 #  use outside the function.
305 #
306 #  returns True if found condition that signifies success.
307 #  """
308 #
309 #  #f.write(line + '\n')
310 #
311 #  return True
312
313
314
315 def pipeline_stderr_handler(line, conf_info):
316   """
317   Processes each line of stderr from pipelien run
318   and stores useful information using the logging module
319
320   ##FIXME: Future feature (doesn't actually do this yet)
321   #Loads useful information into conf_info as well, for future
322   #use outside the function.
323
324   returns RUN_FAILED upon detecting failure;
325           #True on success message; (no clear success state)
326           False if neutral message
327             (i.e. doesn't signify failure or success)
328   """
329
330   if pl_stderr_ignore(line):
331     pass
332   elif s_make_error.search(line):
333     logging.error("make error detected; run failed")
334     return RUN_FAILED
335   elif s_no_gnuplot.search(line):
336     logging.error("gnuplot not found")
337     return RUN_FAILED
338   elif s_no_convert.search(line):
339     logging.error("imagemagick's convert command not found")
340     return RUN_FAILED
341   elif s_no_ghostscript.search(line):
342     logging.error("ghostscript not found")
343     return RUN_FAILED
344   else:
345     logging.debug('PIPE:STDERR:?: %s' % (line))
346
347   return False
348
349
350 def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir):
351   """
352   Gets the config file from server...
353   requires config file in:
354     /etc/ga_frontend/ga_frontend.conf
355    or
356     ~/.ga_frontend.conf
357
358   with:
359   [config_file_server]
360   base_host_url: http://host:port
361
362   return True if successful, False is failure
363   """
364   options = getCombinedOptions()
365
366   if options.url is None:
367     logging.error("~/.ga_frontend.conf or /etc/ga_frontend/ga_frontend.conf" \
368                   " missing base_host_url option")
369     return False
370
371   try:
372     saveConfigFile(flowcell, options.url, cfg_filepath)
373     conf_info.config_filepath = cfg_filepath
374   except FlowCellNotFound, e:
375     logging.error(e)
376     return False
377   except WebError404, e:
378     logging.error(e)
379     return False
380   except IOError, e:
381     logging.error(e)
382     return False
383   except Exception, e:
384     logging.error(e)
385     return False
386
387   f = open(cfg_filepath, 'r')
388   data = f.read()
389   f.close()
390
391   genome_dict = getAvailableGenomes(genome_dir)
392   mapper_dict = constructMapperDict(genome_dict)
393
394   f = open(cfg_filepath, 'w')
395   f.write(data % (mapper_dict))
396   f.close()
397   
398   return True  
399   
400
401
402 def configure(conf_info):
403   """
404   Attempts to configure the GA pipeline using goat.
405
406   Uses logging module to store information about status.
407
408   returns True if configuration successful, otherwise False.
409   """
410   #ERROR Test:
411   #pipe = subprocess.Popen(['goat_pipeline.py',
412   #                         '--GERALD=config32bk.txt',
413   #                         '--make .',],
414   #                         #'.'],
415   #                        stdout=subprocess.PIPE,
416   #                        stderr=subprocess.PIPE)
417
418   #ERROR Test (2), causes goat_pipeline.py traceback
419   #pipe = subprocess.Popen(['goat_pipeline.py',
420   #                  '--GERALD=%s' % (conf_info.config_filepath),
421   #                         '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104',
422   #                         '--make',
423   #                         '.'],
424   #                        stdout=subprocess.PIPE,
425   #                        stderr=subprocess.PIPE)
426
427   ##########################
428   # Run configuration step
429   #   Not a test; actual configure attempt.
430   #pipe = subprocess.Popen(['goat_pipeline.py',
431   #                  '--GERALD=%s' % (conf_info.config_filepath),
432   #                         '--make',
433   #                         '.'],
434   #                        stdout=subprocess.PIPE,
435   #                        stderr=subprocess.PIPE)
436
437   # CONTINUE HERE
438   #FIXME: this only does a run on 5 tiles on lane 4
439   pipe = subprocess.Popen(['goat_pipeline.py',
440                     '--GERALD=%s' % (conf_info.config_filepath),
441                            #'--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
442                            '--make',
443                            '.'],
444                           stdout=subprocess.PIPE,
445                           stderr=subprocess.PIPE)
446   ##################
447   # Process stdout
448   stdout_line = pipe.stdout.readline()
449
450   complete = False
451   while stdout_line != '':
452     # Handle stdout
453     if config_stdout_handler(stdout_line, conf_info):
454       complete = True
455     stdout_line = pipe.stdout.readline()
456
457
458   error_code = pipe.wait()
459   if error_code:
460     logging.error('Recieved error_code: %s' % (error_code))
461   else:
462     logging.info('We are go for launch!')
463
464   #Process stderr
465   stderr_line = pipe.stderr.readline()
466
467   abort = 'NO!'
468   stderr_success = False
469   while stderr_line != '':
470     stderr_status = config_stderr_handler(stderr_line, conf_info)
471     if stderr_status == RUN_ABORT:
472       abort = RUN_ABORT
473     elif stderr_status is True:
474       stderr_success = True
475     stderr_line = pipe.stderr.readline()
476
477
478   #Success requirements:
479   # 1) The stdout completed without error
480   # 2) The program exited with status 0
481   # 3) No errors found in stdout
482   print '#Expect: True, False, True, True'
483   print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
484   status = complete is True and \
485            bool(error_code) is False and \
486            abort != RUN_ABORT and \
487            stderr_success is True
488
489   # If everything was successful, but for some reason
490   #  we didn't retrieve the path info, log it.
491   if status is True:
492     if conf_info.bustard_path is None or conf_info.run_path is None:
493       logging.error("Failed to retrieve run_path")
494       return False
495   
496   return status
497
498
499 def run_pipeline(conf_info):
500   """
501   Run the pipeline and monitor status.
502   """
503   # Fail if the run_path doesn't actually exist
504   if not os.path.exists(conf_info.run_path):
505     logging.error('Run path does not exist: %s' \
506               % (conf_info.run_path))
507     return False
508
509   # Change cwd to run_path
510   os.chdir(conf_info.run_path)
511   stdout_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stdout.txt')
512   stderr_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stderr.txt')
513
514   # Create status object
515   conf_info.createStatusObject()
516
517   # Monitor file creation
518   wm = WatchManager()
519   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
520   event = RunEvent(conf_info)
521   notifier = ThreadedNotifier(wm, event)
522   notifier.start()
523   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
524
525   # Log pipeline starting
526   logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
527   
528   # Start the pipeline (and hide!)
529   #pipe = subprocess.Popen(['make',
530   #                         '-j8',
531   #                         'recursive'],
532   #                        stdout=subprocess.PIPE,
533   #                        stderr=subprocess.PIPE)
534
535   fout = open(stdout_filepath, 'w')
536   ferr = open(stderr_filepath, 'w')
537
538   pipe = subprocess.Popen(['make',
539                              '-j8',
540                              'recursive'],
541                              stdout=fout,
542                              stderr=ferr)
543                              #shell=True)
544   # Wait for run to finish
545   retcode = pipe.wait()
546
547
548   # Clean up
549   notifier.stop()
550   fout.close()
551   ferr.close()
552
553   # Process stderr
554   ferr = open(stderr_filepath, 'r')
555
556   run_failed_stderr = False
557   for line in ferr:
558     err_status = pipeline_stderr_handler(line, conf_info)
559     if err_status == RUN_FAILED:
560       run_failed_stderr = True
561
562   ferr.close()
563
564   # Finished file check!
565   print 'RUN SUCCESS CHECK:'
566   for key, value in event.run_status_dict.items():
567     print '  %s: %s' % (key, value)
568
569   dstatus = event.run_status_dict
570
571   # Success or failure check
572   status = (retcode == 0) and \
573            run_failed_stderr is False and \
574            dstatus['firecrest'] is True and \
575            dstatus['bustard'] is True and \
576            dstatus['gerald'] is True
577
578   return status
579
580