5d1e16bce460da443da7bebea56220e388770c88
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """Core information needed to inspect a runfolder.
2 """
3 from glob import glob
4 import logging
5 import os
6 import re
7 import shutil
8 import stat
9 import subprocess
10 import sys
11 import tarfile
12 import time
13
14 LOGGER = logging.getLogger(__name__)
15
16 from htsworkflow.pipelines import firecrest
17 from htsworkflow.pipelines import ipar
18 from htsworkflow.pipelines import bustard
19 from htsworkflow.pipelines import gerald
20 from htsworkflow.pipelines import ElementTree, \
21                                   EUROPEAN_STRPTIME, EUROPEAN_DATE_RE, \
22                                   VERSION_RE, USER_RE, \
23                                   LANES_PER_FLOWCELL, LANE_LIST
24 from htsworkflow.util.alphanum import alphanum
25 from htsworkflow.util.ethelp import indent, flatten
26 from htsworkflow.util.queuecommands import QueueCommands
27
28 from htsworkflow.pipelines import srf
29
class PipelineRun(object):
    """Capture "interesting" information about a pipeline run

    :Variables:
      - `pathname` location of the root of this runfolder
      - `serialization_filename` read only property containing name of run xml file
      - `flowcell_id` read-only property containing flowcell id (bar code)
      - `datadir` location of the runfolder data dir.
      - `image_analysis` generic name for Firecrest or IPAR image analysis
      - `bustard` summary base caller
      - `gerald` summary of sequence alignment and quality control metrics
    """
    XML_VERSION = 1
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, flowcell_id=None, xml=None):
        """Initialize a PipelineRun object

        :Parameters:
          - `pathname` the root directory of this run folder.
          - `flowcell_id` the flowcell ID in case it can't be determined
          - `xml` Allows initializing an object from a serialized xml file.

        :Types:
          - `pathname` str
          - `flowcell_id` str
          - `xml` ElementTree
        """
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        else:
            self.pathname = None
        self._name = None
        self._flowcell_id = flowcell_id
        self.datadir = None
        # optional suffix differentiating multiple analysis runs
        # (see _get_run_dirname)
        self.suffix = None
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
            self.set_elements(xml)

    def _get_flowcell_id(self):
        """Return the flowcell ID

        Attempts to find the flowcell ID through several mechanisms:
        RunInfo.xml, Config/FlowcellId.xml, then the runfolder path
        itself, falling back to 'unknown' if none of them work.
        """
        # extract flowcell ID
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_runinfo()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_flowcellid()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_path()
        if self._flowcell_id is None:
            self._flowcell_id = 'unknown'

            # warning(): warn() is a deprecated alias
            LOGGER.warning(
                "Flowcell id was not found, guessing %s" % (
                    self._flowcell_id))

        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def _get_flowcell_id_from_flowcellid(self):
        """Extract flowcell id from a Config/FlowcellId.xml file

        :return: flowcell_id or None if not found
        """
        config_dir = os.path.join(self.pathname, 'Config')
        flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
        if os.path.exists(flowcell_id_path):
            flowcell_id_tree = ElementTree.parse(flowcell_id_path)
            return flowcell_id_tree.findtext('Text')

    def _get_flowcell_id_from_runinfo(self):
        """Read RunInfo file for flowcell id

        :return: flowcell_id or None if not found
        """
        runinfo = os.path.join(self.pathname, 'RunInfo.xml')
        if os.path.exists(runinfo):
            tree = ElementTree.parse(runinfo)
            root = tree.getroot()
            # NOTE(review): xpath() needs the lxml flavor of ElementTree —
            # the project-level ElementTree import presumably provides it
            fc_nodes = root.xpath('/RunInfo/Run/Flowcell')
            if len(fc_nodes) == 1:
                return fc_nodes[0].text

    def _get_flowcell_id_from_path(self):
        """Guess a flowcell name from the path

        :return: flowcell_id or None if not found
        """
        path_fields = self.pathname.split('_')
        if len(path_fields) > 0:
            # guessing last element of filename
            return path_fields[-1]

    def _get_runfolder_name(self):
        # the gerald component is the authority on the runfolder name
        if self.gerald is None:
            return None
        else:
            return self.gerald.runfolder_name
    runfolder_name = property(_get_runfolder_name)

    def _get_run_dirname(self):
        """Return name of directory to hold result files from one analysis

        For pre-multiplexing runs this is just the cycle range C1-123
        For post-multiplexing runs the "suffix" that we add to
        differentiate runs will be added to the range.
        E.g. Unaligned_6mm may produce C1-200_6mm

        :raises ValueError: if no image analysis component is set yet
        """
        if self.image_analysis is None:
            raise ValueError("Not initialized yet")
        start = self.image_analysis.start
        stop = self.image_analysis.stop
        cycle_fragment = "C%d-%d" % (start, stop)
        if self.suffix:
            cycle_fragment += self.suffix

        return cycle_fragment
    run_dirname = property(_get_run_dirname)

    def get_elements(self):
        """make one master xml file from all of our sub-components.

        :return: an ElementTree containing all available pipeline
                 run xml components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        if self.gerald:
            root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """Initialize a PipelineRun object from an run.xml ElementTree.

        :param tree: parsed ElementTree
        :type tree: ElementTree
        :raises ValueError: if the root tag is not PipelineRun
        """
        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
            tag = element.tag.lower()
            if tag == PipelineRun.FLOWCELL_ID.lower():
                self._flowcell_id = element.text
            #ok the xword.Xword.XWORD pattern for module.class.constant is lame
            # you should only have Firecrest or IPAR, never both of them.
            elif tag == firecrest.Firecrest.FIRECREST.lower():
                self.image_analysis = firecrest.Firecrest(xml=element)
            elif tag == ipar.IPAR.IPAR.lower():
                self.image_analysis = ipar.IPAR(xml=element)
            elif tag == bustard.Bustard.BUSTARD.lower():
                self.bustard = bustard.Bustard(xml=element)
            elif tag == gerald.Gerald.GERALD.lower():
                self.gerald = gerald.Gerald(xml=element)
            elif tag == gerald.CASAVA.GERALD.lower():
                self.gerald = gerald.CASAVA(xml=element)
            else:
                # warning(): warn() is a deprecated alias
                LOGGER.warning('PipelineRun unrecognized tag %s' % (tag,))

    def _get_serialization_filename(self):
        """Compute the filename for the run xml file

        Attempts to find the latest date from all of the run
        components.

        :return: filename run_{flowcell id}_{timestamp}.xml
        :rtype: str
        """
        if self._name is None:
            components = [self.image_analysis, self.bustard, self.gerald]
            # use the most recent timestamp of the available components
            tmax = max([c.time for c in components if c])
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    serialization_filename = property(_get_serialization_filename)

    def save(self, destdir=None):
        """Save a run xml file.

        :param destdir: Directory name to save too, uses current directory
                        if not specified.
        :type destdir: str
        """
        if destdir is None:
            destdir = ''
        LOGGER.info("Saving run report " + self.serialization_filename)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.serialization_filename)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Load a run xml into this object.

        :Parameters:
          - `filename` location of a run xml file

        :Types:
          - `filename` str
        """
        LOGGER.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
244
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` location of an run xml file

    :Returns: initialized PipelineRun object
    """
    root = ElementTree.parse(pathname).getroot()
    return PipelineRun(xml=root)
257
def get_runs(runfolder, flowcell_id=None):
    """Find all runs associated with a runfolder.

    We end up with multiple analysis runs as we sometimes
    need to try with different parameters. This attempts
    to return a list of all the various runs.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in there gerald component.
    """
    datadir = os.path.join(runfolder, 'Data')

    LOGGER.info('Searching for runs in ' + datadir)
    runs = []

    # scan for firecrest directories
    for pathname in glob(os.path.join(datadir, "*Firecrest*")):
        LOGGER.info('Found firecrest in ' + datadir)
        analysis = firecrest.firecrest(pathname)
        if analysis is None:
            LOGGER.warn(
                "%s is an empty or invalid firecrest directory" % (pathname,)
            )
            continue
        scan_post_image_analysis(
            runs, runfolder, datadir, analysis, pathname, flowcell_id
        )

    # scan for IPAR directories; the Intensities directory produced by the
    # RTA software looks a lot like IPAR, so include it too
    candidates = glob(os.path.join(datadir, "IPAR_*"))
    candidates.extend(glob(os.path.join(datadir, 'Intensities')))
    for pathname in candidates:
        LOGGER.info('Found ipar directories in ' + datadir)
        analysis = ipar.ipar(pathname)
        if analysis is None:
            LOGGER.warn(
                "%s is an empty or invalid IPAR directory" % (pathname,)
            )
            continue
        scan_post_image_analysis(
            runs, runfolder, datadir, analysis, pathname, flowcell_id
        )

    return runs
302
def scan_post_image_analysis(runs, runfolder, datadir, image_analysis,
                             pathname, flowcell_id):
    """Look beneath an image-analysis directory for base-calling results."""
    added = build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id)
    if added > 0:
        # If we're a multiplexed run, don't look for older run type.
        return

    LOGGER.info("Looking for bustard directories in %s" % (pathname,))
    # RTA BaseCalls looks enough like Bustard.
    candidates = glob(os.path.join(pathname, "Bustard*")) + \
                 glob(os.path.join(pathname, "BaseCalls"))
    for bustard_pathname in candidates:
        LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
        b = bustard.bustard(bustard_pathname)
        build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir,
                          pathname, runfolder, flowcell_id)
319
320
def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder,
                      flowcell_id):
    """Scan a bustard directory for GERALD subdirectories and build runs.

    Appends one PipelineRun per readable GERALD directory to `runs`.

    :return: number of runs added
    """
    start = len(runs)
    gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
    LOGGER.info("Looking for gerald directories in %s" % (pathname,))
    for gerald_pathname in glob(gerald_glob):
        LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
        try:
            g = gerald.gerald(gerald_pathname)
            p = PipelineRun(runfolder, flowcell_id)
            p.datadir = datadir
            p.image_analysis = image_analysis
            p.bustard = b
            p.gerald = g
            runs.append(p)
        except IOError as e:
            # "except IOError, e" was Python-2-only syntax; "as" works on 2.6+/3
            LOGGER.error("Ignoring " + str(e))
    return len(runs) - start
339
340
def build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id):
    """Find HiSeq-style Aligned*/Unaligned* analysis directory pairs.

    Appends one PipelineRun per matched pair to `runs`; an Aligned
    directory without a matching Unaligned partner is skipped.

    :return: number of runs added
    """
    start = len(runs)
    aligned_glob = os.path.join(runfolder, 'Aligned*')
    unaligned_glob = os.path.join(runfolder, 'Unaligned*')

    aligned_paths = glob(aligned_glob)
    unaligned_paths = glob(unaligned_glob)

    matched_paths = hiseq_match_aligned_unaligned(aligned_paths, unaligned_paths)
    LOGGER.debug("Matched HiSeq analysis: %s", str(matched_paths))

    for aligned, unaligned, suffix in matched_paths:
        if unaligned is None:
            # message previously misspelled "unalinged"
            LOGGER.warning("Aligned directory %s without matching unaligned, skipping", aligned)
            continue

        try:
            p = PipelineRun(runfolder, flowcell_id)
            p.datadir = datadir
            p.suffix = suffix
            p.image_analysis = image_analysis
            p.bustard = bustard.bustard(unaligned)
            assert p.bustard
            if aligned:
                p.gerald = gerald.gerald(aligned)
            runs.append(p)
        except IOError as e:
            # "except IOError, e" was Python-2-only syntax; "as" works on 2.6+/3
            LOGGER.error("Ignoring " + str(e))
    return len(runs) - start
370
def hiseq_match_aligned_unaligned(aligned, unaligned):
    """Match aligned and unaligned folders from separate lists

    :return: list of (aligned_path, unaligned_path, suffix) tuples where
        either path may be None if it had no partner with that suffix
    """
    # (a stray unused "unaligned_suffix_re" regex was removed here; the
    # suffix parsing lives in build_dir_dict_by_suffix)
    aligned_by_suffix = build_dir_dict_by_suffix('Aligned', aligned)
    unaligned_by_suffix = build_dir_dict_by_suffix('Unaligned', unaligned)

    keys = set(aligned_by_suffix.keys()).union(set(unaligned_by_suffix.keys()))

    matches = []
    for key in keys:
        a = aligned_by_suffix.get(key)
        u = unaligned_by_suffix.get(key)
        matches.append((a, u, key))
    return matches
387
def build_dir_dict_by_suffix(prefix, dirnames):
    """Build a dictionary indexed by suffix of last directory name.

    It assumes a constant prefix

    :param prefix: literal prefix each directory basename starts with
    :param dirnames: absolute paths to index
    :return: dict mapping suffix -> absolute path
    """
    # raw string avoids the deprecated "\w" escape in a plain string literal
    regex = re.compile(r'%s(?P<suffix>\w*)' % (prefix,))

    by_suffix = {}
    for absname in dirnames:
        basename = os.path.basename(absname)
        match = regex.match(basename)
        if match:
            by_suffix[match.group('suffix')] = absname
    return by_suffix
402
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :param gerald_dir: path to a GERALD result directory; its parent is
        taken to be the bustard (base-calling) directory and its
        grandparent the image-analysis directory.
    :return: an initialized PipelineRun, or None if the image analysis,
        bustard, or gerald component could not be recognized.
    """
    # these local imports shadow the module-level imports of the same names
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    # NOTE(review): runfolder is assumed to be two levels above the image
    # dir (runfolder/Data/<image dir>) — confirm against actual layouts
    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    LOGGER.info('--- use-run detected options ---')
    LOGGER.info('runfolder: %s' % (runfolder_dir,))
    LOGGER.info('image_dir: %s' % (image_dir,))
    LOGGER.info('bustard_dir: %s' % (bustard_dir,))
    LOGGER.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    LOGGER.info('data_dir: %s' % (data_dir,))
    LOGGER.info('short_iamge_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        # RTA Intensities directories are parsed with the IPAR code
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        LOGGER.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
470
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters

    Serializes every PipelineRun in `runs` via its save() method.
    """
    for run in runs:
        run.save()
477
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.
    """
    totals = {}
    spike_genome = 'unknown'
    spike_count = 0
    for pathname, count in mapped_reads.items():
        directory, leaf = os.path.split(pathname)
        if directory and directory not in genome_map:
            # a path component outside the genome map marks a
            # spike-in/contamination mapping; tally it under its own name
            spike_genome = directory
            spike_count += count
        else:
            totals[leaf] = totals.setdefault(leaf, 0) + count
    totals[spike_genome] = spike_count
    return totals
495
def summarize_lane(gerald, lane_id):
    """Build a list of human-readable report lines for one lane/read."""
    lines = []
    lane_results = gerald.summary.lane_results
    result = gerald.eland_results[lane_id]
    lines.append("Sample name %s" % (result.sample_name))
    lines.append("Lane id %s end %s" % (lane_id.lane, lane_id.read))

    # cluster statistics are only available when the summary covers
    # this read and lane
    if lane_id.read < len(lane_results) and \
           lane_id.lane in lane_results[lane_id.read]:
        cluster = lane_results[lane_id.read][lane_id.lane].cluster
        lines.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
    lines.append("Total Reads: %d" % (result.reads))

    if hasattr(result, 'match_codes'):
        mc = result.match_codes
        no_match = mc['NM']
        qc_fail = mc['QC']
        no_match_pct = float(no_match) / result.reads * 100
        qc_fail_pct = float(qc_fail) / result.reads * 100

        lines.append("No Match: %d (%2.2g %%)" % (no_match, no_match_pct))
        lines.append("QC Failed: %d (%2.2g %%)" % (qc_fail, qc_fail_pct))
        lines.append('Unique (0,1,2 mismatches) %d %d %d' %
                     (mc['U0'], mc['U1'], mc['U2']))
        lines.append('Repeat (0,1,2 mismatches) %d %d %d' %
                     (mc['R0'], mc['R1'], mc['R2']))

    if hasattr(result, 'genome_map'):
        lines.append("Mapped Reads")
        per_genome = summarize_mapped_reads(result.genome_map,
                                            result.mapped_reads)
        for name, counts in per_genome.items():
            lines.append("  %s: %d" % (name, counts))

        lines.append('')
    return lines
533
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :param runs: list of PipelineRun objects
    :return: a single newline-joined report string
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.serialization_filename,))
        # sort the report
        if run.gerald:
            eland_keys = sorted(run.gerald.eland_results.keys())
        else:
            report.append("Alignment not done, no report possible")
            eland_keys = []

        # the lane loop previously ran AFTER the run loop, so only the
        # last run's lanes were ever summarized; summarize each run's
        # lanes directly under its heading instead
        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('---')
            report.append('')
    return os.linesep.join(report)
554
def is_compressed(filename):
    """Return True if filename ends in a recognized compression extension.

    Currently gzip (.gz) and bzip2 (.bz2) are recognized.
    """
    # compute the extension once instead of once per comparison
    return os.path.splitext(filename)[1] in ('.gz', '.bz2')
562
def save_flowcell_reports(data_dir, run_dirname):
    """
    Save the flowcell quality reports

    Archives data_dir/reports (plus Status.xml/Status.xsl when present)
    into run_dirname/flowcell-reports.tar.bz2.
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(run_dirname, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        LOGGER.info("Saving reports from " + reports_dir)
        cwd = os.getcwd()
        os.chdir(data_dir)
        try:
            q = QueueCommands([" ".join(cmd_list)])
            q.run()
        finally:
            # restore the working directory even if the command fails;
            # previously an exception left the process chdir'd to data_dir
            os.chdir(cwd)
581
582
def save_summary_file(pipeline, run_dirname):
    """Copy the run's Summary.htm into run_dirname.

    Prefers the GERALD directory's Summary.htm, falling back to the
    Status_Files copy in the data directory.
    """
    # Copy Summary.htm
    gerald_object = pipeline.gerald
    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
    if os.path.exists(gerald_summary):
        LOGGER.info('Copying %s to %s' % (gerald_summary, run_dirname))
        shutil.copy(gerald_summary, run_dirname)
    elif os.path.exists(status_files_summary):
        LOGGER.info('Copying %s to %s' % (status_files_summary, run_dirname))
        shutil.copy(status_files_summary, run_dirname)
    else:
        # this branch previously referenced an undefined name
        # (summary_path) and raised NameError; report both checked paths
        LOGGER.info('Summary file %s or %s was not found' % (
            gerald_summary, status_files_summary))
596
def save_ivc_plot(bustard_object, run_dirname):
    """
    Save the IVC page and its supporting images
    """
    plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    plot_images = os.path.join(bustard_object.pathname, 'Plots',
                               's_?_[a-z]*.png')
    plot_target_path = os.path.join(run_dirname, 'Plots')

    # nothing to archive without the IVC page itself
    if not os.path.exists(plot_html):
        LOGGER.warning('Missing IVC.html file, not archiving')
        return

    LOGGER.debug("Saving %s" % (plot_html,))
    LOGGER.debug("Saving %s" % (plot_images,))
    shutil.copy(plot_html, run_dirname)
    if not os.path.exists(plot_target_path):
        os.mkdir(plot_target_path)
    for plot_file in glob(plot_images):
        shutil.copy(plot_file, plot_target_path)
617
618
def compress_score_files(bustard_object, run_dirname):
    """
    Compress score files into our result directory

    Streams ``tar c <score files> | bzip2 -9 -c`` into
    run_dirname/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = []
    for f in os.listdir(scores_path):
        if re.match('.*_score.txt', f):
            score_files.append(f)

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(run_dirname, 'scores.tar.bz2')
    LOGGER.info("Compressing score files from %s" % (scores_path,))
    LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
    LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
    LOGGER.info("Writing to %s" % (tar_dest_name,))

    env = {'BZIP': '-9'}
    # the archive is binary: open 'wb' (was 'w', which fails on Python 3)
    # and make sure the handle is closed once both children are done
    tar_dest = open(tar_dest_name, 'wb')
    try:
        tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
                               env=env, cwd=scores_path)
        bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
        # drop our reference to tar's stdout so bzip2 sees EOF when tar exits
        tar.stdout.close()
        tar.wait()
        # bzip was previously never waited on, so the archive could still
        # be incomplete when this function returned
        bzip.wait()
    finally:
        tar_dest.close()
649
650
def compress_eland_results(gerald_object, run_dirname, num_jobs=1):
    """
    Compress eland result files into the archive directory

    Already-compressed files are copied as-is; the rest are queued as
    bzip2 shell commands and run num_jobs at a time.

    :param gerald_object: gerald component whose eland_results are copied
    :param run_dirname: destination directory
    :param num_jobs: number of concurrent bzip2 jobs
    """
    # copy & bzip eland files
    bz_commands = []

    for key in gerald_object.eland_results:
        eland_lane = gerald_object.eland_results[key]
        for source_name in eland_lane.pathnames:
            if source_name is None:
                LOGGER.info(
                    "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
                continue
            path, name = os.path.split(source_name)
            dest_name = os.path.join(run_dirname, name)
            LOGGER.info("Saving eland file %s to %s" % \
                        (source_name, dest_name))

            if is_compressed(name):
                LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
                shutil.copy(source_name, dest_name)
            else:
                # not compressed; queue a shell command so several files
                # can be compressed in parallel (QueueCommands runs via a
                # shell, which interprets the ">" redirection)
                dest_name += '.bz2'
                args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
                bz_commands.append(" ".join(args))

    if len(bz_commands) > 0:
        q = QueueCommands(bz_commands, num_jobs)
        q.run()
687
688
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format=None):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * eland_result files
      * score files
      * Summary.htm
      * srf files (raw sequence & qualities)

    :param runs: list of PipelineRun objects to extract
    :param output_base_dir: destination root; defaults to current directory
    :param site: label passed to the raw-data saving step; None skips
        saving raw base-call data entirely
    :param num_jobs: number of concurrent compression/md5 jobs
    :param raw_format: 'fastq', 'qseq' or 'srf'; None lets save_raw_data
        autodetect from the bustard component
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
        result_dir = os.path.join(output_base_dir, r.flowcell_id)
        LOGGER.info("Using %s as result directory" % (result_dir,))
        if not os.path.exists(result_dir):
            os.mkdir(result_dir)

        # create directory to add this runs results to
        LOGGER.info("Filling in %s" % (r.run_dirname,))
        run_dirname = os.path.join(result_dir, r.run_dirname)
        run_dirname = os.path.abspath(run_dirname)
        if os.path.exists(run_dirname):
            # never clobber the results of a previous extraction
            LOGGER.error("%s already exists, not overwriting" % (run_dirname,))
            continue
        else:
            os.mkdir(run_dirname)

        # save run file
        r.save(run_dirname)

        # save illumina flowcell status report
        save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'),
                              run_dirname)

        # save stuff from bustard
        # grab IVC plot
        save_ivc_plot(r.bustard, run_dirname)

        # build base call saving commands
        if site is not None:
            save_raw_data(num_jobs, r, site, raw_format, run_dirname)

        # save stuff from GERALD
        # copy stuff out of the main run
        if r.gerald:
            g = r.gerald

            # save summary file
            save_summary_file(r, run_dirname)

            # compress eland result files
            compress_eland_results(g, run_dirname, num_jobs)

        # md5 all the compressed files once we're done
        md5_commands = srf.make_md5_commands(run_dirname)
        srf.run_commands(run_dirname, md5_commands, num_jobs)
746
def save_raw_data(num_jobs, r, site, raw_format, run_dirname):
    """Save a run's raw sequence data in the requested format.

    :param num_jobs: number of concurrent conversion jobs
    :param r: PipelineRun being extracted
    :param site: site label embedded in the generated names/commands
    :param raw_format: 'fastq', 'qseq' or 'srf'; None autodetects from
        the bustard component's sequence_format
    :param run_dirname: destination directory
    :raises ValueError: if raw_format is not one of the known formats
    """
    lanes = []
    if r.gerald:
        # only keep lanes gerald actually has parameters for
        for lane in r.gerald.lanes:
            lane_parameters = r.gerald.lanes.get(lane, None)
            if lane_parameters is not None:
                lanes.append(lane)
    else:
        # assume default list of lanes
        lanes = LANE_LIST

    run_name = srf.pathname_to_run_name(r.pathname)
    seq_cmds = []
    if raw_format is None:
        raw_format = r.bustard.sequence_format

    LOGGER.info("Raw Format is: %s" % (raw_format, ))
    if raw_format == 'fastq':
        # fastq files are copied directly instead of queued as commands
        LOGGER.info("Reading fastq files from %s", r.bustard.pathname)
        rawpath = os.path.join(r.pathname, r.bustard.pathname)
        LOGGER.info("raw data = %s" % (rawpath,))
        srf.copy_hiseq_project_fastqs(run_name, rawpath, site, run_dirname)
    elif raw_format == 'qseq':
        seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, run_dirname)
    elif raw_format == 'srf':
        seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, run_dirname, 0)
    else:
        raise ValueError('Unknown --raw-format=%s' % (raw_format))
    srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
776
def rm_list(files, dry_run=True):
    """Delete a list of files and directories.

    :param files: iterable of paths to remove
    :param dry_run: when True, only log what would be deleted
    """
    for f in files:
        if os.path.exists(f):
            LOGGER.info('deleting %s' % (f,))
            if not dry_run:
                if os.path.isdir(f):
                    shutil.rmtree(f)
                else:
                    os.unlink(f)
        else:
            # warning(): warn() is a deprecated alias
            LOGGER.warning("%s doesn't exist." % (f,))
788
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    Removes logs, calibration data, images, and regenerable intermediate
    files from each runfolder.

    :param runs: list of PipelineRun objects to clean
    :param dry_run: when True, only report what would be deleted
    """
    if dry_run:
        LOGGER.info('In dry-run mode')

    for run in runs:
        LOGGER.info('Cleaninging %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        LOGGER.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # rm ReadPrep
        LOGGER.info("Cleaning ReadPrep*")
        read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
        rm_list(read_prep_dirs, dry_run)
        # rm Thumbnail_Images
        LOGGER.info("Cleaning Thubmnail_images")
        thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
        rm_list(thumbnail_dirs, dry_run)

        # make clean_intermediate
        # was logging.info (root logger); use the module logger like the
        # rest of this file
        LOGGER.info("Cleaning intermediate files")
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                             cwd=run.image_analysis.pathname,)
            clean_process.wait()
831
832
833