Merge branch 'master' of mus.cacr.caltech.edu:htsworkflow
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import tarfile
13 import time
14
15 try:
16   from xml.etree import ElementTree
17 except ImportError, e:
18   from elementtree import ElementTree
19
# Date format used in European-style runfolder timestamps (DD-MM-YYYY).
EUROPEAN_STRPTIME = "%d-%m-%Y"
# Regex fragment matching a European DD-MM-YYYY date.
EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
# Regex fragment matching a dotted pipeline version number.
VERSION_RE = "([0-9\.]+)"
# Regex fragment matching an alphanumeric user name.
USER_RE = "([a-zA-Z0-9]+)"
# Illumina GA flowcells have 8 lanes, numbered 1..8.
LANES_PER_FLOWCELL = 8
LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
26
27 from htsworkflow.util.alphanum import alphanum
28 from htsworkflow.util.ethelp import indent, flatten
29 from htsworkflow.util.queuecommands import QueueCommands
30
31 from htsworkflow.pipelines import srf
32
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run

    Aggregates the three analysis stages of an Illumina pipeline run
    (image analysis, bustard base calling, gerald alignment) plus the
    flowcell id, and can serialize all of that to and from a single
    XML report file.
    """
    # version stamp for the serialized XML report format
    XML_VERSION = 1
    # XML tag names used by get_elements()/set_elements()
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, xml=None):
        """
        :Parameters:
          - `pathname`: runfolder location (stored normalized), or None
          - `xml`: ElementTree node of a saved report to restore state from
        """
        if pathname is not None:
          self.pathname = os.path.normpath(pathname)
        else:
          self.pathname = None
        self._name = None
        self._flowcell_id = None
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
          self.set_elements(xml)

    def _get_flowcell_id(self):
        """
        Return the flowcell id, reading Config/FlowcellId.xml on first use.

        Falls back to the last '_'-separated field of the runfolder path
        when the config file is missing.  The result is cached.
        """
        # extract flowcell ID
        if self._flowcell_id is None:
          config_dir = os.path.join(self.pathname, 'Config')
          flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
          if os.path.exists(flowcell_id_path):
            flowcell_id_tree = ElementTree.parse(flowcell_id_path)
            self._flowcell_id = flowcell_id_tree.findtext('Text')
          else:
            path_fields = self.pathname.split('_')
            if len(path_fields) > 0:
              # guessing last element of filename
              flowcell_id = path_fields[-1]
            else:
              flowcell_id = 'unknown'

            logging.warning(
              "Flowcell id was not found, guessing %s" % (
                 flowcell_id))
            self._flowcell_id = flowcell_id
        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def _get_runfolder_name(self):
        """Return the runfolder name recorded by the gerald stage, or None."""
        if self.gerald is None:
            return None
        else:
            return self.gerald.runfolder_name
    runfolder_name = property(_get_runfolder_name)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.

        :Returns: ElementTree element holding the flowcell id plus the
          image analysis, bustard and gerald sub-trees.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """
        Restore this run's state from a tree produced by get_elements().

        :Raises: ValueError when the root tag is not PipelineRun.
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
          raise ValueError('Pipeline Run Expecting %s got %s' % (
              PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
          tag = element.tag.lower()
          if tag == PipelineRun.FLOWCELL_ID.lower():
            self._flowcell_id = element.text
          #ok the xword.Xword.XWORD pattern for module.class.constant is lame
          # you should only have Firecrest or IPAR, never both of them.
          elif tag == firecrest.Firecrest.FIRECREST.lower():
            self.image_analysis = firecrest.Firecrest(xml=element)
          elif tag == ipar.IPAR.IPAR.lower():
            self.image_analysis = ipar.IPAR(xml=element)
          elif tag == bustard.Bustard.BUSTARD.lower():
            self.bustard = bustard.Bustard(xml=element)
          elif tag == gerald.Gerald.GERALD.lower():
            self.gerald = gerald.Gerald(xml=element)
          else:
            logging.warn('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name

        The name has the form run_<flowcell id>_<YYYY-MM-DD>.xml; it is
        computed once and cached.
        """
        if self._name is None:
          tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
          timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
          self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """Write this run's combined XML report into destdir (default: cwd)."""
        if destdir is None:
            destdir = ''
        logging.info("Saving run report " + self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Replace this run's state with the report stored in filename."""
        logging.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
150
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    return PipelineRun(xml=ElementTree.parse(pathname).getroot())
163
def get_runs(runfolder):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.

    :Parameters:
      - `runfolder`: path of the runfolder to scan

    :Returns: list of PipelineRun objects, possibly empty
    """
    # hide the imports here to avoid cyclic imports (this module is
    # imported by the stage-specific modules)
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
        # for one image-analysis directory, find every bustard/gerald
        # combination beneath it and append one PipelineRun per pair
        logging.info("Looking for bustard directories in %s" % (pathname,))
        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
        # RTA BaseCalls looks enough like Bustard.
        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
        for bustard_pathname in bustard_dirs:
            logging.info("Found bustard directory %s" % (bustard_pathname,))
            b = bustard.bustard(bustard_pathname)
            gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
            logging.info("Looking for gerald directories in %s" % (pathname,))
            for gerald_pathname in glob(gerald_glob):
                logging.info("Found gerald directory %s" % (gerald_pathname,))
                try:
                    g = gerald.gerald(gerald_pathname)
                    p = PipelineRun(runfolder)
                    p.image_analysis = image_analysis
                    p.bustard = b
                    p.gerald = g
                    runs.append(p)
                except IOError, e:
                    # an unreadable gerald directory is skipped, not fatal
                    logging.error("Ignoring " + str(e))

    datadir = os.path.join(runfolder, 'Data')

    logging.info('Searching for runs in ' + datadir)
    runs = []
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        logging.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            logging.warn(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, image_analysis, firecrest_pathname
            )
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        logging.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            logging.warn(
                "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, image_analysis, ipar_pathname
            )

    return runs
233
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    Assumed layout: <runfolder>/Data/<image analysis>/<bustard>/<gerald>

    :Parameters:
      - `gerald_dir`: path to a GERALD alignment directory

    :Returns: initialized PipelineRun, or None when any stage is missing
    """
    # hide the imports here to avoid cyclic imports
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    gerald_dir = os.path.expanduser(gerald_dir)
    # walk up the directory tree to recover the other pipeline stages
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    logging.info('--- use-run detected options ---')
    logging.info('runfolder: %s' % (runfolder_dir,))
    logging.info('image_dir: %s' % (image_dir,))
    logging.info('bustard_dir: %s' % (bustard_dir,))
    logging.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    logging.info('data_dir: %s' % (data_dir,))
    # fix: label used to be misspelled 'short_iamge_dir'
    logging.info('short_image_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        # the RTA Intensities directory is close enough to IPAR
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        logging.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        logging.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        logging.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
301
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters

    Writes each run's xml report via its save() method.
    """
    for pipeline_run in runs:
        pipeline_run.save()
308
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.

    :Parameters:
      - `genome_map`: mapping whose keys are path prefixes belonging to
        the main genome
      - `mapped_reads`: mapping of target name (possibly path-prefixed)
        to read count

    :Returns: dict of {name: count}; counts whose path prefix is absent
      from genome_map are pooled under that prefix, everything else is
      summed per leaf name, and the pooled total is stored under the
      last such prefix seen ('unknown' if none).
    """
    summarized_reads = {}
    genome_reads = 0
    genome = 'unknown'
    for k, v in mapped_reads.items():
        path, k = os.path.split(k)
        # `in` instead of the Python-2-only dict.has_key()
        if len(path) > 0 and path not in genome_map:
            genome = path
            genome_reads += v
        else:
            summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
    summarized_reads[genome] = genome_reads
    return summarized_reads
326
def summarize_lane(gerald, lane_id):
    """
    Build a list of report lines summarizing one lane of a gerald run.

    :Parameters:
      - `gerald`: gerald object providing .summary.lane_results and
        .eland_results.results
      - `lane_id`: which lane to report on

    :Returns: list of report strings, one entry per output line
    """
    report = []
    summary_results = gerald.summary.lane_results
    # one pass per read end (single or paired end)
    for end in range(len(summary_results)):
      eland_result = gerald.eland_results.results[end][lane_id]
      report.append("Sample name %s" % (eland_result.sample_name))
      report.append("Lane id %s end %s" % (eland_result.lane_id, end))
      # `in` instead of the Python-2-only dict.has_key()
      if end < len(summary_results) and eland_result.lane_id in summary_results[end]:
          cluster = summary_results[end][eland_result.lane_id].cluster
          report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
      report.append("Total Reads: %d" % (eland_result.reads))

      if hasattr(eland_result, 'match_codes'):
          mc = eland_result.match_codes
          nm = mc['NM']
          nm_percent = float(nm) / eland_result.reads * 100
          qc = mc['QC']
          qc_percent = float(qc) / eland_result.reads * 100

          report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
          report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
          report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                        (mc['U0'], mc['U1'], mc['U2']))
          report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                        (mc['R0'], mc['R1'], mc['R2']))

      if hasattr(eland_result, 'genome_map'):
          report.append("Mapped Reads")
          mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
          for name, counts in mapped_reads.items():
            report.append("  %s: %d" % (name, counts))

      # blank separator line between ends
      report.append('')
    return report
361
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :Parameters:
      - `runs`: list of PipelineRun objects

    :Returns: a single string covering every run in `runs`
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.name,))
        # sort the report
        eland_keys = run.gerald.eland_results.results[0].keys()
        eland_keys.sort(alphanum)

        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('---')
            report.append('')
    # fix: the return used to be inside the for loop, which truncated
    # the report to the first run only
    return os.linesep.join(report)
379
def is_compressed(filename):
    """Return True when `filename` carries a known compression suffix."""
    extension = os.path.splitext(filename)[1]
    return extension in ('.gz', '.bz2')
387
def save_flowcell_reports(data_dir, cycle_dir):
    """
    Save the flowcell quality reports

    Tars data_dir/reports (plus Status.xml and Status.xsl when
    Status.xml exists) into <cycle_dir>/flowcell-reports.tar.bz2.
    Does nothing when there is no reports directory.
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            # NOTE(review): Status.xsl is added without its own existence
            # check -- tar would fail if only Status.xml is present; confirm
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        logging.info("Saving reports from " + reports_dir)
        # tar members are relative paths, so run from inside data_dir
        cwd = os.getcwd()
        os.chdir(data_dir)
        q = QueueCommands([" ".join(cmd_list)])
        q.run()
        os.chdir(cwd)
406
407
def save_summary_file(gerald_object, cycle_dir):
    """Copy the GERALD Summary.htm report into `cycle_dir`, when present."""
    summary_path = os.path.join(gerald_object.pathname, 'Summary.htm')
    if not os.path.exists(summary_path):
        logging.info('Summary file %s was not found' % (summary_path,))
        return
    logging.info('Copying %s to %s' % (summary_path, cycle_dir))
    shutil.copy(summary_path, cycle_dir)
416
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images

    Copies IVC.htm from the bustard directory into cycle_dir and any
    matching per-lane plot PNGs into cycle_dir/Plots.
    """
    plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
    # glob pattern for the per-lane summary images
    plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')

    plot_target_path = os.path.join(cycle_dir, 'Plots')

    if os.path.exists(plot_html):
        logging.debug("Saving %s" % (plot_html,))
        logging.debug("Saving %s" % (plot_images,))
        shutil.copy(plot_html, cycle_dir)
        if not os.path.exists(plot_target_path):
            os.mkdir(plot_target_path)
        for plot_file in glob(plot_images):
            shutil.copy(plot_file, plot_target_path)
    else:
        # fix: the file looked for above is IVC.htm; the old message
        # misreported it as IVC.html
        logging.warning('Missing IVC.htm file, not archiving')
437
438
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory

    Pipes a `tar c` of every *_score.txt file through bzip2 into
    <cycle_dir>/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = []
    for f in os.listdir(scores_path):
        if re.match('.*_score.txt', f):
            score_files.append(f)

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    # fix: binary mode -- the bzip2 stream is not text
    tar_dest = open(tar_dest_name, 'wb')
    logging.info("Compressing score files from %s" % (scores_path,))
    logging.info("Running tar: " + " ".join(tar_cmd[:10]))
    logging.info("Running bzip2: " + " ".join(bzip_cmd))
    logging.info("Writing to %s" % (tar_dest_name,))

    # fix: extend, don't replace, the environment; passing just
    # {'BZIP': '-9'} stripped PATH from the tar child process
    env = dict(os.environ)
    env['BZIP'] = '-9'
    tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
                           cwd=scores_path)
    bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
    # close our copy of tar's stdout so bzip2 sees EOF when tar exits
    tar.stdout.close()
    tar.wait()
    # fix: previously only tar was waited on, so scores.tar.bz2 could
    # still be partially written when this function returned
    bzip.wait()
    tar_dest.close()
468     tar.wait()
469
470
def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
    """
    Compress eland result files into the archive directory

    :Parameters:
      - `gerald_object`: gerald run whose eland_results will be archived
      - `cycle_dir`: destination directory
      - `num_jobs`: how many queued bzip2 commands may run concurrently
    """
    # copy & bzip eland files
    bz_commands = []

    for lanes_dictionary in gerald_object.eland_results.results:
        for eland_lane in lanes_dictionary.values():
            source_name = eland_lane.pathname
            if source_name is None:
              logging.info(
                "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
            else:
              path, name = os.path.split(source_name)
              dest_name = os.path.join(cycle_dir, name)
              logging.info("Saving eland file %s to %s" % \
                         (source_name, dest_name))

              if is_compressed(name):
                logging.info('Already compressed, Saving to %s' % (dest_name,))
                shutil.copy(source_name, dest_name)
              else:
                # not compressed
                # queue a shell command string; the '>' redirect implies
                # QueueCommands runs these through a shell
                dest_name += '.bz2'
                args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
                bz_commands.append(" ".join(args))
                #logging.info('Running: %s' % ( " ".join(args) ))
                #bzip_dest = open(dest_name, 'w')
                #bzip = subprocess.Popen(args, stdout=bzip_dest)
                #logging.info('Saving to %s' % (dest_name, ))
                #bzip.wait()

    if len(bz_commands) > 0:
      q = QueueCommands(bz_commands, num_jobs)
      q.run()
507
508
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format='qseq'):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * eland_result files
      * score files
      * Summary.htm
      * srf files (raw sequence & qualities)

    :Parameters:
      - `runs`: list of PipelineRun objects to archive
      - `output_base_dir`: root of the archive tree (default: cwd)
      - `site`: site label for raw base-call naming; None skips that step
      - `num_jobs`: parallelism for the queued shell commands
      - `raw_format`: 'qseq' or 'srf'; anything else raises ValueError
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
      # one directory per flowcell under the output base
      result_dir = os.path.join(output_base_dir, r.flowcell_id)
      logging.info("Using %s as result directory" % (result_dir,))
      if not os.path.exists(result_dir):
        os.mkdir(result_dir)

      # create cycle_dir
      cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
      logging.info("Filling in %s" % (cycle,))
      cycle_dir = os.path.join(result_dir, cycle)
      cycle_dir = os.path.abspath(cycle_dir)
      if os.path.exists(cycle_dir):
        # never clobber a previous extraction
        logging.error("%s already exists, not overwriting" % (cycle_dir,))
        continue
      else:
        os.mkdir(cycle_dir)

      # save run file
      r.save(cycle_dir)

      # save illumina flowcell status report
      save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'), cycle_dir)

      # save stuff from bustard
      # grab IVC plot
      save_ivc_plot(r.bustard, cycle_dir)

      # build base call saving commands
      if site is not None:
        # only lanes that were actually analyzed
        lanes = []
        for lane in range(1, 9):
          if r.gerald.lanes[lane].analysis != 'none':
            lanes.append(lane)

        run_name = srf.pathname_to_run_name(r.pathname)
        if raw_format == 'qseq':
            seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
        elif raw_format == 'srf':
            seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
        else:
            raise ValueError('Unknown --raw-format=%s' % (raw_format))
        srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)

      # save stuff from GERALD
      # copy stuff out of the main run
      g = r.gerald

      # save summary file
      save_summary_file(g, cycle_dir)

      # compress eland result files
      compress_eland_results(g, cycle_dir, num_jobs)

      # md5 all the compressed files once we're done
      md5_commands = srf.make_md5_commands(cycle_dir)
      srf.run_commands(cycle_dir, md5_commands, num_jobs)
577
def rm_list(files, dry_run=True):
    """
    Remove each path in `files`, recursively for directories.

    With dry_run=True (the default) only log what would be removed;
    paths that do not exist are logged as warnings.
    """
    for target in files:
        if not os.path.exists(target):
            logging.warn("%s doesn't exist." % (target,))
            continue
        logging.info('deleting %s' % (target,))
        if dry_run:
            continue
        if os.path.isdir(target):
            shutil.rmtree(target)
        else:
            os.unlink(target)
589
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    Removes run logs, pipeline logs, calibration data, image tiles and
    pipeline intermediate files from each runfolder.  With dry_run=True
    only logs the deletions that would happen.
    """
    if dry_run:
        logging.info('In dry-run mode')

    for run in runs:
        # fix: message used to read 'Cleaninging'
        logging.info('Cleaning %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        logging.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # cd Data/C1-*_Firecrest*
        logging.info("Cleaning intermediate files")
        # make clean_intermediate
        # note: the Makefile target is run even in dry-run mode
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                             cwd=run.image_analysis.pathname,)
            clean_process.wait()
624
625
626