# Recent IPAR xml config blocks include the runfolder name
# [htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """Core information needed to inspect a runfolder.
2 """
3 from glob import glob
4 import logging
5 import os
6 import re
7 import shutil
8 import stat
9 import subprocess
10 import sys
11 import tarfile
12 import time
13
14 LOGGER = logging.getLogger(__name__)
15
16 from htsworkflow.pipelines import firecrest
17 from htsworkflow.pipelines import ipar
18 from htsworkflow.pipelines import bustard
19 from htsworkflow.pipelines import gerald
20 from htsworkflow.pipelines import ElementTree, \
21                                   EUROPEAN_STRPTIME, EUROPEAN_DATE_RE, \
22                                   VERSION_RE, USER_RE, \
23                                   LANES_PER_FLOWCELL, LANE_LIST
24 from htsworkflow.pipelines.samplekey import LANE_SAMPLE_KEYS
25 from htsworkflow.util.alphanum import alphanum
26 from htsworkflow.util.ethelp import indent, flatten
27 from htsworkflow.util.queuecommands import QueueCommands
28
29 from htsworkflow.pipelines import srf
30
class PipelineRun(object):
    """Capture "interesting" information about a pipeline run

    :Variables:
      - `pathname` location of the root of this runfolder
      - `serialization_filename` read only property containing name of run xml file
      - `flowcell_id` read-only property containing flowcell id (bar code)
      - `datadir` location of the runfolder data dir.
      - `image_analysis` generic name for Firecrest or IPAR image analysis
      - `bustard` summary base caller
      - `gerald` summary of sequence alignment and quality control metrics
    """
    XML_VERSION = 1
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, flowcell_id=None, xml=None):
        """Initialize a PipelineRun object

        :Parameters:
          - `pathname` the root directory of this run folder.
          - `flowcell_id` the flowcell ID in case it can't be determined
          - `xml` Allows initializing an object from a serialized xml file.

        :Types:
          - `pathname` str
          - `flowcell_id` str
          - `xml` ElementTree
        """
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        else:
            self.pathname = None
        self._name = None
        self._flowcell_id = flowcell_id
        self.datadir = None
        self.suffix = None
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
            self.set_elements(xml)

    def _get_flowcell_id(self):
        """Return the flowcell ID

        Attempts to find the flowcell ID through several mechanisms:
        RunInfo.xml, Config/FlowcellId.xml, the runfolder name, and
        finally the literal 'unknown'.
        """
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_runinfo()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_flowcellid()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_path()
        if self._flowcell_id is None:
            self._flowcell_id = 'unknown'
            # only warn when we had to fall back to the placeholder
            LOGGER.warning(
                "Flowcell id was not found, guessing %s" % (
                    self._flowcell_id))

        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def _get_flowcell_id_from_flowcellid(self):
        """Extract flowcell id from a Config/FlowcellId.xml file

        :return: flowcell_id or None if not found
        """
        config_dir = os.path.join(self.pathname, 'Config')
        flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
        if os.path.exists(flowcell_id_path):
            flowcell_id_tree = ElementTree.parse(flowcell_id_path)
            return flowcell_id_tree.findtext('Text')

    def _get_flowcell_id_from_runinfo(self):
        """Read RunInfo file for flowcell id

        :return: flowcell_id or None if not found
        """
        runinfo = os.path.join(self.pathname, 'RunInfo.xml')
        if os.path.exists(runinfo):
            tree = ElementTree.parse(runinfo)
            root = tree.getroot()
            # NOTE(review): .xpath is an lxml extension, not stdlib
            # ElementTree — assumes the package-level ElementTree import
            # resolves to lxml; confirm against htsworkflow.pipelines.
            fc_nodes = root.xpath('/RunInfo/Run/Flowcell')
            if len(fc_nodes) == 1:
                return fc_nodes[0].text

    def _get_flowcell_id_from_path(self):
        """Guess a flowcell name from the path

        :return: flowcell_id or None if not found
        """
        path_fields = self.pathname.split('_')
        if len(path_fields) > 0:
            # guessing last element of filename
            return path_fields[-1]

    def _get_runfolder_name(self):
        """Return runfolder name recorded by gerald or the image analysis."""
        if self.gerald:
            return self.gerald.runfolder_name
        elif hasattr(self.image_analysis, 'runfolder_name'):
            return self.image_analysis.runfolder_name
        else:
            return None
    runfolder_name = property(_get_runfolder_name)

    def _get_run_dirname(self):
        """Return name of directory to hold result files from one analysis

        For pre-multiplexing runs this is just the cycle range C1-123
        For post-multiplexing runs the "suffix" that we add to
        differentiate runs will be added to the range.
        E.g. Unaligned_6mm may produce C1-200_6mm

        :raises ValueError: if no image analysis has been attached yet
        """
        if self.image_analysis is None:
            raise ValueError("Not initialized yet")
        start = self.image_analysis.start
        stop = self.image_analysis.stop
        cycle_fragment = "C%d-%d" % (start, stop)
        if self.suffix:
            cycle_fragment += self.suffix

        return cycle_fragment
    run_dirname = property(_get_run_dirname)

    def get_elements(self):
        """make one master xml file from all of our sub-components.

        :return: an ElementTree containing all available pipeline
                 run xml components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        if self.gerald:
            root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """Initialize a PipelineRun object from an run.xml ElementTree.

        :param tree: parsed ElementTree
        :type tree: ElementTree
        """
        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
            tag = element.tag.lower()
            if tag == PipelineRun.FLOWCELL_ID.lower():
                self._flowcell_id = element.text
            # ok the xword.Xword.XWORD pattern for module.class.constant is lame
            # you should only have Firecrest or IPAR, never both of them.
            elif tag == firecrest.Firecrest.FIRECREST.lower():
                self.image_analysis = firecrest.Firecrest(xml=element)
            elif tag == ipar.IPAR.IPAR.lower():
                self.image_analysis = ipar.IPAR(xml=element)
            elif tag == bustard.Bustard.BUSTARD.lower():
                self.bustard = bustard.Bustard(xml=element)
            elif tag == gerald.Gerald.GERALD.lower():
                self.gerald = gerald.Gerald(xml=element)
            elif tag == gerald.CASAVA.GERALD.lower():
                self.gerald = gerald.CASAVA(xml=element)
            else:
                # Logger.warn is a deprecated alias; use warning()
                LOGGER.warning('PipelineRun unrecognized tag %s' % (tag,))

    def _get_serialization_filename(self):
        """Compute the filename for the run xml file

        Attempts to find the latest date from all of the run
        components.

        :return: filename run_{flowcell id}_{timestamp}.xml
        :rtype: str
        """
        if self._name is None:
            components = [self.image_analysis, self.bustard, self.gerald]
            # newest timestamp among the components that exist
            tmax = max([c.time for c in components if c])
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    serialization_filename = property(_get_serialization_filename)

    def save(self, destdir=None):
        """Save a run xml file.

        :param destdir: Directory name to save too, uses current directory
                        if not specified.
        :type destdir: str
        """
        if destdir is None:
            destdir = ''
        LOGGER.info("Saving run report " + self.serialization_filename)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.serialization_filename)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Load a run xml into this object.

        :Parameters:
          - `filename` location of a run xml file

        :Types:
          - `filename` str
        """
        LOGGER.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
247
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` location of an run xml file

    :Returns: initialized PipelineRun object
    """
    root = ElementTree.parse(pathname).getroot()
    return PipelineRun(xml=root)
260
def get_runs(runfolder, flowcell_id=None):
    """Find all runs associated with a runfolder.

    We end up with multiple analysis runs as we sometimes
    need to try with different parameters. This attempts
    to return a list of all the various runs.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.

    :param runfolder: root of the runfolder to scan
    :param flowcell_id: optional flowcell id to use if it can't be detected
    :return: list of PipelineRun objects
    """
    datadir = os.path.join(runfolder, 'Data')

    LOGGER.info('Searching for runs in ' + datadir)
    runs = []
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        LOGGER.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            # warning(): Logger.warn is a deprecated alias
            LOGGER.warning(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, firecrest_pathname, flowcell_id
            )
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        LOGGER.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            LOGGER.warning(
                "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, ipar_pathname, flowcell_id
            )

    return runs
305
def scan_post_image_analysis(runs, runfolder, datadir, image_analysis,
                             pathname, flowcell_id):
    """Find base-calling/alignment runs downstream of one image analysis.

    Appends any discovered PipelineRun objects to ``runs``.
    """
    hiseq_count = build_hiseq_runs(image_analysis, runs, datadir, runfolder,
                                   flowcell_id)
    # If we're a multiplexed run, don't look for older run type.
    if hiseq_count > 0:
        return

    LOGGER.info("Looking for bustard directories in %s" % (pathname,))
    candidates = glob(os.path.join(pathname, "Bustard*"))
    # RTA BaseCalls looks enough like Bustard.
    candidates.extend(glob(os.path.join(pathname, "BaseCalls")))
    for bustard_pathname in candidates:
        LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
        base_caller = bustard.bustard(bustard_pathname)
        build_gerald_runs(runs, base_caller, image_analysis, bustard_pathname,
                          datadir, pathname, runfolder, flowcell_id)
322
323
def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder,
                      flowcell_id):
    """Scan a bustard directory for GERALD result directories.

    Appends a PipelineRun to ``runs`` for each GERALD* directory found.

    :return: number of runs added to ``runs``
    """
    start = len(runs)
    gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
    LOGGER.info("Looking for gerald directories in %s" % (pathname,))
    for gerald_pathname in glob(gerald_glob):
        LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
        try:
            g = gerald.gerald(gerald_pathname)
            p = PipelineRun(runfolder, flowcell_id)
            p.datadir = datadir
            p.image_analysis = image_analysis
            p.bustard = b
            p.gerald = g
            runs.append(p)
        except IOError as e:
            # 'as' syntax works on python 2.6+ and 3.x (was py2-only ', e')
            LOGGER.error("Ignoring " + str(e))
    return len(runs) - start
342
343
def build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id):
    """Build PipelineRun objects from HiSeq-style Aligned*/Unaligned* dirs.

    :return: number of runs added to ``runs``
    """
    start = len(runs)
    aligned_glob = os.path.join(runfolder, 'Aligned*')
    unaligned_glob = os.path.join(runfolder, 'Unaligned*')

    aligned_paths = glob(aligned_glob)
    unaligned_paths = glob(unaligned_glob)

    matched_paths = hiseq_match_aligned_unaligned(aligned_paths, unaligned_paths)
    LOGGER.debug("Matched HiSeq analysis: %s", str(matched_paths))

    for aligned, unaligned, suffix in matched_paths:
        if unaligned is None:
            # typo fix in message ('unalinged'); warning() over deprecated warn()
            LOGGER.warning("Aligned directory %s without matching unaligned, skipping", aligned)
            continue

        try:
            p = PipelineRun(runfolder, flowcell_id)
            p.datadir = datadir
            p.suffix = suffix
            p.image_analysis = image_analysis
            p.bustard = bustard.bustard(unaligned)
            assert p.bustard
            if aligned:
                p.gerald = gerald.gerald(aligned)
            runs.append(p)
        except IOError as e:
            # 'as' syntax works on python 2.6+ and 3.x (was py2-only ', e')
            LOGGER.error("Ignoring " + str(e))
    return len(runs) - start
373
def hiseq_match_aligned_unaligned(aligned, unaligned):
    """Match aligned and unaligned folders from separate lists

    :return: list of (aligned, unaligned, suffix) tuples; either
             directory entry may be None when it has no partner.
    """
    # (removed an unused compiled regex that duplicated the suffix
    # parsing done by build_dir_dict_by_suffix)
    aligned_by_suffix = build_dir_dict_by_suffix('Aligned', aligned)
    unaligned_by_suffix = build_dir_dict_by_suffix('Unaligned', unaligned)

    keys = set(aligned_by_suffix.keys()).union(set(unaligned_by_suffix.keys()))

    matches = []
    for key in keys:
        a = aligned_by_suffix.get(key)
        u = unaligned_by_suffix.get(key)
        matches.append((a, u, key))
    return matches
390
def build_dir_dict_by_suffix(prefix, dirnames):
    """Build a dictionary indexed by suffix of last directory name.

    It assumes a constant prefix

    :param prefix: leading portion of the basename, e.g. 'Aligned'
    :param dirnames: iterable of directory paths to classify
    :return: dict mapping suffix (possibly '') -> original path
    """
    # raw string: '\w' in a plain literal is an invalid escape
    # sequence in current pythons
    regex = re.compile(r'%s(?P<suffix>[\w]*)' % (prefix,))

    by_suffix = {}
    for absname in dirnames:
        basename = os.path.basename(absname)
        match = regex.match(basename)
        if match:
            by_suffix[match.group('suffix')] = absname
    return by_suffix
405
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :param gerald_dir: path to a GERALD result directory
    :return: PipelineRun, or None if any component can't be found
    """
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    # walk up the conventional runfolder layout from the gerald dir
    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    LOGGER.info('--- use-run detected options ---')
    LOGGER.info('runfolder: %s' % (runfolder_dir,))
    LOGGER.info('image_dir: %s' % (image_dir,))
    LOGGER.info('bustard_dir: %s' % (bustard_dir,))
    LOGGER.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    LOGGER.info('data_dir: %s' % (data_dir,))
    # typo fix: message previously said 'short_iamge_dir'
    LOGGER.info('short_image_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        LOGGER.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
473
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters
    """
    for pipeline_run in runs:
        pipeline_run.save()
480
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.
    """
    summarized = {}
    genome = 'unknown'
    genome_total = 0
    for pathname, count in mapped_reads.items():
        dirname, basename = os.path.split(pathname)
        if dirname and dirname not in genome_map:
            # a directory component not in the genome map marks a
            # spike-in / contamination entry; total those separately
            genome = dirname
            genome_total += count
        else:
            summarized[basename] = summarized.setdefault(basename, 0) + count
    summarized[genome] = genome_total
    return summarized
498
def summarize_lane(gerald, lane_id):
    """Build the list of report lines describing one lane's results."""
    lines = []
    add = lines.append
    lane_results = gerald.summary.lane_results
    eland_result = gerald.eland_results[lane_id]
    add("Sample name %s" % (eland_result.sample_name))
    add("Lane id %s end %s" % (lane_id.lane, lane_id.read))

    have_summary = (lane_id.read < len(lane_results)
                    and lane_id.lane in lane_results[lane_id.read])
    if have_summary:
        cluster = lane_results[lane_id.read][lane_id.lane].cluster
        add("Clusters %d +/- %d" % (cluster[0], cluster[1]))
    add("Total Reads: %d" % (eland_result.reads))

    if hasattr(eland_result, 'match_codes'):
        mc = eland_result.match_codes
        total = float(eland_result.reads)
        no_match = mc['NM']
        qc_failed = mc['QC']
        add("No Match: %d (%2.2g %%)" % (no_match, no_match / total * 100))
        add("QC Failed: %d (%2.2g %%)" % (qc_failed, qc_failed / total * 100))
        add('Unique (0,1,2 mismatches) %d %d %d' %
            (mc['U0'], mc['U1'], mc['U2']))
        add('Repeat (0,1,2 mismatches) %d %d %d' %
            (mc['R0'], mc['R1'], mc['R2']))

    if hasattr(eland_result, 'genome_map'):
        add("Mapped Reads")
        mapped_reads = summarize_mapped_reads(eland_result.genome_map,
                                              eland_result.mapped_reads)
        for name, counts in mapped_reads.items():
            add("  %s: %d" % (name, counts))
        add('')
    return lines
536
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :param runs: list of PipelineRun objects
    :return: a single newline-joined report string
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.serialization_filename,))
        if run.gerald:
            # sort the report
            eland_keys = sorted(run.gerald.eland_results.keys())
        else:
            report.append("Alignment not done, no report possible")
            eland_keys = []

        # BUG FIX: the lane loop used to sit outside the run loop, so
        # only the last run's gerald was summarized and stale eland_keys
        # leaked across runs (crashing when the last run had no gerald).
        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('---')
            report.append('')
    return os.linesep.join(report)
557
def is_compressed(filename):
    """Return True when filename carries a known compression extension."""
    extension = os.path.splitext(filename)[1]
    return extension in (".gz", ".bz2")
565
def save_flowcell_reports(data_dir, run_dirname):
    """
    Save the flowcell quality reports

    Tars data_dir/reports (plus Status.xml/xsl if present) into
    run_dirname/flowcell-reports.tar.bz2.
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(run_dirname, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        LOGGER.info("Saving reports from " + reports_dir)
        cwd = os.getcwd()
        os.chdir(data_dir)
        try:
            q = QueueCommands([" ".join(cmd_list)])
            q.run()
        finally:
            # BUG FIX: always restore the working directory, even when
            # the tar command raises
            os.chdir(cwd)
584
585
def save_summary_file(pipeline, run_dirname):
    """Copy the run's Summary.htm into run_dirname.

    Prefers the GERALD directory copy, falls back to
    datadir/Status_Files, and logs when neither exists.
    """
    gerald_object = pipeline.gerald
    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
    if os.path.exists(gerald_summary):
        LOGGER.info('Copying %s to %s' % (gerald_summary, run_dirname))
        shutil.copy(gerald_summary, run_dirname)
    elif os.path.exists(status_files_summary):
        LOGGER.info('Copying %s to %s' % (status_files_summary, run_dirname))
        shutil.copy(status_files_summary, run_dirname)
    else:
        # BUG FIX: referenced undefined 'summary_path' here, raising
        # NameError whenever both candidate files were missing
        LOGGER.info('Summary file %s was not found' % (gerald_summary,))
599
def save_ivc_plot(bustard_object, run_dirname):
    """
    Save the IVC page and its supporting images
    """
    source_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    source_plot_dir = os.path.join(bustard_object.pathname, 'Plots')
    source_images = os.path.join(source_plot_dir, 's_?_[a-z]*.png')
    target_plot_dir = os.path.join(run_dirname, 'Plots')

    if not os.path.exists(source_html):
        LOGGER.warning('Missing IVC.html file, not archiving')
        return

    LOGGER.debug("Saving %s" % (source_html,))
    LOGGER.debug("Saving %s" % (source_images,))
    shutil.copy(source_html, run_dirname)
    if not os.path.exists(target_plot_dir):
        os.mkdir(target_plot_dir)
    for plot_file in glob(source_images):
        shutil.copy(plot_file, target_plot_dir)
620
621
def compress_score_files(bustard_object, run_dirname):
    """
    Compress score files into our result directory

    Pipes ``tar c`` of the *_score.txt files through bzip2 into
    run_dirname/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = []
    for f in os.listdir(scores_path):
        if re.match('.*_score.txt', f):
            score_files.append(f)

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(run_dirname, 'scores.tar.bz2')
    LOGGER.info("Compressing score files from %s" % (scores_path,))
    LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
    LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
    LOGGER.info("Writing to %s" % (tar_dest_name,))

    env = {'BZIP': '-9'}
    # BUG FIX: the destination was opened in text mode ('w') and never
    # closed, and only tar was waited on, so the function could return
    # before bzip2 finished writing the archive.
    with open(tar_dest_name, 'wb') as tar_dest:
        tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
                               env=env, cwd=scores_path)
        bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
        tar.stdout.close()  # let tar receive SIGPIPE if bzip2 exits early
        tar.wait()
        bzip.wait()
652
653
def compress_eland_results(gerald_object, run_dirname, num_jobs=1):
    """
    Compress eland result files into the archive directory

    :param gerald_object: gerald run whose eland_results are archived
    :param run_dirname: destination directory
    :param num_jobs: number of bzip2 commands to run concurrently
    """
    # copy & bzip eland files
    bz_commands = []

    for key in gerald_object.eland_results:
        eland_lane = gerald_object.eland_results[key]
        for source_name in eland_lane.pathnames:
            if source_name is None:
                LOGGER.info(
                    "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
                continue
            path, name = os.path.split(source_name)
            dest_name = os.path.join(run_dirname, name)
            LOGGER.info("Saving eland file %s to %s" %
                        (source_name, dest_name))

            if is_compressed(name):
                LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
                shutil.copy(source_name, dest_name)
            else:
                # not compressed; queue a shell redirection command for
                # QueueCommands to run in parallel
                dest_name += '.bz2'
                args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
                bz_commands.append(" ".join(args))

    if len(bz_commands) > 0:
        q = QueueCommands(bz_commands, num_jobs)
        q.run()
690
691
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format=None):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * eland_result files
      * score files
      * Summary.htm
      * srf files (raw sequence & qualities)

    :param runs: list of PipelineRun objects to archive
    :param output_base_dir: where per-flowcell result directories are
                            created; defaults to the current directory
    :param site: label passed through to save_raw_data; None skips
                 saving raw base-call data entirely
    :param num_jobs: parallelism for compression and md5 commands
    :param raw_format: forwarded to save_raw_data ('fastq'/'qseq'/'srf',
                       or None to ask the bustard object)
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
        # one result directory per flowcell id
        result_dir = os.path.join(output_base_dir, r.flowcell_id)
        LOGGER.info("Using %s as result directory" % (result_dir,))
        if not os.path.exists(result_dir):
            os.mkdir(result_dir)

        # create directory to add this runs results to
        LOGGER.info("Filling in %s" % (r.run_dirname,))
        run_dirname = os.path.join(result_dir, r.run_dirname)
        run_dirname = os.path.abspath(run_dirname)
        if os.path.exists(run_dirname):
            # never overwrite a previously extracted analysis
            LOGGER.error("%s already exists, not overwriting" % (run_dirname,))
            continue
        else:
            os.mkdir(run_dirname)

        # save run file
        r.save(run_dirname)

        # save illumina flowcell status report
        save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'),
                              run_dirname)

        # save stuff from bustard
        # grab IVC plot
        save_ivc_plot(r.bustard, run_dirname)

        # build base call saving commands
        if site is not None:
            save_raw_data(num_jobs, r, site, raw_format, run_dirname)

        # save stuff from GERALD
        # copy stuff out of the main run
        if r.gerald:
            g = r.gerald

            # save summary file
            save_summary_file(r, run_dirname)

            # compress eland result files
            compress_eland_results(g, run_dirname, num_jobs)

        # md5 all the compressed files once we're done
        md5_commands = srf.make_md5_commands(run_dirname)
        srf.run_commands(run_dirname, md5_commands, num_jobs)
749
def save_raw_data(num_jobs, r, site, raw_format, run_dirname):
    """Archive a run's raw base-call data in the requested format.

    :param num_jobs: parallelism for the generated conversion commands
    :param r: PipelineRun whose bustard data is being saved
    :param site: site label embedded in the srf/qseq commands
    :param raw_format: 'fastq', 'qseq' or 'srf'; None falls back to
                       the bustard object's sequence_format
    :param run_dirname: destination directory for the output
    :raises ValueError: for an unrecognized raw_format
    """
    lanes = []
    if r.gerald:
        # only keep lanes gerald actually has parameters for
        for lane in r.gerald.lanes:
            lane_parameters = r.gerald.lanes.get(lane, None)
            if lane_parameters is not None:
                lanes.append(lane)
    else:
        # assume default list of lanes
        lanes = LANE_SAMPLE_KEYS

    run_name = srf.pathname_to_run_name(r.pathname)
    seq_cmds = []
    if raw_format is None:
        raw_format = r.bustard.sequence_format

    LOGGER.info("Raw Format is: %s" % (raw_format, ))
    if raw_format == 'fastq':
        # fastq data is copied directly rather than converted
        LOGGER.info("Reading fastq files from %s", r.bustard.pathname)
        rawpath = os.path.join(r.pathname, r.bustard.pathname)
        LOGGER.info("raw data = %s" % (rawpath,))
        srf.copy_hiseq_project_fastqs(run_name, rawpath, site, run_dirname)
    elif raw_format == 'qseq':
        seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, run_dirname)
    elif raw_format == 'srf':
        seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, run_dirname, 0)
    else:
        raise ValueError('Unknown --raw-format=%s' % (raw_format))
    srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
779
def rm_list(files, dry_run=True):
    """Delete the named files or directory trees.

    :param files: iterable of paths to remove
    :param dry_run: when True, only log what would be deleted
    """
    for f in files:
        if os.path.exists(f):
            LOGGER.info('deleting %s' % (f,))
            if not dry_run:
                if os.path.isdir(f):
                    shutil.rmtree(f)
                else:
                    os.unlink(f)
        else:
            # warning(): Logger.warn is a deprecated alias
            LOGGER.warning("%s doesn't exist." % (f,))
791
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    Removes logs, calibration data, images, and regenerable
    intermediate files. With dry_run=True only logs what would go.
    """
    if dry_run:
        LOGGER.info('In dry-run mode')

    for run in runs:
        # typo fix: message previously said 'Cleaninging'
        LOGGER.info('Cleaning %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        LOGGER.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # rm ReadPrep
        LOGGER.info("Cleaning ReadPrep*")
        read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
        rm_list(read_prep_dirs, dry_run)
        # rm Thumbnail_Images (comment previously duplicated 'rm ReadPrep')
        LOGGER.info("Cleaning Thumbnail_Images")
        thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
        rm_list(thumbnail_dirs, dry_run)

        # make clean_intermediate
        # use the module LOGGER, not the root logger (was logging.info)
        LOGGER.info("Cleaning intermediate files")
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                             cwd=run.image_analysis.pathname,)
            clean_process.wait()
834
835
836