# Start making documentation for htsworkflow.
# [htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """Core information needed to inspect a runfolder.
2 """
3 from glob import glob
4 import logging
5 import os
6 import re
7 import shutil
8 import stat
9 import subprocess
10 import sys
11 import tarfile
12 import time
13
14 LOGGER = logging.getLogger(__name__)
15
16 from htsworkflow.pipelines import firecrest
17 from htsworkflow.pipelines import ipar
18 from htsworkflow.pipelines import bustard
19 from htsworkflow.pipelines import gerald
20 from htsworkflow.pipelines import ElementTree, \
21                                   EUROPEAN_STRPTIME, EUROPEAN_DATE_RE, \
22                                   VERSION_RE, USER_RE, \
23                                   LANES_PER_FLOWCELL, LANE_LIST
24 from htsworkflow.util.alphanum import alphanum
25 from htsworkflow.util.ethelp import indent, flatten
26 from htsworkflow.util.queuecommands import QueueCommands
27
28 from htsworkflow.pipelines import srf
29
class PipelineRun(object):
    """Capture "interesting" information about a pipeline run

    :Variables:
      - `pathname` location of the root of this runfolder
      - `name` read only property containing name of run xml file
      - `flowcell_id` read-only property containing flowcell id (bar code)
      - `datadir` location of the runfolder data dir.
      - `image_analysis` generic name for Firecrest or IPAR image analysis
      - `bustard` summary base caller
      - `gerald` summary of sequence alignment and quality control metrics
    """
    # bump this if the serialized run xml layout changes
    XML_VERSION = 1
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, flowcell_id=None, xml=None):
        """Initialize a PipelineRun object

        :Parameters:
          - `pathname` the root directory of this run folder.
          - `flowcell_id` the flowcell ID in case it can't be determined
          - `xml` Allows initializing an object from a serialized xml file.

        :Types:
          - `pathname` str
          - `flowcell_id` str
          - `xml` ElementTree
        """
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        else:
            self.pathname = None
        self._name = None
        self._flowcell_id = flowcell_id
        self.datadir = None
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
            self.set_elements(xml)

    def _get_flowcell_id(self):
        """Return the flowcell ID

        Attempts to find the flowcell ID through several mechanisms,
        in order: RunInfo.xml, Config/FlowcellId.xml, and finally the
        trailing component of the runfolder name, falling back to
        'unknown' (with a warning) when everything fails.
        """
        # extract flowcell ID; each helper returns None on failure
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_runinfo()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_flowcellid()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_path()
        if self._flowcell_id is None:
            self._flowcell_id = 'unknown'

            # warn() is deprecated in the logging module; use warning()
            LOGGER.warning(
                "Flowcell id was not found, guessing %s" % (
                    self._flowcell_id))

        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def _get_flowcell_id_from_flowcellid(self):
        """Extract flowcell id from a Config/FlowcellId.xml file

        :return: flowcell_id or None if not found
        """
        config_dir = os.path.join(self.pathname, 'Config')
        flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
        if os.path.exists(flowcell_id_path):
            flowcell_id_tree = ElementTree.parse(flowcell_id_path)
            return flowcell_id_tree.findtext('Text')

    def _get_flowcell_id_from_runinfo(self):
        """Read RunInfo file for flowcell id

        :return: flowcell_id or None if not found
        """
        runinfo = os.path.join(self.pathname, 'RunInfo.xml')
        if os.path.exists(runinfo):
            tree = ElementTree.parse(runinfo)
            root = tree.getroot()
            # NOTE(review): .xpath() is an lxml API, not stdlib xml.etree;
            # assumes the package-level ElementTree is lxml -- confirm
            fc_nodes = root.xpath('/RunInfo/Run/Flowcell')
            if len(fc_nodes) == 1:
                return fc_nodes[0].text

    def _get_flowcell_id_from_path(self):
        """Guess a flowcell name from the path

        :return: flowcell_id or None if not found
        """
        # runfolder names look like <date>_<machine>_<run>_<flowcell>,
        # so the last underscore-separated field is our best guess
        path_fields = self.pathname.split('_')
        if len(path_fields) > 0:
            # guessing last element of filename
            return path_fields[-1]

    def _get_runfolder_name(self):
        # the runfolder name is recorded by the gerald component
        if self.gerald is None:
            return None
        else:
            return self.gerald.runfolder_name
    runfolder_name = property(_get_runfolder_name)

    def _get_run_id(self):
        """Return a identifer for a run.

        For pre-multiplexing runs this is just the cycle range C1-123
        For post-multiplexing runs the "suffix" that we add to
        differentiate runs will be added to the range.
        E.g. Unaligned_6mm may produce C1-200_6mm

        .. note:: TODO -- not implemented yet; currently returns None.
        """
        pass

    def get_elements(self):
        """make one master xml file from all of our sub-components.

        :return: an ElementTree containing all available pipeline
                 run xml compoents.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        # gerald is optional (e.g. unaligned-only HiSeq runs)
        if self.gerald:
            root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """Initialize a PipelineRun object from an run.xml ElementTree.

        :param tree: parsed ElementTree
        :type tree: ElementTree
        """
        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
            tag = element.tag.lower()
            if tag == PipelineRun.FLOWCELL_ID.lower():
                self._flowcell_id = element.text
            #ok the xword.Xword.XWORD pattern for module.class.constant is lame
            # you should only have Firecrest or IPAR, never both of them.
            elif tag == firecrest.Firecrest.FIRECREST.lower():
                self.image_analysis = firecrest.Firecrest(xml=element)
            elif tag == ipar.IPAR.IPAR.lower():
                self.image_analysis = ipar.IPAR(xml=element)
            elif tag == bustard.Bustard.BUSTARD.lower():
                self.bustard = bustard.Bustard(xml=element)
            elif tag == gerald.Gerald.GERALD.lower():
                self.gerald = gerald.Gerald(xml=element)
            elif tag == gerald.CASAVA.GERALD.lower():
                self.gerald = gerald.CASAVA(xml=element)
            else:
                # warn() is deprecated in the logging module; use warning()
                LOGGER.warning('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """Compute the run name for the run xml file

        Attempts to find the latest date from all of the run
        components.

        :return: run xml name
        :rtype: str
        """
        if self._name is None:
            tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """Save a run xml file.

        :param destdir: Directory name to save too, uses current directory
                        if not specified.
        :type destdir: str
        """
        if destdir is None:
            destdir = ''
        LOGGER.info("Saving run report " + self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Load a run xml into this object.

        :Parameters:
          - `filename` location of a run xml file

        :Types:
          - `filename` str
        """
        LOGGER.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
233
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` location of an run xml file

    :Returns: initialized PipelineRun object
    """
    # parse the xml and hand the root element to PipelineRun.set_elements
    run_tree = ElementTree.parse(pathname).getroot()
    return PipelineRun(xml=run_tree)
246
def get_runs(runfolder, flowcell_id=None):
    """Find all runs associated with a runfolder.

    We end up with multiple analysis runs as we sometimes
    need to try with different parameters. This attempts
    to return a list of all the various runs.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in there gerald component.

    :param runfolder: path to the root of an illumina runfolder
    :param flowcell_id: flowcell id to use if it can't be discovered
    :return: list of PipelineRun objects
    """
    datadir = os.path.join(runfolder, 'Data')

    LOGGER.info('Searching for runs in ' + datadir)
    runs = []
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        LOGGER.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            # warn() is deprecated in the logging module; use warning()
            LOGGER.warning(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, firecrest_pathname, flowcell_id
            )
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        LOGGER.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            LOGGER.warning(
                "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, ipar_pathname, flowcell_id
            )

    return runs
291
def scan_post_image_analysis(runs, runfolder, datadir, image_analysis,
                             pathname, flowcell_id):
    """Collect runs downstream of one image-analysis directory.

    First tries the HiSeq Aligned/Unaligned layout; if that finds
    anything, the older Bustard/GERALD layout is not scanned.
    """
    added = build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id)
    # If we're a multiplexed run, don't look for older run type.
    if added > 0:
        return

    LOGGER.info("Looking for bustard directories in %s" % (pathname,))
    # RTA BaseCalls looks enough like Bustard.
    candidate_dirs = glob(os.path.join(pathname, "Bustard*")) \
                     + glob(os.path.join(pathname, "BaseCalls"))
    for bustard_pathname in candidate_dirs:
        LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
        base_caller = bustard.bustard(bustard_pathname)
        build_gerald_runs(runs, base_caller, image_analysis, bustard_pathname,
                          datadir, pathname, runfolder, flowcell_id)
308
309
def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder,
                      flowcell_id):
    """Scan a bustard directory for GERALD subdirectories and collect runs.

    Appends one PipelineRun per parseable GERALD directory to `runs`.

    :return: number of runs added to `runs`
    """
    start = len(runs)
    gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
    LOGGER.info("Looking for gerald directories in %s" % (pathname,))
    for gerald_pathname in glob(gerald_glob):
        LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
        try:
            g = gerald.gerald(gerald_pathname)
            p = PipelineRun(runfolder, flowcell_id)
            p.datadir = datadir
            p.image_analysis = image_analysis
            p.bustard = b
            p.gerald = g
            runs.append(p)
        # "except IOError, e" was Python-2-only syntax; "as" works on 2.6+ and 3
        except IOError as e:
            LOGGER.error("Ignoring " + str(e))
    return len(runs) - start
328
329
def build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id):
    """Scan for HiSeq-style Aligned*/Unaligned* analysis directories.

    Appends one PipelineRun per matched (Aligned, Unaligned) pair to `runs`.

    :return: number of runs added to `runs`
    """
    start = len(runs)
    aligned_glob = os.path.join(runfolder, 'Aligned*')
    unaligned_glob = os.path.join(runfolder, 'Unaligned*')

    aligned_paths = glob(aligned_glob)
    unaligned_paths = glob(unaligned_glob)

    matched_paths = hiseq_match_aligned_unaligned(aligned_paths, unaligned_paths)
    LOGGER.debug("Matched HiSeq analysis: %s", str(matched_paths))

    for aligned, unaligned in matched_paths:
        if unaligned is None:
            # warn() is deprecated; also fixed "unalinged" typo in the message
            LOGGER.warning("Aligned directory %s without matching unaligned, skipping", aligned)
            continue

        # (removed a stray debug `print` statement left over from development)
        try:
            p = PipelineRun(runfolder, flowcell_id)
            p.datadir = datadir
            p.image_analysis = image_analysis
            p.bustard = bustard.bustard(unaligned)
            if aligned:
                p.gerald = gerald.gerald(aligned)
            runs.append(p)
        # "except IOError, e" was Python-2-only syntax; "as" works on 2.6+ and 3
        except IOError as e:
            LOGGER.error("Ignoring " + str(e))
    return len(runs) - start
358
def hiseq_match_aligned_unaligned(aligned, unaligned):
    """Match aligned and unaligned folders from separate lists

    :param aligned: list of Aligned* directory paths
    :param unaligned: list of Unaligned* directory paths
    :return: list of (aligned, unaligned) pairs matched by suffix;
             either element may be None when no counterpart exists
    """
    # (removed `unaligned_suffix_re`, a compiled regex that was never used;
    # suffix parsing is done entirely by build_dir_dict_by_suffix)
    aligned_by_suffix = build_dir_dict_by_suffix('Aligned', aligned)
    unaligned_by_suffix = build_dir_dict_by_suffix('Unaligned', unaligned)

    keys = set(aligned_by_suffix.keys()).union(set(unaligned_by_suffix.keys()))

    matches = []
    for key in keys:
        a = aligned_by_suffix.get(key)
        u = unaligned_by_suffix.get(key)
        matches.append((a, u))
    return matches
375
def build_dir_dict_by_suffix(prefix, dirnames):
    """Build a dictionary indexed by suffix of last directory name.

    It assumes a constant prefix

    :param prefix: literal text each basename should start with
                   (interpolated into the regex unescaped -- callers
                   pass plain words like 'Aligned')
    :param dirnames: iterable of directory paths
    :return: dict mapping suffix string -> full path
    """
    # raw string so \w is a regex escape, not an (invalid) string escape
    regex = re.compile(r'%s(?P<suffix>[\w]*)' % (prefix,))

    by_suffix = {}
    for absname in dirnames:
        basename = os.path.basename(absname)
        match = regex.match(basename)
        if match:
            by_suffix[match.group('suffix')] = absname
    return by_suffix
390
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :param gerald_dir: path of a gerald directory inside a runfolder
    :return: PipelineRun, or None when any component can't be parsed
    """
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    # walk up the directory tree: gerald -> bustard -> image analysis
    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    LOGGER.info('--- use-run detected options ---')
    LOGGER.info('runfolder: %s' % (runfolder_dir,))
    LOGGER.info('image_dir: %s' % (image_dir,))
    LOGGER.info('bustard_dir: %s' % (bustard_dir,))
    LOGGER.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    LOGGER.info('data_dir: %s' % (data_dir,))
    # fixed typo: this message used to read 'short_iamge_dir'
    LOGGER.info('short_image_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        LOGGER.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
458
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters
    """
    # persist each run's parsed state as a run xml file in the cwd
    for pipeline_run in runs:
        pipeline_run.save()
465
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.

    Entries whose key carries a directory prefix that is not one of our
    genomes are treated as spike-in/contamination and accumulated under
    that prefix; everything else is summed per chromosome name.
    Note: if no such entry exists the result still contains an
    'unknown' key with a count of 0.
    """
    summarized = {}
    genome = 'unknown'
    genome_reads = 0
    for pathname, count in mapped_reads.items():
        prefix, tail = os.path.split(pathname)
        if prefix and prefix not in genome_map:
            # spike-in / contamination: roll up under the path prefix
            genome = prefix
            genome_reads += count
        else:
            summarized[tail] = summarized.get(tail, 0) + count
    summarized[genome] = genome_reads
    return summarized
483
def summarize_lane(gerald, lane_id):
    """Build a list of human-readable report lines for one lane/end."""
    lines = []
    eland_result = gerald.eland_results[lane_id]
    lane_results = gerald.summary.lane_results
    lines.append("Sample name %s" % (eland_result.sample_name))
    lines.append("Lane id %s end %s" % (lane_id.lane, lane_id.read))

    # cluster counts are only available when the summary covers this end/lane
    read_index = lane_id.read
    if read_index < len(lane_results) and \
           lane_id.lane in lane_results[read_index]:
        cluster = lane_results[read_index][lane_id.lane].cluster
        lines.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
    lines.append("Total Reads: %d" % (eland_result.reads))

    # match-code statistics (not present on every eland result type)
    if hasattr(eland_result, 'match_codes'):
        mc = eland_result.match_codes
        nm = mc['NM']
        qc = mc['QC']
        nm_percent = float(nm) / eland_result.reads * 100
        qc_percent = float(qc) / eland_result.reads * 100

        lines.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
        lines.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
        lines.append('Unique (0,1,2 mismatches) %d %d %d' % \
                      (mc['U0'], mc['U1'], mc['U2']))
        lines.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                      (mc['R0'], mc['R1'], mc['R2']))

    # per-genome mapped read counts (not present on every eland result type)
    if hasattr(eland_result, 'genome_map'):
        lines.append("Mapped Reads")
        mapped_reads = summarize_mapped_reads(eland_result.genome_map,
                                              eland_result.mapped_reads)
        for name, counts in mapped_reads.items():
            lines.append("  %s: %d" % (name, counts))

        lines.append('')
    return lines
521
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :param runs: list of PipelineRun objects
    :return: one multi-line report string
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.name,))
        # sort the report
        eland_keys = sorted(run.gerald.eland_results.keys())
        # BUG FIX: this loop was dedented out of the run loop, so only the
        # *last* run's lanes were summarized (and empty `runs` raised
        # NameError on eland_keys). It now nests under each run.
        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('---')
            report.append('')
    return os.linesep.join(report)
537
def is_compressed(filename):
    """Return True when filename has a known compression extension."""
    extension = os.path.splitext(filename)[1]
    return extension in ('.gz', '.bz2')
545
def save_flowcell_reports(data_dir, cycle_dir):
    """
    Save the flowcell quality reports

    Tars Data/reports (plus Status.xml/Status.xsl when Status.xml is
    present) into <cycle_dir>/flowcell-reports.tar.bz2.
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            # NOTE(review): only Status.xml is checked; Status.xsl is assumed
            # to exist alongside it -- confirm
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        LOGGER.info("Saving reports from " + reports_dir)
        # tar runs relative to data_dir so the archive has short paths;
        # restore the working directory even if the command fails
        cwd = os.getcwd()
        os.chdir(data_dir)
        try:
            q = QueueCommands([" ".join(cmd_list)])
            q.run()
        finally:
            os.chdir(cwd)
565
def save_summary_file(pipeline, cycle_dir):
    """Copy Summary.htm into cycle_dir.

    Prefers the GERALD directory's Summary.htm, falling back to the
    Status_Files copy; logs when neither exists.
    """
    gerald_object = pipeline.gerald
    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
    if os.path.exists(gerald_summary):
        LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir))
        shutil.copy(gerald_summary, cycle_dir)
    elif os.path.exists(status_files_summary):
        LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir))
        shutil.copy(status_files_summary, cycle_dir)
    else:
        # BUG FIX: previously referenced an undefined name `summary_path`,
        # raising NameError whenever no summary file existed
        LOGGER.info('Summary file %s was not found' % (gerald_summary,))
579
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images
    """
    source_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    source_plot_dir = os.path.join(bustard_object.pathname, 'Plots')
    image_glob = os.path.join(source_plot_dir, 's_?_[a-z]*.png')
    dest_plot_dir = os.path.join(cycle_dir, 'Plots')

    # nothing to archive without the IVC page itself
    if not os.path.exists(source_html):
        LOGGER.warning('Missing IVC.html file, not archiving')
        return

    LOGGER.debug("Saving %s" % (source_html,))
    LOGGER.debug("Saving %s" % (image_glob,))
    shutil.copy(source_html, cycle_dir)
    if not os.path.exists(dest_plot_dir):
        os.mkdir(dest_plot_dir)
    for image in glob(image_glob):
        shutil.copy(image, dest_plot_dir)
600
601
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory

    Pipes `tar c <score files>` through `bzip2 -9` into
    <cycle_dir>/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = [f for f in os.listdir(scores_path)
                   if re.match('.*_score.txt', f)]

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    LOGGER.info("Compressing score files from %s" % (scores_path,))
    LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
    LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
    LOGGER.info("Writing to %s" % (tar_dest_name,))

    env = {'BZIP': '-9'}
    # BUG FIXES: the destination handle was never closed, and bzip2 was
    # never waited on -- the archive could be truncated. Open in binary
    # mode, close tar's stdout in the parent (so tar gets SIGPIPE if
    # bzip2 dies), and wait for both children.
    with open(tar_dest_name, 'wb') as tar_dest:
        tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
                               env=env, cwd=scores_path)
        bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
        tar.stdout.close()
        tar.wait()
        bzip.wait()
632
633
def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
    """
    Compress eland result files into the archive directory

    :param gerald_object: gerald run whose eland result files are archived
    :param cycle_dir: destination directory
    :param num_jobs: how many bzip2 jobs to run in parallel
    """
    # copy & bzip eland files
    bz_commands = []

    for key in gerald_object.eland_results:
        eland_lane = gerald_object.eland_results[key]
        for source_name in eland_lane.pathnames:
            if source_name is None:
                LOGGER.info(
                    "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
            else:
                path, name = os.path.split(source_name)
                dest_name = os.path.join(cycle_dir, name)
                LOGGER.info("Saving eland file %s to %s" % \
                           (source_name, dest_name))

                if is_compressed(name):
                    LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
                    shutil.copy(source_name, dest_name)
                else:
                    # not compressed: queue a shell command so QueueCommands
                    # can run several bzip2 jobs concurrently
                    # (removed a block of commented-out dead code here)
                    dest_name += '.bz2'
                    args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
                    bz_commands.append(" ".join(args))

    if len(bz_commands) > 0:
        q = QueueCommands(bz_commands, num_jobs)
        q.run()
670
671
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format=None):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * eland_result files
      * score files
      * Summary.htm
      * srf files (raw sequence & qualities)

    :param runs: list of PipelineRun objects to archive
    :param output_base_dir: where per-flowcell result directories are
                            created; defaults to the current directory
    :param site: label passed to the raw-data commands; None skips
                 saving raw base calls entirely
    :param num_jobs: number of compression/md5 jobs to run in parallel
    :param raw_format: 'fastq', 'qseq' or 'srf'; None lets save_raw_data
                       pick from the bustard object
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
        result_dir = os.path.join(output_base_dir, r.flowcell_id)
        LOGGER.info("Using %s as result directory" % (result_dir,))
        if not os.path.exists(result_dir):
            os.mkdir(result_dir)

        # create cycle_dir named after the image-analysis cycle range
        cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
        LOGGER.info("Filling in %s" % (cycle,))
        cycle_dir = os.path.join(result_dir, cycle)
        cycle_dir = os.path.abspath(cycle_dir)
        # never overwrite previously extracted results
        if os.path.exists(cycle_dir):
            LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
            continue
        else:
            os.mkdir(cycle_dir)

        # save run file
        r.save(cycle_dir)

        # save illumina flowcell status report (lives next to the
        # image-analysis directory)
        save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'),
                              cycle_dir)

        # save stuff from bustard
        # grab IVC plot
        save_ivc_plot(r.bustard, cycle_dir)

        # build base call saving commands
        if site is not None:
            save_raw_data(num_jobs, r, site, raw_format, cycle_dir)

        # save stuff from GERALD
        # copy stuff out of the main run
        g = r.gerald

        # save summary file
        save_summary_file(r, cycle_dir)

        # compress eland result files
        compress_eland_results(g, cycle_dir, num_jobs)

        # md5 all the compressed files once we're done
        md5_commands = srf.make_md5_commands(cycle_dir)
        srf.run_commands(cycle_dir, md5_commands, num_jobs)
729
def save_raw_data(num_jobs, r, site, raw_format, cycle_dir):
    """Archive a run's raw base-call data into cycle_dir.

    Dispatches on raw_format ('fastq', 'qseq' or 'srf'); None means
    use the format reported by the bustard object.
    """
    # only lanes gerald has parameters for
    lanes = [lane for lane in r.gerald.lanes
             if r.gerald.lanes.get(lane, None) is not None]

    run_name = srf.pathname_to_run_name(r.pathname)
    if raw_format is None:
        raw_format = r.bustard.sequence_format

    LOGGER.info("Raw Format is: %s" % (raw_format, ))
    seq_cmds = []
    if raw_format == 'fastq':
        rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
        LOGGER.info("raw data = %s" % (rawpath,))
        srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
    elif raw_format == 'qseq':
        seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
    elif raw_format == 'srf':
        seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
    else:
        raise ValueError('Unknown --raw-format=%s' % (raw_format))
    srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
754
def rm_list(files, dry_run=True):
    """Delete a list of files and directories, honoring dry_run.

    :param files: iterable of paths; directories are removed recursively
    :param dry_run: when True, only log what would be deleted
    """
    for f in files:
        if os.path.exists(f):
            LOGGER.info('deleting %s' % (f,))
            if not dry_run:
                if os.path.isdir(f):
                    shutil.rmtree(f)
                else:
                    os.unlink(f)
        else:
            # warn() is deprecated in the logging module; use warning()
            LOGGER.warning("%s doesn't exist." % (f,))
766
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    Removes logs, calibration data, raw images, thumbnails and
    intermediate pipeline files from each runfolder.

    :param runs: list of PipelineRun objects to clean
    :param dry_run: when True, only log what would be removed
    """
    if dry_run:
        LOGGER.info('In dry-run mode')

    for run in runs:
        # fixed log-message typo: was 'Cleaninging'
        LOGGER.info('Cleaning %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        LOGGER.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # rm ReadPrep
        LOGGER.info("Cleaning ReadPrep*")
        read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
        rm_list(read_prep_dirs, dry_run)
        # rm Thumbnail_Images (fixed stale duplicate comment and the
        # 'Thubmnail_images' typo in the log message)
        LOGGER.info("Cleaning Thumbnail_Images")
        thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
        rm_list(thumbnail_dirs, dry_run)

        # make clean_intermediate
        # use the module LOGGER (was logging.info, which hit the root logger)
        LOGGER.info("Cleaning intermediate files")
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                             cwd=run.image_analysis.pathname,)
            clean_process.wait()
810
811