A better resolution to a possible circular dependency.
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import tarfile
13 import time
14
15 LOGGER = logging.getLogger(__name__)
16
17 from htsworkflow.pipelines import firecrest
18 from htsworkflow.pipelines import ipar
19 from htsworkflow.pipelines import bustard
20 from htsworkflow.pipelines import gerald
21 from htsworkflow.pipelines import ElementTree, \
22                                   EUROPEAN_STRPTIME, EUROPEAN_DATE_RE, \
23                                   VERSION_RE, USER_RE, \
24                                   LANES_PER_FLOWCELL, LANE_LIST
25 from htsworkflow.util.alphanum import alphanum
26 from htsworkflow.util.ethelp import indent, flatten
27 from htsworkflow.util.queuecommands import QueueCommands
28
29 from htsworkflow.pipelines import srf
30
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run

    A PipelineRun ties together the three analysis components found in a
    runfolder: image analysis (Firecrest or IPAR), base calling (Bustard)
    and alignment (GERALD/CASAVA), and can round-trip that state through
    an XML summary file.
    """
    XML_VERSION = 1
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, flowcell_id=None, xml=None):
        """Create a run record.

        :param pathname: path to the runfolder (normalized when given)
        :param flowcell_id: explicit flowcell id, otherwise discovered lazily
        :param xml: previously saved ElementTree node to restore state from
        """
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        else:
            self.pathname = None
        self._name = None
        self._flowcell_id = flowcell_id
        self.datadir = None
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
            self.set_elements(xml)

    def _get_flowcell_id(self):
        """Determine the flowcell id, trying several sources in order.

        Tries RunInfo.xml, then Config/FlowcellId.xml, then the last
        '_'-separated component of the runfolder path, finally falling
        back to 'unknown' with a warning.
        """
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_runinfo()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_flowcellid()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_path()
        if self._flowcell_id is None:
            self._flowcell_id = 'unknown'
            # warning() rather than the deprecated warn() alias
            LOGGER.warning(
                "Flowcell id was not found, guessing %s" % (
                    self._flowcell_id))

        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def _get_flowcell_id_from_flowcellid(self):
        """Extract flowcell id from a Config/FlowcellId.xml file

        Returns None when the file does not exist.
        """
        config_dir = os.path.join(self.pathname, 'Config')
        flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
        if os.path.exists(flowcell_id_path):
            flowcell_id_tree = ElementTree.parse(flowcell_id_path)
            return flowcell_id_tree.findtext('Text')

    def _get_flowcell_id_from_runinfo(self):
        """Read RunInfo file for flowcell id

        Returns None when the file does not exist or is ambiguous.
        """
        runinfo = os.path.join(self.pathname, 'RunInfo.xml')
        if os.path.exists(runinfo):
            tree = ElementTree.parse(runinfo)
            root = tree.getroot()
            # NOTE(review): xpath() is an lxml API, not stdlib ElementTree;
            # assumes htsworkflow.pipelines exports the lxml flavor — confirm
            fc_nodes = root.xpath('/RunInfo/Run/Flowcell')
            if len(fc_nodes) == 1:
                return fc_nodes[0].text

    def _get_flowcell_id_from_path(self):
        """Guess a flowcell name from the path
        """
        path_fields = self.pathname.split('_')
        if len(path_fields) > 0:
            # guessing last element of filename
            return path_fields[-1]

    def _get_runfolder_name(self):
        """Return the runfolder name reported by our gerald run, or None."""
        if self.gerald is None:
            return None
        else:
            return self.gerald.runfolder_name
    runfolder_name = property(_get_runfolder_name)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """Restore our sub-components from a master xml tree.

        :raises ValueError: when the root tag is not PipelineRun
        """
        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
            tag = element.tag.lower()
            if tag == PipelineRun.FLOWCELL_ID.lower():
                self._flowcell_id = element.text
            # you should only have Firecrest or IPAR, never both of them.
            elif tag == firecrest.Firecrest.FIRECREST.lower():
                self.image_analysis = firecrest.Firecrest(xml=element)
            elif tag == ipar.IPAR.IPAR.lower():
                self.image_analysis = ipar.IPAR(xml=element)
            elif tag == bustard.Bustard.BUSTARD.lower():
                self.bustard = bustard.Bustard(xml=element)
            elif tag == gerald.Gerald.GERALD.lower():
                self.gerald = gerald.Gerald(xml=element)
            elif tag == gerald.CASAVA.GERALD.lower():
                self.gerald = gerald.CASAVA(xml=element)
            else:
                # warning() rather than the deprecated warn() alias
                LOGGER.warning('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name
        """
        if self._name is None:
            tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """Write our xml summary file into destdir (default: current dir)."""
        if destdir is None:
            destdir = ''
        LOGGER.info("Saving run report " + self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Replace our state with the state saved in filename."""
        LOGGER.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
171
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    root = ElementTree.parse(pathname).getroot()
    return PipelineRun(xml=root)
184
def get_runs(runfolder, flowcell_id=None):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.

    :param runfolder: path of the runfolder to scan
    :param flowcell_id: optional flowcell id passed through to each PipelineRun
    :returns: list of PipelineRun objects
    """
    # imported locally to avoid a circular import at module load time
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    datadir = os.path.join(runfolder, 'Data')

    LOGGER.info('Searching for runs in ' + datadir)
    runs = []
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        LOGGER.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            # warning() rather than the deprecated warn() alias
            LOGGER.warning(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, firecrest_pathname, flowcell_id
            )
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        LOGGER.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            LOGGER.warning(
                "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, ipar_pathname, flowcell_id
            )

    return runs
232
def scan_post_image_analysis(runs, runfolder, datadir, image_analysis,
                             pathname, flowcell_id):
    """Find the analysis stages that follow image analysis.

    First tries the multiplexed HiSeq layout; when none is found, falls
    back to scanning for legacy Bustard/BaseCalls directories.
    """
    added = build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id)
    # If we're a multiplexed run, don't look for older run type.
    if added > 0:
        return

    LOGGER.info("Looking for bustard directories in %s" % (pathname,))
    candidate_dirs = glob(os.path.join(pathname, "Bustard*"))
    # RTA BaseCalls looks enough like Bustard.
    candidate_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
    for bustard_pathname in candidate_dirs:
        LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
        base_caller = bustard.bustard(bustard_pathname)
        build_gerald_runs(runs, base_caller, image_analysis, bustard_pathname,
                          datadir, pathname, runfolder, flowcell_id)
249
250
def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder,
                      flowcell_id):
    """Append a PipelineRun to runs for every GERALD dir under bustard_pathname.

    :param runs: list of PipelineRun objects, mutated in place
    :param b: the bustard run object shared by the gerald runs found here
    :returns: number of runs added
    """
    start = len(runs)
    gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
    LOGGER.info("Looking for gerald directories in %s" % (pathname,))
    for gerald_pathname in glob(gerald_glob):
        LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
        try:
            g = gerald.gerald(gerald_pathname)
            p = PipelineRun(runfolder, flowcell_id)
            p.datadir = datadir
            p.image_analysis = image_analysis
            p.bustard = b
            p.gerald = g
            runs.append(p)
        except IOError as e:
            # 'as' form replaces the python-2-only 'except IOError, e'
            LOGGER.error("Ignoring " + str(e))
    return len(runs) - start
269
270
def build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id):
    """Append a PipelineRun for every matched Aligned/Unaligned HiSeq pair.

    :param runs: list of PipelineRun objects, mutated in place
    :returns: number of runs added
    """
    start = len(runs)
    aligned_glob = os.path.join(runfolder, 'Aligned*')
    unaligned_glob = os.path.join(runfolder, 'Unaligned*')

    aligned_paths = glob(aligned_glob)
    unaligned_paths = glob(unaligned_glob)

    matched_paths = hiseq_match_aligned_unaligned(aligned_paths, unaligned_paths)
    LOGGER.debug("Matched HiSeq analysis: %s", str(matched_paths))

    for aligned, unaligned in matched_paths:
        if unaligned is None:
            LOGGER.warning("Aligned directory %s without matching unaligned, skipping", aligned)
            continue

        # The original built an extra, unused gerald object here outside the
        # try block (unprotected against IOError) and left a debug print
        # statement; both removed.
        try:
            p = PipelineRun(runfolder, flowcell_id)
            p.datadir = datadir
            p.image_analysis = image_analysis
            p.bustard = bustard.bustard(unaligned)
            if aligned:
                p.gerald = gerald.gerald(aligned)
            runs.append(p)
        except IOError as e:
            LOGGER.error("Ignoring " + str(e))
    return len(runs) - start
300
def hiseq_match_aligned_unaligned(aligned, unaligned):
    """Match aligned and unaligned folders from separate lists

    :param aligned: list of Aligned* directory paths
    :param unaligned: list of Unaligned* directory paths
    :returns: list of (aligned, unaligned) path pairs keyed by their common
              suffix; either member of a pair may be None when unmatched
    """
    # (removed an unused pre-compiled 'Unaligned' regex; suffix parsing is
    # delegated to build_dir_dict_by_suffix)
    aligned_by_suffix = build_dir_dict_by_suffix('Aligned', aligned)
    unaligned_by_suffix = build_dir_dict_by_suffix('Unaligned', unaligned)

    keys = set(aligned_by_suffix.keys()).union(set(unaligned_by_suffix.keys()))

    matches = []
    for key in keys:
        a = aligned_by_suffix.get(key)
        u = unaligned_by_suffix.get(key)
        matches.append((a, u))
    return matches
317
def build_dir_dict_by_suffix(prefix, dirnames):
    """Build a dictionary indexed by suffix of last directory name.

    It assumes a constant prefix

    :param prefix: invariant leading portion of the directory basenames
    :param dirnames: iterable of directory paths to index
    :returns: dict mapping suffix -> full path for basenames matching prefix
    """
    # raw string: '\w' in a plain literal is an invalid escape sequence
    regex = re.compile(r'%s(?P<suffix>[\w]*)' % (prefix,))

    by_suffix = {}
    for absname in dirnames:
        basename = os.path.basename(absname)
        match = regex.match(basename)
        if match:
            by_suffix[match.group('suffix')] = absname
    return by_suffix
332
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :param gerald_dir: path to a gerald result directory
    :returns: a PipelineRun, or None if any stage could not be found
    """
    # imported locally to avoid a circular import at module load time
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    LOGGER.info('--- use-run detected options ---')
    LOGGER.info('runfolder: %s' % (runfolder_dir,))
    LOGGER.info('image_dir: %s' % (image_dir,))
    LOGGER.info('bustard_dir: %s' % (bustard_dir,))
    LOGGER.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    LOGGER.info('data_dir: %s' % (data_dir,))
    # fixed label typo: was 'short_iamge_dir'
    LOGGER.info('short_image_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        LOGGER.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
400
def extract_run_parameters(runs):
    """Write each run's xml parameter report via PipelineRun.save()."""
    for pipeline_run in runs:
        pipeline_run.save()
407
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks seperately.
    """
    summarized = {}
    spike_total = 0
    spike_genome = 'unknown'
    for read_name, count in mapped_reads.items():
        prefix, chromosome = os.path.split(read_name)
        if prefix and prefix not in genome_map:
            # a path component that isn't a known genome: spike-in/contaminant
            spike_genome = prefix
            spike_total += count
        else:
            summarized[chromosome] = summarized.get(chromosome, 0) + count
    summarized[spike_genome] = spike_total
    return summarized
425
def summarize_lane(gerald, lane_id):
    """Build a human-readable report (list of lines) for one lane.

    :param gerald: gerald run object providing .summary and .eland_results
    :param lane_id: lane key; also carries .lane and .read attributes
    :returns: list of report strings
    """
    report = []
    lane_results = gerald.summary.lane_results
    eland_result = gerald.eland_results[lane_id]
    report.append("Sample name %s" % (eland_result.sample_name))
    report.append("Lane id %s end %s" % (lane_id.lane, lane_id.read))

    # cluster statistics are only present when the summary covers this
    # read/lane combination
    if lane_id.read < len(lane_results) and \
           lane_id.lane in lane_results[lane_id.read]:
        summary_results = lane_results[lane_id.read][lane_id.lane]
        # cluster is a (mean, stdev) pair
        cluster = summary_results.cluster
        report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
    report.append("Total Reads: %d" % (eland_result.reads))

    # match code breakdown is optional on the eland result object
    if hasattr(eland_result, 'match_codes'):
        mc = eland_result.match_codes
        nm = mc['NM']
        nm_percent = float(nm) / eland_result.reads * 100
        qc = mc['QC']
        qc_percent = float(qc) / eland_result.reads * 100

        report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
        report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
        report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                      (mc['U0'], mc['U1'], mc['U2']))
        report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                      (mc['R0'], mc['R1'], mc['R2']))

    # per-genome mapping counts, when a genome map is available
    if hasattr(eland_result, 'genome_map'):
        report.append("Mapped Reads")
        mapped_reads = summarize_mapped_reads(eland_result.genome_map,
                                              eland_result.mapped_reads)
        for name, counts in mapped_reads.items():
            report.append("  %s: %d" % (name, counts))

        report.append('')
    return report
463
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :param runs: list of PipelineRun objects
    :returns: the report as a single newline-joined string
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.name,))
        # sort lanes for a stable report order
        eland_keys = sorted(run.gerald.eland_results.keys())
        # BUGFIX: this loop was previously outside the run loop, so only
        # the last run's lanes were summarized (and empty `runs` raised
        # NameError on eland_keys).
        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('---')
            report.append('')
    return os.linesep.join(report)
479
def is_compressed(filename):
    """Return True when filename ends in a recognized compression suffix."""
    extension = os.path.splitext(filename)[1]
    return extension in ('.gz', '.bz2')
487
def save_flowcell_reports(data_dir, cycle_dir):
    """
    Save the flowcell quality reports

    Tars Data/reports (plus Status.xml/Status.xsl when present) into
    cycle_dir/flowcell-reports.tar.bz2. Does nothing when there is no
    reports directory.
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        LOGGER.info("Saving reports from " + reports_dir)
        cwd = os.getcwd()
        os.chdir(data_dir)
        try:
            q = QueueCommands([" ".join(cmd_list)])
            q.run()
        finally:
            # BUGFIX: always restore the working directory, even when the
            # tar command raises
            os.chdir(cwd)
506
507
def save_summary_file(pipeline, cycle_dir):
    """Copy the run's Summary.htm into cycle_dir.

    Looks first in the gerald directory, then in the Status_Files
    directory; logs when neither candidate exists.
    """
    gerald_object = pipeline.gerald
    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
    if os.path.exists(gerald_summary):
        LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir))
        shutil.copy(gerald_summary, cycle_dir)
    elif os.path.exists(status_files_summary):
        LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir))
        shutil.copy(status_files_summary, cycle_dir)
    else:
        # BUGFIX: the original referenced an undefined name (summary_path)
        # here, raising NameError instead of logging
        LOGGER.info('Summary file %s was not found' % (gerald_summary,))
521
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images
    """
    source_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    source_plot_dir = os.path.join(bustard_object.pathname, 'Plots')
    source_plots = os.path.join(source_plot_dir, 's_?_[a-z]*.png')
    dest_plot_dir = os.path.join(cycle_dir, 'Plots')

    # without the html page there is nothing worth archiving
    if not os.path.exists(source_html):
        LOGGER.warning('Missing IVC.html file, not archiving')
        return

    LOGGER.debug("Saving %s" % (source_html,))
    LOGGER.debug("Saving %s" % (source_plots,))
    shutil.copy(source_html, cycle_dir)
    if not os.path.exists(dest_plot_dir):
        os.mkdir(dest_plot_dir)
    for plot_file in glob(source_plots):
        shutil.copy(plot_file, dest_plot_dir)
542
543
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory

    Pipes `tar c` through `bzip2 -9` writing cycle_dir/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = [f for f in os.listdir(scores_path)
                   if re.match('.*_score.txt', f)]

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    LOGGER.info("Compressing score files from %s" % (scores_path,))
    LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
    LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
    LOGGER.info("Writing to %s" % (tar_dest_name,))

    env = {'BZIP': '-9'}
    # BUGFIX: open in binary mode (bzip2 emits bytes) and close the file;
    # the original opened in text mode and never closed it
    with open(tar_dest_name, 'wb') as tar_dest:
        tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
                               env=env, cwd=scores_path)
        bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
        # close our copy of the pipe so bzip2 sees EOF when tar exits
        tar.stdout.close()
        tar.wait()
        # BUGFIX: reap bzip2 too so the archive is fully flushed on return
        bzip.wait()
574
575
def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
    """
    Compress eland result files into the archive directory

    :param gerald_object: gerald run whose eland_results will be archived
    :param cycle_dir: destination directory for the (compressed) copies
    :param num_jobs: how many bzip2 commands to run concurrently
    """
    # copy & bzip eland files
    bz_commands = []

    for key in gerald_object.eland_results:
        eland_lane = gerald_object.eland_results[key]
        for source_name in eland_lane.pathnames:
            if source_name is None:
                LOGGER.info(
                    "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
                continue
            path, name = os.path.split(source_name)
            dest_name = os.path.join(cycle_dir, name)
            LOGGER.info("Saving eland file %s to %s" % \
                        (source_name, dest_name))

            if is_compressed(name):
                LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
                shutil.copy(source_name, dest_name)
            else:
                # not compressed yet: queue a shell bzip2 job writing <name>.bz2
                # (removed a stale commented-out subprocess implementation)
                dest_name += '.bz2'
                args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
                bz_commands.append(" ".join(args))

    if len(bz_commands) > 0:
        q = QueueCommands(bz_commands, num_jobs)
        q.run()
612
613
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format=None):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * eland_result files
      * score files
      * Summary.htm
      * srf files (raw sequence & qualities)

    :param runs: list of PipelineRun objects to archive
    :param output_base_dir: where to build result dirs (default: cwd)
    :param site: site label passed to raw-data archiving; None skips it
    :param num_jobs: concurrency for the queued compression/md5 commands
    :param raw_format: raw archive format override; None autodetects
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
        result_dir = os.path.join(output_base_dir, r.flowcell_id)
        LOGGER.info("Using %s as result directory" % (result_dir,))
        if not os.path.exists(result_dir):
            os.mkdir(result_dir)

        # create cycle_dir, named after the image analysis cycle range
        cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
        LOGGER.info("Filling in %s" % (cycle,))
        cycle_dir = os.path.join(result_dir, cycle)
        cycle_dir = os.path.abspath(cycle_dir)
        # never clobber a previously extracted cycle directory
        if os.path.exists(cycle_dir):
            LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
            continue
        else:
            os.mkdir(cycle_dir)

        # save run file
        r.save(cycle_dir)

        # save illumina flowcell status report
        save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'),
                              cycle_dir)

        # save stuff from bustard
        # grab IVC plot
        save_ivc_plot(r.bustard, cycle_dir)

        # build base call saving commands
        if site is not None:
            save_raw_data(num_jobs, r, site, raw_format, cycle_dir)

        # save stuff from GERALD
        # copy stuff out of the main run
        g = r.gerald

        # save summary file
        save_summary_file(r, cycle_dir)

        # compress eland result files
        compress_eland_results(g, cycle_dir, num_jobs)

        # md5 all the compressed files once we're done
        md5_commands = srf.make_md5_commands(cycle_dir)
        srf.run_commands(cycle_dir, md5_commands, num_jobs)
671
def save_raw_data(num_jobs, r, site, raw_format, cycle_dir):
    """Archive the raw sequence data for run r into cycle_dir.

    Detects the archive format from the bustard run when raw_format is
    None, then either copies fastqs or queues qseq/srf conversion
    commands.

    :raises ValueError: on an unrecognized raw_format
    """
    # only archive lanes that have parameters recorded for them
    lanes = [lane for lane in r.gerald.lanes
             if r.gerald.lanes.get(lane, None) is not None]

    run_name = srf.pathname_to_run_name(r.pathname)
    seq_cmds = []
    if raw_format is None:
        raw_format = r.bustard.sequence_format

    LOGGER.info("Raw Format is: %s" % (raw_format, ))
    if raw_format == 'fastq':
        rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
        LOGGER.info("raw data = %s" % (rawpath,))
        srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
    elif raw_format == 'qseq':
        seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
    elif raw_format == 'srf':
        seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
    else:
        raise ValueError('Unknown --raw-format=%s' % (raw_format))
    srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
696
def rm_list(files, dry_run=True):
    """Delete the given paths, logging each removal.

    :param files: iterable of paths; directories are removed recursively
    :param dry_run: when True (default) only log what would be removed
    """
    for f in files:
        if os.path.exists(f):
            LOGGER.info('deleting %s' % (f,))
            if not dry_run:
                if os.path.isdir(f):
                    shutil.rmtree(f)
                else:
                    os.unlink(f)
        else:
            # warning() rather than the deprecated warn() alias
            LOGGER.warning("%s doesn't exist." % (f,))
708
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    :param runs: list of PipelineRun-like objects (needs .pathname and
                 .image_analysis.pathname)
    :param dry_run: when True (default) only log what would be deleted
    """
    if dry_run:
        LOGGER.info('In dry-run mode')

    for run in runs:
        # fixed log typo: was 'Cleaninging'
        LOGGER.info('Cleaning %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        LOGGER.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # rm ReadPrep
        LOGGER.info("Cleaning ReadPrep*")
        read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
        rm_list(read_prep_dirs, dry_run)
        # rm Thumbnail_Images (fixed log typo: was 'Thubmnail_images')
        LOGGER.info("Cleaning Thumbnail_Images")
        thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
        rm_list(thumbnail_dirs, dry_run)

        # make clean_intermediate
        # use the module LOGGER (was logging.info) for consistency
        LOGGER.info("Cleaning intermediate files")
        # NOTE(review): 'make clean_intermediate' runs even in dry_run mode,
        # matching the original behavior — confirm that is intended
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                             cwd=run.image_analysis.pathname,)
            clean_process.wait()
751
752
753