7c06e217de63d5733e524ae551f06eea40528e11
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """Core information needed to inspect a runfolder.
2 """
3 from glob import glob
4 import logging
5 import os
6 import re
7 import shutil
8 import stat
9 import subprocess
10 import sys
11 import tarfile
12 import time
13
14 LOGGER = logging.getLogger(__name__)
15
16 from htsworkflow.pipelines import firecrest
17 from htsworkflow.pipelines import ipar
18 from htsworkflow.pipelines import bustard
19 from htsworkflow.pipelines import gerald
20 from htsworkflow.pipelines import ElementTree, \
21                                   EUROPEAN_STRPTIME, EUROPEAN_DATE_RE, \
22                                   VERSION_RE, USER_RE, \
23                                   LANES_PER_FLOWCELL, LANE_LIST
24 from htsworkflow.pipelines.samplekey import LANE_SAMPLE_KEYS
25 from htsworkflow.util.alphanum import alphanum
26 from htsworkflow.util.ethelp import indent, flatten
27 from htsworkflow.util.queuecommands import QueueCommands
28
29 from htsworkflow.pipelines import srf
30
class PipelineRun(object):
    """Capture "interesting" information about a pipeline run

    :Variables:
      - `pathname` location of the root of this runfolder
      - `serialization_filename` read only property containing name of run xml file
      - `flowcell_id` read-only property containing flowcell id (bar code)
      - `datadir` location of the runfolder data dir.
      - `image_analysis` generic name for Firecrest or IPAR image analysis
      - `bustard` summary base caller
      - `gerald` summary of sequence alignment and quality control metrics
    """
    XML_VERSION = 1
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, flowcell_id=None, xml=None):
        """Initialize a PipelineRun object

        :Parameters:
          - `pathname` the root directory of this run folder.
          - `flowcell_id` the flowcell ID in case it can't be determined
          - `xml` Allows initializing an object from a serialized xml file.

        :Types:
          - `pathname` str
          - `flowcell_id` str
          - `xml` ElementTree
        """
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        else:
            self.pathname = None
        self._name = None
        self._flowcell_id = flowcell_id
        self.datadir = None
        self.suffix = None
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
            self.set_elements(xml)

    def _get_flowcell_id(self):
        """Return the flowcell ID

        Attempts to find the flowcell ID through several mechanisms, in
        order: RunInfo.xml, Config/FlowcellId.xml, then the runfolder
        path itself; falls back to 'unknown' with a warning.
        """
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_runinfo()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_flowcellid()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_path()
        if self._flowcell_id is None:
            self._flowcell_id = 'unknown'
            # LOGGER.warn is deprecated; use warning with lazy formatting
            LOGGER.warning(
                "Flowcell id was not found, guessing %s", self._flowcell_id)

        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def _get_flowcell_id_from_flowcellid(self):
        """Extract flowcell id from a Config/FlowcellId.xml file

        :return: flowcell_id or None if not found
        """
        config_dir = os.path.join(self.pathname, 'Config')
        flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
        if os.path.exists(flowcell_id_path):
            flowcell_id_tree = ElementTree.parse(flowcell_id_path)
            return flowcell_id_tree.findtext('Text')

    def _get_flowcell_id_from_runinfo(self):
        """Read RunInfo.xml for the flowcell id

        :return: flowcell_id or None if not found
        """
        runinfo = os.path.join(self.pathname, 'RunInfo.xml')
        if os.path.exists(runinfo):
            tree = ElementTree.parse(runinfo)
            root = tree.getroot()
            # NOTE: .xpath implies ElementTree here is the lxml variant
            fc_nodes = root.xpath('/RunInfo/Run/Flowcell')
            if len(fc_nodes) == 1:
                return fc_nodes[0].text

    def _get_flowcell_id_from_path(self):
        """Guess a flowcell name from the path

        Runfolder names conventionally end with _<flowcellid>, so take
        whatever follows the last underscore.

        :return: flowcell_id or None if not found
        """
        path_fields = self.pathname.split('_')
        if len(path_fields) > 0:
            # guessing last element of filename
            return path_fields[-1]

    def _get_runfolder_name(self):
        # delegate to gerald, which records the runfolder it was built from
        if self.gerald is None:
            return None
        else:
            return self.gerald.runfolder_name
    runfolder_name = property(_get_runfolder_name)

    def _get_run_dirname(self):
        """Return name of directory to hold result files from one analysis

        For pre-multiplexing runs this is just the cycle range C1-123
        For post-multiplexing runs the "suffix" that we add to
        differentiate runs will be added to the range.
        E.g. Unaligned_6mm may produce C1-200_6mm

        :raises ValueError: if no image analysis step has been attached yet
        """
        if self.image_analysis is None:
            raise ValueError("Not initialized yet")
        start = self.image_analysis.start
        stop = self.image_analysis.stop
        cycle_fragment = "C%d-%d" % (start, stop)
        if self.suffix:
            cycle_fragment += self.suffix

        return cycle_fragment
    run_dirname = property(_get_run_dirname)

    def get_elements(self):
        """make one master xml file from all of our sub-components.

        :return: an ElementTree containing all available pipeline
                 run xml components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        if self.gerald:
            root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """Initialize a PipelineRun object from an run.xml ElementTree.

        :param tree: parsed ElementTree
        :type tree: ElementTree
        :raises ValueError: if the root tag is not PipelineRun
        """
        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
            tag = element.tag.lower()
            if tag == PipelineRun.FLOWCELL_ID.lower():
                self._flowcell_id = element.text
            #ok the xword.Xword.XWORD pattern for module.class.constant is lame
            # you should only have Firecrest or IPAR, never both of them.
            elif tag == firecrest.Firecrest.FIRECREST.lower():
                self.image_analysis = firecrest.Firecrest(xml=element)
            elif tag == ipar.IPAR.IPAR.lower():
                self.image_analysis = ipar.IPAR(xml=element)
            elif tag == bustard.Bustard.BUSTARD.lower():
                self.bustard = bustard.Bustard(xml=element)
            elif tag == gerald.Gerald.GERALD.lower():
                self.gerald = gerald.Gerald(xml=element)
            elif tag == gerald.CASAVA.GERALD.lower():
                self.gerald = gerald.CASAVA(xml=element)
            else:
                # .warn is a deprecated alias for .warning
                LOGGER.warning('PipelineRun unrecognized tag %s', tag)

    def _get_serialization_filename(self):
        """Compute the filename for the run xml file

        Attempts to find the latest date from all of the run
        components.

        :return: filename run_{flowcell id}_{timestamp}.xml
        :rtype: str
        :raises ValueError: if no analysis components are attached
        """
        if self._name is None:
            components = [c for c in
                          (self.image_analysis, self.bustard, self.gerald)
                          if c]
            if not components:
                # max() over an empty sequence would raise anyway; make the
                # message say what is actually wrong
                raise ValueError(
                    "Need at least one analysis component to compute a name")
            tmax = max(c.time for c in components)
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    serialization_filename = property(_get_serialization_filename)

    def save(self, destdir=None):
        """Save a run xml file.

        :param destdir: Directory name to save too, uses current directory
                        if not specified.
        :type destdir: str
        """
        if destdir is None:
            destdir = ''
        LOGGER.info("Saving run report " + self.serialization_filename)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.serialization_filename)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Load a run xml into this object.

        :Parameters:
          - `filename` location of a run xml file

        :Types:
          - `filename` str
        """
        LOGGER.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
245
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` location of an run xml file

    :Returns: initialized PipelineRun object
    """
    root = ElementTree.parse(pathname).getroot()
    return PipelineRun(xml=root)
258
def get_runs(runfolder, flowcell_id=None):
    """Find all runs associated with a runfolder.

    We end up with multiple analysis runs as we sometimes
    need to try with different parameters. This attempts
    to return a list of all the various runs.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.

    :param runfolder: root of the runfolder to scan
    :param flowcell_id: flowcell id to use if it can't be determined
    :return: list of PipelineRun objects
    """
    datadir = os.path.join(runfolder, 'Data')

    LOGGER.info('Searching for runs in %s', datadir)
    runs = []
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        LOGGER.info('Found firecrest in %s', datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            # factory returns None for empty/unparseable directories
            LOGGER.warning(
                "%s is an empty or invalid firecrest directory",
                firecrest_pathname)
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, firecrest_pathname,
                flowcell_id)
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        LOGGER.info('Found ipar directories in %s', datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            LOGGER.warning(
                "%s is an empty or invalid IPAR directory", ipar_pathname)
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, ipar_pathname,
                flowcell_id)

    return runs
303
def scan_post_image_analysis(runs, runfolder, datadir, image_analysis,
                             pathname, flowcell_id):
    """Look for analysis results downstream of one image-analysis directory.

    HiSeq (multiplexed) layouts are tried first; when any are found the
    older GA-era Bustard/GERALD scan is skipped entirely.
    """
    hiseq_count = build_hiseq_runs(image_analysis, runs, datadir, runfolder,
                                   flowcell_id)
    # If we're a multiplexed run, don't look for older run type.
    if hiseq_count > 0:
        return

    LOGGER.info("Looking for bustard directories in %s", pathname)
    candidates = glob(os.path.join(pathname, "Bustard*"))
    # RTA BaseCalls looks enough like Bustard.
    candidates.extend(glob(os.path.join(pathname, "BaseCalls")))
    for bustard_pathname in candidates:
        LOGGER.info("Found bustard directory %s", bustard_pathname)
        base_caller = bustard.bustard(bustard_pathname)
        build_gerald_runs(runs, base_caller, image_analysis, bustard_pathname,
                          datadir, pathname, runfolder, flowcell_id)
320
321
def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir,
                      pathname, runfolder, flowcell_id):
    """Construct one PipelineRun per GERALD directory under a bustard dir.

    Appends the constructed PipelineRun objects to ``runs``.

    :return: number of runs added
    """
    start = len(runs)
    gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
    LOGGER.info("Looking for gerald directories in %s", pathname)
    for gerald_pathname in glob(gerald_glob):
        LOGGER.info("Found gerald directory %s", gerald_pathname)
        try:
            g = gerald.gerald(gerald_pathname)
            p = PipelineRun(runfolder, flowcell_id)
            p.datadir = datadir
            p.image_analysis = image_analysis
            p.bustard = b
            p.gerald = g
            runs.append(p)
        # BUG FIX: "except IOError, e" is Python-2-only syntax
        except IOError as e:
            # a broken GERALD directory shouldn't abort the whole scan
            LOGGER.error("Ignoring %s", str(e))
    return len(runs) - start
340
341
def build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id):
    """Find HiSeq-style Aligned*/Unaligned* analysis directories.

    Each matched (Aligned, Unaligned, suffix) triple yields one
    PipelineRun appended to ``runs``.

    :return: number of runs added
    """
    start = len(runs)
    aligned_glob = os.path.join(runfolder, 'Aligned*')
    unaligned_glob = os.path.join(runfolder, 'Unaligned*')

    aligned_paths = glob(aligned_glob)
    unaligned_paths = glob(unaligned_glob)

    matched_paths = hiseq_match_aligned_unaligned(aligned_paths, unaligned_paths)
    LOGGER.debug("Matched HiSeq analysis: %s", str(matched_paths))

    for aligned, unaligned, suffix in matched_paths:
        if unaligned is None:
            # (typo "unalinged" fixed; .warn is deprecated)
            LOGGER.warning(
                "Aligned directory %s without matching unaligned, skipping",
                aligned)
            continue

        try:
            p = PipelineRun(runfolder, flowcell_id)
            p.datadir = datadir
            p.suffix = suffix
            p.image_analysis = image_analysis
            p.bustard = bustard.bustard(unaligned)
            assert p.bustard
            if aligned:
                p.gerald = gerald.gerald(aligned)
            runs.append(p)
        # BUG FIX: "except IOError, e" is Python-2-only syntax
        except IOError as e:
            LOGGER.error("Ignoring %s", str(e))
    return len(runs) - start
371
def hiseq_match_aligned_unaligned(aligned, unaligned):
    """Match aligned and unaligned folders from separate lists

    :param aligned: list of Aligned* directory paths
    :param unaligned: list of Unaligned* directory paths
    :return: list of (aligned, unaligned, suffix) tuples; either
             directory may be None when it has no partner for a suffix.
    """
    # (removed an unused compiled regex that the original built here)
    aligned_by_suffix = build_dir_dict_by_suffix('Aligned', aligned)
    unaligned_by_suffix = build_dir_dict_by_suffix('Unaligned', unaligned)

    keys = set(aligned_by_suffix.keys()).union(set(unaligned_by_suffix.keys()))

    matches = []
    for key in keys:
        matches.append((aligned_by_suffix.get(key),
                        unaligned_by_suffix.get(key),
                        key))
    return matches
388
def build_dir_dict_by_suffix(prefix, dirnames):
    """Build a dictionary indexed by suffix of last directory name.

    It assumes a constant prefix

    :param prefix: leading text shared by every directory basename
    :param dirnames: iterable of directory paths
    :return: dict mapping suffix (possibly '') -> original path
    """
    # raw string: '\w' in a plain literal is a deprecated escape sequence
    regex = re.compile(r'%s(?P<suffix>[\w]*)' % (prefix,))

    by_suffix = {}
    for absname in dirnames:
        basename = os.path.basename(absname)
        match = regex.match(basename)
        if match:
            by_suffix[match.group('suffix')] = absname
    return by_suffix
403
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :param gerald_dir: path to a GERALD result directory
    :return: PipelineRun, or None if any pipeline stage can't be found
    """
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    # the directory layout is gerald inside bustard inside the image dir
    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    LOGGER.info('--- use-run detected options ---')
    LOGGER.info('runfolder: %s', runfolder_dir)
    LOGGER.info('image_dir: %s', image_dir)
    LOGGER.info('bustard_dir: %s', bustard_dir)
    LOGGER.info('gerald_dir: %s', gerald_dir)

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    LOGGER.info('data_dir: %s', data_dir)
    # (fixed log typo: was 'short_iamge_dir')
    LOGGER.info('short_image_dir: %s', short_image_dir)

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        LOGGER.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    LOGGER.info('Constructed PipelineRun from %s', gerald_dir)
    return p
471
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters

    Serializes each run's xml report via PipelineRun.save().
    """
    for analysis_run in runs:
        analysis_run.save()
478
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks seperately.
    """
    summarized = {}
    spike_reads = 0
    spike_name = 'unknown'
    for label, count in mapped_reads.items():
        dirname, chromosome = os.path.split(label)
        # a non-empty directory component that isn't in genome_map is
        # treated as a spike-in/contamination bucket
        if len(dirname) > 0 and dirname not in genome_map:
            spike_name = dirname
            spike_reads += count
        else:
            summarized[chromosome] = summarized.get(chromosome, 0) + count
    summarized[spike_name] = spike_reads
    return summarized
496
def summarize_lane(gerald, lane_id):
    """Build a list of human-readable report lines for one lane/end.

    ``gerald`` supplies the summary and eland results; ``lane_id``
    selects which lane/read end to describe.
    """
    eland_result = gerald.eland_results[lane_id]
    lane_results = gerald.summary.lane_results

    lines = ["Sample name %s" % (eland_result.sample_name),
             "Lane id %s end %s" % (lane_id.lane, lane_id.read)]

    # cluster statistics are only present when the summary covers this end
    have_summary = (lane_id.read < len(lane_results)
                    and lane_id.lane in lane_results[lane_id.read])
    if have_summary:
        cluster = lane_results[lane_id.read][lane_id.lane].cluster
        lines.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
    lines.append("Total Reads: %d" % (eland_result.reads))

    if hasattr(eland_result, 'match_codes'):
        mc = eland_result.match_codes
        total = eland_result.reads
        no_match = mc['NM']
        qc_failed = mc['QC']
        lines.append("No Match: %d (%2.2g %%)" % (
            no_match, float(no_match) / total * 100))
        lines.append("QC Failed: %d (%2.2g %%)" % (
            qc_failed, float(qc_failed) / total * 100))
        lines.append('Unique (0,1,2 mismatches) %d %d %d' % (
            mc['U0'], mc['U1'], mc['U2']))
        lines.append('Repeat (0,1,2 mismatches) %d %d %d' % (
            mc['R0'], mc['R1'], mc['R2']))

    if hasattr(eland_result, 'genome_map'):
        lines.append("Mapped Reads")
        mapped = summarize_mapped_reads(eland_result.genome_map,
                                        eland_result.mapped_reads)
        for name, counts in mapped.items():
            lines.append("  %s: %d" % (name, counts))

        lines.append('')
    return lines
534
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :param runs: list of PipelineRun objects
    :return: one newline-joined report string
    """
    report = []
    eland_keys = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.serialization_filename,))
        # sort the report
        if run.gerald:
            eland_keys = sorted(run.gerald.eland_results.keys())
        else:
            report.append("Alignment not done, no report possible")

    # NOTE(review): this loop is OUTSIDE the loop above, so `run` and
    # `eland_keys` refer only to the *last* run; with multiple runs the
    # earlier ones get a header but no lane summaries. Looks like an
    # indentation bug -- confirm intended behavior before changing.
    for lane_id in eland_keys:
        report.extend(summarize_lane(run.gerald, lane_id))
        report.append('---')
        report.append('')
    return os.linesep.join(report)
555
def is_compressed(filename):
    """Return True when filename carries a known compression extension."""
    extension = os.path.splitext(filename)[1]
    return extension in ('.gz', '.bz2')
563
def save_flowcell_reports(data_dir, run_dirname):
    """
    Save the flowcell quality reports

    Tars the Data/reports directory (plus Status.xml/Status.xsl when
    present) into run_dirname/flowcell-reports.tar.bz2.
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(run_dirname, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        LOGGER.info("Saving reports from %s", reports_dir)
        cwd = os.getcwd()
        os.chdir(data_dir)
        try:
            q = QueueCommands([" ".join(cmd_list)])
            q.run()
        finally:
            # BUG FIX: always restore the working directory, even when
            # the tar command fails
            os.chdir(cwd)
582
583
def save_summary_file(pipeline, run_dirname):
    """Copy the GERALD Summary.htm (or the Status_Files fallback) into
    run_dirname; log when neither exists.
    """
    gerald_object = pipeline.gerald
    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files',
                                        'Summary.htm')
    if os.path.exists(gerald_summary):
        LOGGER.info('Copying %s to %s', gerald_summary, run_dirname)
        shutil.copy(gerald_summary, run_dirname)
    elif os.path.exists(status_files_summary):
        LOGGER.info('Copying %s to %s', status_files_summary, run_dirname)
        shutil.copy(status_files_summary, run_dirname)
    else:
        # BUG FIX: the original referenced an undefined `summary_path`
        # here, raising NameError whenever no summary file existed
        LOGGER.info('Summary file %s was not found', gerald_summary)
597
def save_ivc_plot(bustard_object, run_dirname):
    """
    Save the IVC page and its supporting images

    Copies IVC.htm and the matching Plots/*.png files into run_dirname.
    """
    plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
    plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')

    plot_target_path = os.path.join(run_dirname, 'Plots')

    if os.path.exists(plot_html):
        LOGGER.debug("Saving %s", plot_html)
        LOGGER.debug("Saving %s", plot_images)
        shutil.copy(plot_html, run_dirname)
        if not os.path.exists(plot_target_path):
            os.mkdir(plot_target_path)
        for plot_file in glob(plot_images):
            shutil.copy(plot_file, plot_target_path)
    else:
        # BUG FIX: message said "IVC.html" but the file checked is IVC.htm
        LOGGER.warning('Missing IVC.htm file, not archiving')
618
619
def compress_score_files(bustard_object, run_dirname):
    """
    Compress score files into our result directory

    Pipes ``tar c`` of every *_score.txt file through bzip2 into
    run_dirname/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = [f for f in os.listdir(scores_path)
                   if re.match('.*_score.txt', f)]

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(run_dirname, 'scores.tar.bz2')
    LOGGER.info("Compressing score files from %s", scores_path)
    LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
    LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
    LOGGER.info("Writing to %s", tar_dest_name)

    env = {'BZIP': '-9'}
    # BUG FIXES: open the destination in *binary* mode and close it when
    # done (the original leaked a text-mode handle), close our copy of
    # tar's stdout so bzip2 sees EOF, and wait for *both* processes.
    with open(tar_dest_name, 'wb') as tar_dest:
        tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
                               env=env, cwd=scores_path)
        bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
        tar.stdout.close()
        tar.wait()
        bzip.wait()
650
651
def compress_eland_results(gerald_object, run_dirname, num_jobs=1):
    """
    Compress eland result files into the archive directory

    Already-compressed results are copied as-is; everything else is
    queued as a shell bzip2 command so QueueCommands can compress
    several files in parallel.
    """
    bz_commands = []

    for key in gerald_object.eland_results:
        eland_lane = gerald_object.eland_results[key]
        for source_name in eland_lane.pathnames:
            if source_name is None:
                LOGGER.info(
                    "Lane ID %s does not have a filename.", eland_lane.lane_id)
                continue
            path, name = os.path.split(source_name)
            dest_name = os.path.join(run_dirname, name)
            LOGGER.info("Saving eland file %s to %s", source_name, dest_name)

            if is_compressed(name):
                LOGGER.info('Already compressed, Saving to %s', dest_name)
                shutil.copy(source_name, dest_name)
            else:
                # not compressed; the '>' redirection relies on
                # QueueCommands running these through a shell
                dest_name += '.bz2'
                args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
                bz_commands.append(" ".join(args))

    if len(bz_commands) > 0:
        q = QueueCommands(bz_commands, num_jobs)
        q.run()
688
689
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format=None):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * eland_result files
      * score files
      * Summary.htm
      * srf files (raw sequence & qualities)

    :param runs: list of PipelineRun objects to archive
    :param output_base_dir: where per-flowcell result directories are
                            created; defaults to the current directory
    :param site: passed through to save_raw_data; None skips saving raw
                 base-call data entirely
    :param num_jobs: parallel jobs for compression/md5 commands
    :param raw_format: 'fastq', 'qseq' or 'srf'; None lets save_raw_data
                       ask the bustard object for its native format
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
        result_dir = os.path.join(output_base_dir, r.flowcell_id)
        LOGGER.info("Using %s as result directory" % (result_dir,))
        if not os.path.exists(result_dir):
            os.mkdir(result_dir)

        # create directory to add this runs results to
        LOGGER.info("Filling in %s" % (r.run_dirname,))
        run_dirname = os.path.join(result_dir, r.run_dirname)
        run_dirname = os.path.abspath(run_dirname)
        if os.path.exists(run_dirname):
            # never clobber a previously extracted analysis
            LOGGER.error("%s already exists, not overwriting" % (run_dirname,))
            continue
        else:
            os.mkdir(run_dirname)

        # save run file
        r.save(run_dirname)

        # save illumina flowcell status report
        save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'),
                              run_dirname)

        # save stuff from bustard
        # grab IVC plot
        save_ivc_plot(r.bustard, run_dirname)

        # build base call saving commands
        if site is not None:
            save_raw_data(num_jobs, r, site, raw_format, run_dirname)

        # save stuff from GERALD
        # copy stuff out of the main run
        if r.gerald:
            g = r.gerald

            # save summary file
            save_summary_file(r, run_dirname)

            # compress eland result files
            compress_eland_results(g, run_dirname, num_jobs)

        # md5 all the compressed files once we're done
        md5_commands = srf.make_md5_commands(run_dirname)
        srf.run_commands(run_dirname, md5_commands, num_jobs)
747
def save_raw_data(num_jobs, r, site, raw_format, run_dirname):
    """Archive raw base-call data for one run in the requested format."""
    if r.gerald:
        # only lanes with parameters in the GERALD config get archived
        lanes = [lane for lane in r.gerald.lanes
                 if r.gerald.lanes.get(lane, None) is not None]
    else:
        # assume default list of lanes
        lanes = LANE_SAMPLE_KEYS

    run_name = srf.pathname_to_run_name(r.pathname)
    sequence_commands = []
    if raw_format is None:
        raw_format = r.bustard.sequence_format

    LOGGER.info("Raw Format is: %s" % (raw_format, ))
    if raw_format == 'fastq':
        LOGGER.info("Reading fastq files from %s", r.bustard.pathname)
        raw_path = os.path.join(r.pathname, r.bustard.pathname)
        LOGGER.info("raw data = %s" % (raw_path,))
        srf.copy_hiseq_project_fastqs(run_name, raw_path, site, run_dirname)
    elif raw_format == 'qseq':
        sequence_commands = srf.make_qseq_commands(
            run_name, r.bustard.pathname, lanes, site, run_dirname)
    elif raw_format == 'srf':
        sequence_commands = srf.make_srf_commands(
            run_name, r.bustard.pathname, lanes, site, run_dirname, 0)
    else:
        raise ValueError('Unknown --raw-format=%s' % (raw_format))
    srf.run_commands(r.bustard.pathname, sequence_commands, num_jobs)
777
def rm_list(files, dry_run=True):
    """Delete the listed files/directories, logging each removal.

    :param files: iterable of paths to remove
    :param dry_run: when True (the default!) only log what would be removed
    """
    for f in files:
        if os.path.exists(f):
            LOGGER.info('deleting %s', f)
            if not dry_run:
                if os.path.isdir(f):
                    shutil.rmtree(f)
                else:
                    os.unlink(f)
        else:
            # .warn is a deprecated alias for .warning
            LOGGER.warning("%s doesn't exist.", f)
789
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    Removes logs, calibration data, images, thumbnails and pipeline
    intermediates that can be regenerated or aren't worth archiving.

    :param runs: list of PipelineRun objects to clean
    :param dry_run: when True only log what would be removed
    """
    if dry_run:
        LOGGER.info('In dry-run mode')

    for run in runs:
        # (fixed log typo: was 'Cleaninging')
        LOGGER.info('Cleaning %s', run.pathname)
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        LOGGER.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # rm ReadPrep
        LOGGER.info("Cleaning ReadPrep*")
        read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
        rm_list(read_prep_dirs, dry_run)
        # rm Thumbnail_Images (comment previously duplicated "rm ReadPrep")
        LOGGER.info("Cleaning Thumbnail_Images")
        thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
        rm_list(thumbnail_dirs, dry_run)

        # make clean_intermediate
        # use the module logger, not the root logger (was logging.info)
        LOGGER.info("Cleaning intermediate files")
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                             cwd=run.image_analysis.pathname,)
            clean_process.wait()
832
833
834