Convert alphanum sort from comparator to key operator
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """Core information needed to inspect a runfolder.
2 """
3 from glob import glob
4 import logging
5 import os
6 import re
7 import shutil
8 import stat
9 import subprocess
10 import sys
11 import tarfile
12 import time
13
14 LOGGER = logging.getLogger(__name__)
15
16 from htsworkflow.pipelines import firecrest
17 from htsworkflow.pipelines import ipar
18 from htsworkflow.pipelines import bustard
19 from htsworkflow.pipelines import gerald
20 from htsworkflow.pipelines import ElementTree, \
21                                   EUROPEAN_STRPTIME, EUROPEAN_DATE_RE, \
22                                   VERSION_RE, USER_RE, \
23                                   LANES_PER_FLOWCELL, LANE_LIST
24 from htsworkflow.pipelines.samplekey import LANE_SAMPLE_KEYS
25 from htsworkflow.util.ethelp import indent, flatten
26 from htsworkflow.util.queuecommands import QueueCommands
27
28 from htsworkflow.pipelines import srf
29
class PipelineRun(object):
    """Capture "interesting" information about a pipeline run

    :Variables:
      - `pathname` location of the root of this runfolder
      - `serialization_filename` read only property containing name of run xml file
      - `flowcell_id` read-only property containing flowcell id (bar code)
      - `datadir` location of the runfolder data dir.
      - `image_analysis` generic name for Firecrest or IPAR image analysis
      - `bustard` summary base caller
      - `gerald` summary of sequence alignment and quality control metrics
    """
    # serialization format version and the XML tag names used by
    # get_elements()/set_elements()
    XML_VERSION = 1
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, flowcell_id=None, xml=None):
        """Initialize a PipelineRun object

        :Parameters:
          - `pathname` the root directory of this run folder.
          - `flowcell_id` the flowcell ID in case it can't be determined
          - `xml` Allows initializing an object from a serialized xml file.

        :Types:
          - `pathname` str
          - `flowcell_id` str
          - `ElementTree` str
        """
        if pathname is not None:
          # normalize so later os.path.split/join operations behave predictably
          self.pathname = os.path.normpath(pathname)
        else:
          self.pathname = None
        self._name = None
        self._flowcell_id = flowcell_id
        self.datadir = None
        self.suffix = None
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
          self.set_elements(xml)

    def _get_flowcell_id(self):
        """Return the flowcell ID

        Attempts to find the flowcell ID through several mechanisms.
        """
        # extract flowcell ID
        # lookup priority: RunInfo.xml, then Config/FlowcellId.xml, then the
        # runfolder path itself, finally the literal 'unknown'
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_runinfo()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_flowcellid()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_path()
        if self._flowcell_id is None:
            self._flowcell_id = 'unknown'

            LOGGER.warning(
                "Flowcell id was not found, guessing %s" % (
                    self._flowcell_id))

        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def _get_flowcell_id_from_flowcellid(self):
        """Extract flowcell id from a Config/FlowcellId.xml file

        :return: flowcell_id or None if not found
        """
        config_dir = os.path.join(self.pathname, 'Config')
        flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
        if os.path.exists(flowcell_id_path):
            flowcell_id_tree = ElementTree.parse(flowcell_id_path)
            return flowcell_id_tree.findtext('Text')

    def _get_flowcell_id_from_runinfo(self):
        """Read RunInfo file for flowcell id

        :return: flowcell_id or None if not found
        """
        runinfo = os.path.join(self.pathname, 'RunInfo.xml')
        if os.path.exists(runinfo):
            tree = ElementTree.parse(runinfo)
            root = tree.getroot()
            # NOTE(review): Element.xpath is an lxml API, so the ElementTree
            # imported from htsworkflow.pipelines presumably resolves to
            # lxml.etree — confirm; xml.etree elements have no .xpath
            fc_nodes = root.xpath('/RunInfo/Run/Flowcell')
            if len(fc_nodes) == 1:
                return fc_nodes[0].text

    def _get_flowcell_id_from_path(self):
        """Guess a flowcell name from the path

        :return: flowcell_id or None if not found
        """
        # runfolder names conventionally end in _<flowcell id>
        path_fields = self.pathname.split('_')
        if len(path_fields) > 0:
            # guessing last element of filename
            return path_fields[-1]

    def _get_runfolder_name(self):
        # prefer gerald's record of the runfolder name, fall back to the
        # image analysis component when it provides one
        if self.gerald:
            return self.gerald.runfolder_name
        elif hasattr(self.image_analysis, 'runfolder_name'):
            return self.image_analysis.runfolder_name
        else:
            return None
    runfolder_name = property(_get_runfolder_name)

    def _get_run_dirname(self):
        """Return name of directory to hold result files from one analysis

        For pre-multiplexing runs this is just the cycle range C1-123
        For post-multiplexing runs the "suffix" that we add to
        differentiate runs will be added to the range.
        E.g. Unaligned_6mm may produce C1-200_6mm
        """
        if self.image_analysis is None:
            raise ValueError("Not initialized yet")
        start = self.image_analysis.start
        stop = self.image_analysis.stop
        cycle_fragment = "C%d-%d" % (start, stop)
        if self.suffix:
            cycle_fragment += self.suffix

        return cycle_fragment
    run_dirname = property(_get_run_dirname)

    def get_elements(self):
        """make one master xml file from all of our sub-components.

        :return: an ElementTree containing all available pipeline
                 run xml compoents.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        # image_analysis and bustard are assumed present; gerald is optional
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        if self.gerald:
            root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """Initialize a PipelineRun object from an run.xml ElementTree.

        :param tree: parsed ElementTree
        :type tree: ElementTree
        """
        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
          raise ValueError('Pipeline Run Expecting %s got %s' % (
              PipelineRun.PIPELINE_RUN, tag))
        # dispatch each child element to the matching component class
        for element in tree:
          tag = element.tag.lower()
          if tag == PipelineRun.FLOWCELL_ID.lower():
            self._flowcell_id = element.text
          #ok the xword.Xword.XWORD pattern for module.class.constant is lame
          # you should only have Firecrest or IPAR, never both of them.
          elif tag == firecrest.Firecrest.FIRECREST.lower():
            self.image_analysis = firecrest.Firecrest(xml=element)
          elif tag == ipar.IPAR.IPAR.lower():
            self.image_analysis = ipar.IPAR(xml=element)
          elif tag == bustard.Bustard.BUSTARD.lower():
            self.bustard = bustard.Bustard(xml=element)
          elif tag == gerald.Gerald.GERALD.lower():
            self.gerald = gerald.Gerald(xml=element)
          elif tag == gerald.CASAVA.GERALD.lower():
            self.gerald = gerald.CASAVA(xml=element)
          else:
            # NOTE(review): Logger.warn is deprecated (removed in Python
            # 3.13); prefer LOGGER.warning
            LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,))

    def _get_serialization_filename(self):
        """Compute the filename for the run xml file

        Attempts to find the latest date from all of the run
        components.

        :return: filename run_{flowcell id}_{timestamp}.xml
        :rtype: str
        """
        if self._name is None:
          components = [self.image_analysis, self.bustard, self.gerald]
          # newest component timestamp names the run
          tmax = max([ c.time for c in components if c ])
          timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
          self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    serialization_filename = property(_get_serialization_filename)

    def save(self, destdir=None):
        """Save a run xml file.

        :param destdir: Directory name to save too, uses current directory
                        if not specified.
        :type destdir: str
        """
        if destdir is None:
            destdir = ''
        LOGGER.info("Saving run report " + self.serialization_filename)
        xml = self.get_elements()
        # pretty-print the tree in place before writing
        indent(xml)
        dest_pathname = os.path.join(destdir, self.serialization_filename)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Load a run xml into this object.

        :Parameters:
          - `filename` location of a run xml file

        :Types:
          - `filename` str
        """
        LOGGER.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
246
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` location of an run xml file

    :Returns: initialized PipelineRun object
    """
    # parse the serialized run file and let PipelineRun rebuild itself
    root = ElementTree.parse(pathname).getroot()
    return PipelineRun(xml=root)
259
def get_runs(runfolder, flowcell_id=None):
    """Find all runs associated with a runfolder.

    We end up with multiple analysis runs as we sometimes
    need to try with different parameters. This attempts
    to return a list of all the various runs.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.

    :param runfolder: root directory of the runfolder to scan
    :param flowcell_id: optional flowcell id to use when one can't be detected
    :return: list of PipelineRun objects
    """
    datadir = os.path.join(runfolder, 'Data')

    LOGGER.info('Searching for runs in ' + datadir)
    runs = []
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        LOGGER.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            # Logger.warn is deprecated (removed in Python 3.13)
            LOGGER.warning(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, firecrest_pathname, flowcell_id
            )
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        LOGGER.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            LOGGER.warning(
                "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, ipar_pathname, flowcell_id
            )

    return runs
304
def scan_post_image_analysis(runs, runfolder, datadir, image_analysis,
                             pathname, flowcell_id):
    """Find base-calling results beneath an image-analysis directory.

    First tries the HiSeq (Aligned/Unaligned) layout; when nothing is
    found there, falls back to scanning for Bustard/BaseCalls dirs.
    """
    hiseq_count = build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id)
    # If we're a multiplexed run, don't look for older run type.
    if hiseq_count > 0:
        return

    LOGGER.info("Looking for bustard directories in %s" % (pathname,))
    candidates = glob(os.path.join(pathname, "Bustard*"))
    # RTA BaseCalls looks enough like Bustard.
    candidates.extend(glob(os.path.join(pathname, "BaseCalls")))
    for bustard_pathname in candidates:
        LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
        base_caller = bustard.bustard(bustard_pathname)
        build_gerald_runs(runs, base_caller, image_analysis, bustard_pathname,
                          datadir, pathname, runfolder, flowcell_id)
321
322
def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder,
                      flowcell_id):
    """Append one PipelineRun per GERALD directory under a bustard dir.

    :return: how many runs were appended to `runs`
    """
    initial_count = len(runs)
    LOGGER.info("Looking for gerald directories in %s" % (pathname,))
    for gerald_pathname in glob(os.path.join(bustard_pathname, 'GERALD*')):
        LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
        try:
            run = PipelineRun(runfolder, flowcell_id)
            run.datadir = datadir
            run.image_analysis = image_analysis
            run.bustard = b
            # may raise IOError for a broken gerald dir; skip it below
            run.gerald = gerald.gerald(gerald_pathname)
            runs.append(run)
        except IOError as e:
            LOGGER.error("Ignoring " + str(e))
    return len(runs) - initial_count
341
342
def build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id):
    """Append a PipelineRun for each Aligned/Unaligned pair in a HiSeq runfolder.

    :return: number of runs appended to `runs`
    """
    start = len(runs)
    aligned_glob = os.path.join(runfolder, 'Aligned*')
    unaligned_glob = os.path.join(runfolder, 'Unaligned*')

    aligned_paths = glob(aligned_glob)
    unaligned_paths = glob(unaligned_glob)

    matched_paths = hiseq_match_aligned_unaligned(aligned_paths, unaligned_paths)
    LOGGER.debug("Matched HiSeq analysis: %s", str(matched_paths))

    for aligned, unaligned, suffix in matched_paths:
        if unaligned is None:
            # warn() is deprecated (removed in Python 3.13); also fixed the
            # 'unalinged' typo in the message
            LOGGER.warning("Aligned directory %s without matching unaligned, skipping", aligned)
            continue

        try:
            p = PipelineRun(runfolder, flowcell_id)
            p.datadir = datadir
            p.suffix = suffix
            p.image_analysis = image_analysis
            p.bustard = bustard.bustard(unaligned)
            # an Unaligned dir without an Aligned partner is still a run
            if aligned:
                p.gerald = gerald.gerald(aligned)
            runs.append(p)
        except (IOError, RuntimeError) as e:
            LOGGER.error("Exception %s", str(e))
            LOGGER.error("Skipping run in %s", flowcell_id)
    return len(runs) - start
372
def hiseq_match_aligned_unaligned(aligned, unaligned):
    """Match aligned and unaligned folders from seperate lists

    :return: list of (aligned, unaligned, suffix) tuples; either path may
             be None when no partner with the same suffix exists.
    """
    # (removed an unused `unaligned_suffix_re` that duplicated the regex
    # built inside build_dir_dict_by_suffix and used an invalid \w escape)
    aligned_by_suffix = build_dir_dict_by_suffix('Aligned', aligned)
    unaligned_by_suffix = build_dir_dict_by_suffix('Unaligned', unaligned)

    keys = set(aligned_by_suffix.keys()).union(set(unaligned_by_suffix.keys()))

    matches = []
    for key in keys:
        a = aligned_by_suffix.get(key)
        u = unaligned_by_suffix.get(key)
        matches.append((a, u, key))
    return matches
389
def build_dir_dict_by_suffix(prefix, dirnames):
    """Build a dictionary indexed by suffix of last directory name.

    It assumes a constant prefix

    :param prefix: literal string the directory basename must start with
    :param dirnames: iterable of directory paths
    :return: dict mapping suffix -> path; non-matching paths are skipped
    """
    # raw string avoids the invalid-escape-sequence warning for \w
    regex = re.compile(r'%s(?P<suffix>[\w]*)' % (prefix,))

    by_suffix = {}
    for absname in dirnames:
        basename = os.path.basename(absname)
        match = regex.match(basename)
        if match:
            by_suffix[match.group('suffix')] = absname
    return by_suffix
404
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :param gerald_dir: path to a gerald analysis directory
    :return: initialized PipelineRun, or None when any component is missing
    """
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    # walk up the conventional directory layout:
    # runfolder/Data/<image dir>/<bustard dir>/<gerald dir>
    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    LOGGER.info('--- use-run detected options ---')
    LOGGER.info('runfolder: %s' % (runfolder_dir,))
    LOGGER.info('image_dir: %s' % (image_dir,))
    LOGGER.info('bustard_dir: %s' % (bustard_dir,))
    LOGGER.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    LOGGER.info('data_dir: %s' % (data_dir,))
    # (fixed the 'short_iamge_dir' typo in the log label)
    LOGGER.info('short_image_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        LOGGER.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
472
def extract_run_parameters(runs):
    """Persist the run xml for each discovered pipeline run.

    Each run serializes itself into the current working directory.
    """
    for pipeline_run in runs:
        pipeline_run.save()
479
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.
    """
    totals = {}
    spike_label = 'unknown'
    spike_count = 0
    for name, count in mapped_reads.items():
        prefix, leaf = os.path.split(name)
        if prefix and prefix not in genome_map:
            # a path component outside the genome map marks a
            # spike-in/contamination target; roll those up by directory
            spike_label = prefix
            spike_count += count
        else:
            totals[leaf] = totals.get(leaf, 0) + count
    totals[spike_label] = spike_count
    return totals
497
def summarize_lane(gerald, lane_id):
    """Build the per-lane lines of the text summary report.

    :param gerald: gerald results object with summary and eland_results
    :param lane_id: lane key carrying .lane and .read attributes
    :return: list of report lines
    """
    lines = []
    eland_result = gerald.eland_results[lane_id]
    lane_results = gerald.summary.lane_results
    lines.append("Sample name %s" % (eland_result.sample_name))
    lines.append("Lane id %s end %s" % (lane_id.lane, lane_id.read))

    # cluster statistics are only available when the summary covers this
    # read/lane combination
    read_index = lane_id.read
    if read_index < len(lane_results) and lane_id.lane in lane_results[read_index]:
        cluster = lane_results[read_index][lane_id.lane].cluster
        lines.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
    lines.append("Total Reads: %d" % (eland_result.reads))

    if hasattr(eland_result, 'match_codes'):
        mc = eland_result.match_codes
        total = float(eland_result.reads)
        nm = mc['NM']
        qc = mc['QC']
        lines.append("No Match: %d (%2.2g %%)" % (nm, nm / total * 100))
        lines.append("QC Failed: %d (%2.2g %%)" % (qc, qc / total * 100))
        lines.append('Unique (0,1,2 mismatches) %d %d %d' %
                     (mc['U0'], mc['U1'], mc['U2']))
        lines.append('Repeat (0,1,2 mismatches) %d %d %d' %
                     (mc['R0'], mc['R1'], mc['R2']))

    if hasattr(eland_result, 'genome_map'):
        lines.append("Mapped Reads")
        mapped = summarize_mapped_reads(eland_result.genome_map,
                                        eland_result.mapped_reads)
        for name, counts in mapped.items():
            lines.append("  %s: %d" % (name, counts))

        lines.append('')
    return lines
535
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :param runs: list of PipelineRun objects
    :return: multi-line report string joined with os.linesep
    """
    report = []
    eland_keys = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.serialization_filename,))
        # sort the report
        if run.gerald:
            eland_keys = sorted(run.gerald.eland_results.keys())
        else:
            report.append("Alignment not done, no report possible")

    # NOTE(review): `run` and `eland_keys` leak out of the loop above, so
    # only the *last* run's gerald results are summarized here even though
    # a header line was emitted for every run — confirm this is intended.
    for lane_id in eland_keys:
        report.extend(summarize_lane(run.gerald, lane_id))
        report.append('---')
        report.append('')
    return os.linesep.join(report)
556
def is_compressed(filename):
    """Return True when *filename* has a recognized compression extension.

    Only gzip (.gz) and bzip2 (.bz2) suffixes are recognized.
    """
    # compute the extension once instead of once per comparison
    return os.path.splitext(filename)[1] in ('.gz', '.bz2')
564
def save_flowcell_reports(data_dir, run_dirname):
    """
    Save the flowcell quality reports

    Tars the reports/ directory (plus Status.xml/Status.xsl when present)
    into run_dirname/flowcell-reports.tar.bz2.
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(run_dirname, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            # Status.xsl is assumed to accompany Status.xml — TODO confirm
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        LOGGER.info("Saving reports from " + reports_dir)
        cwd = os.getcwd()
        os.chdir(data_dir)
        try:
            q = QueueCommands([" ".join(cmd_list)])
            q.run()
        finally:
            # always restore the working directory, even if the tar fails
            os.chdir(cwd)
583
584
def save_summary_file(pipeline, run_dirname):
    """Copy the Summary.htm report into the result directory.

    Looks first in the gerald directory and then in the runfolder's
    Status_Files directory; logs when neither exists.
    """
    # Copy Summary.htm
    gerald_object = pipeline.gerald
    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
    if os.path.exists(gerald_summary):
        LOGGER.info('Copying %s to %s' % (gerald_summary, run_dirname))
        shutil.copy(gerald_summary, run_dirname)
    elif os.path.exists(status_files_summary):
        LOGGER.info('Copying %s to %s' % (status_files_summary, run_dirname))
        shutil.copy(status_files_summary, run_dirname)
    else:
        # the original referenced an undefined `summary_path` here, raising
        # NameError whenever neither summary file existed
        LOGGER.info('Summary file %s was not found' % (gerald_summary,))
598
def save_ivc_plot(bustard_object, run_dirname):
    """
    Save the IVC page and its supporting images
    """
    source_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    source_plot_dir = os.path.join(bustard_object.pathname, 'Plots')
    source_plots = os.path.join(source_plot_dir, 's_?_[a-z]*.png')
    dest_plot_dir = os.path.join(run_dirname, 'Plots')

    if not os.path.exists(source_html):
        LOGGER.warning('Missing IVC.html file, not archiving')
        return

    LOGGER.debug("Saving %s" % (source_html,))
    LOGGER.debug("Saving %s" % (source_plots,))
    shutil.copy(source_html, run_dirname)
    if not os.path.exists(dest_plot_dir):
        os.mkdir(dest_plot_dir)
    # copy each per-lane plot image referenced by the IVC page
    for plot_file in glob(source_plots):
        shutil.copy(plot_file, dest_plot_dir)
619
620
def compress_score_files(bustard_object, run_dirname):
    """
    Compress score files into our result directory

    Pipes `tar c` of the s_*_score.txt files through bzip2 into
    run_dirname/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = [f for f in os.listdir(scores_path)
                   if re.match('.*_score.txt', f)]

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(run_dirname, 'scores.tar.bz2')
    LOGGER.info("Compressing score files from %s" % (scores_path,))
    LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
    LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
    LOGGER.info("Writing to %s" % (tar_dest_name,))

    env = {'BZIP': '-9'}
    # bzip2 writes bytes, so the destination must be opened in binary mode;
    # the with-block also guarantees the file is closed on error
    with open(tar_dest_name, 'wb') as tar_dest:
        tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
                               env=env, cwd=scores_path)
        bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
        # close our copy of the pipe so bzip2 sees EOF when tar exits
        tar.stdout.close()
        # wait for *both* processes so the archive is complete on return
        bzip.wait()
        tar.wait()
651
652
def compress_eland_results(gerald_object, run_dirname, num_jobs=1):
    """
    Compress eland result files into the archive directory
    """
    # queue up bzip2 shell commands and run them all at the end
    bz_commands = []

    for key in gerald_object.eland_results:
        eland_lane = gerald_object.eland_results[key]
        for source_name in eland_lane.pathnames:
            if source_name is None:
                LOGGER.info(
                  "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
                continue
            path, name = os.path.split(source_name)
            dest_name = os.path.join(run_dirname, name)
            LOGGER.info("Saving eland file %s to %s" % \
                       (source_name, dest_name))

            if is_compressed(name):
                LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
                shutil.copy(source_name, dest_name)
            else:
                # not compressed yet: queue a bzip2 job instead of copying
                dest_name += '.bz2'
                args = ['bzip2', '-9', '-c', source_name, '>', dest_name]
                bz_commands.append(" ".join(args))

    if bz_commands:
        q = QueueCommands(bz_commands, num_jobs)
        q.run()
689
690
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format=None):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * eland_result files
      * score files
      * Summary.htm
      * srf files (raw sequence & qualities)

    :param runs: list of PipelineRun objects to extract
    :param output_base_dir: destination root, defaults to the current dir
    :param site: site label for raw data; None skips raw data extraction
    :param num_jobs: number of parallel compression jobs
    :param raw_format: override the detected raw sequence format
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
        # results are grouped per-flowcell, one subdirectory per analysis
        result_dir = os.path.join(output_base_dir, r.flowcell_id)
        LOGGER.info("Using %s as result directory" % (result_dir,))
        if not os.path.exists(result_dir):
            os.mkdir(result_dir)

        # create directory to add this runs results to
        LOGGER.info("Filling in %s" % (r.run_dirname,))
        run_dirname = os.path.join(result_dir, r.run_dirname)
        run_dirname = os.path.abspath(run_dirname)
        if os.path.exists(run_dirname):
            # never clobber a previous extraction of the same analysis
            LOGGER.error("%s already exists, not overwriting" % (run_dirname,))
            continue
        else:
            os.mkdir(run_dirname)

        # save run file
        r.save(run_dirname)

        # save illumina flowcell status report
        save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'),
                              run_dirname)

        # save stuff from bustard
        # grab IVC plot
        save_ivc_plot(r.bustard, run_dirname)

        # build base call saving commands
        # site=None is the caller's way of skipping raw data extraction
        if site is not None:
            save_raw_data(num_jobs, r, site, raw_format, run_dirname)

        # save stuff from GERALD
        # copy stuff out of the main run
        if r.gerald:
            g = r.gerald

            # save summary file
            save_summary_file(r, run_dirname)

            # compress eland result files
            compress_eland_results(g, run_dirname, num_jobs)

        # md5 all the compressed files once we're done
        md5_commands = srf.make_md5_commands(run_dirname)
        srf.run_commands(run_dirname, md5_commands, num_jobs)
748
def save_raw_data(num_jobs, r, site, raw_format, run_dirname):
    """Save the base-call level data (fastq/qseq/srf) for one run.

    :raises ValueError: when raw_format is not fastq, qseq, or srf
    """
    if r.gerald:
        # only keep lanes that gerald actually has parameters for
        lanes = [lane for lane in r.gerald.lanes
                 if r.gerald.lanes.get(lane, None) is not None]
    else:
        # assume default list of lanes
        lanes = LANE_SAMPLE_KEYS

    run_name = srf.pathname_to_run_name(r.pathname)
    if raw_format is None:
        raw_format = r.bustard.sequence_format
    LOGGER.info("Raw Format is: %s" % (raw_format, ))

    seq_cmds = []
    if raw_format == 'fastq':
        LOGGER.info("Reading fastq files from %s", r.bustard.pathname)
        rawpath = os.path.join(r.pathname, r.bustard.pathname)
        LOGGER.info("raw data = %s" % (rawpath,))
        srf.copy_hiseq_project_fastqs(run_name, rawpath, site, run_dirname)
    elif raw_format == 'qseq':
        seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, run_dirname)
    elif raw_format == 'srf':
        seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, run_dirname, 0)
    else:
        raise ValueError('Unknown --raw-format=%s' % (raw_format))
    srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
778
def rm_list(files, dry_run=True):
    """Delete the given files and directories, honoring a dry-run flag.

    :param files: iterable of paths; directories are removed recursively
    :param dry_run: when True only log what would be deleted
    """
    for f in files:
        if os.path.exists(f):
            LOGGER.info('deleting %s' % (f,))
            if not dry_run:
                if os.path.isdir(f):
                    shutil.rmtree(f)
                else:
                    os.unlink(f)
        else:
            # Logger.warn is deprecated (removed in Python 3.13)
            LOGGER.warning("%s doesn't exist." % (f,))
790
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    :param runs: list of PipelineRun objects whose runfolders are cleaned
    :param dry_run: when True, only log what would be deleted
    """
    if dry_run:
        LOGGER.info('In dry-run mode')

    for run in runs:
        # (fixed the 'Cleaninging' typo in the log message)
        LOGGER.info('Cleaning %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        LOGGER.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # rm ReadPrep
        LOGGER.info("Cleaning ReadPrep*")
        read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
        rm_list(read_prep_dirs, dry_run)
        # rm Thumbnail_Images
        LOGGER.info("Cleaning Thumbnail_Images")
        thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
        rm_list(thumbnail_dirs, dry_run)

        # make clean_intermediate
        # use the module LOGGER (the original called logging.info, which
        # bypasses this module's logger configuration)
        LOGGER.info("Cleaning intermediate files")
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                             cwd=run.image_analysis.pathname,)
            clean_process.wait()