Guess genome name for building compressed mapping counts from genomesize.xml
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import tarfile
13 import time
14
15 import lxml.etree as ElementTree
16
# module-level logger named after this module
LOGGER = logging.getLogger(__name__)

# date handling for European-style dd-mm-yyyy timestamps in runfolder names
EUROPEAN_STRPTIME = "%d-%m-%Y"
EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
# NOTE(review): "\." in a non-raw string relies on Python passing the unknown
# escape through unchanged; a raw string would be cleaner — confirm before changing
VERSION_RE = "([0-9\.]+)"
USER_RE = "([a-zA-Z0-9]+)"
# Illumina GA flowcells have 8 lanes, numbered 1..8
LANES_PER_FLOWCELL = 8
LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
25
26 from htsworkflow.util.alphanum import alphanum
27 from htsworkflow.util.ethelp import indent, flatten
28 from htsworkflow.util.queuecommands import QueueCommands
29
30 from htsworkflow.pipelines import srf
31
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run

    A run ties together one image analysis component (Firecrest or
    IPAR), one base calling component (Bustard), and one alignment
    component (GERALD), plus the flowcell id of the runfolder.
    """
    XML_VERSION = 1
    # element names used in the saved run xml
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, flowcell_id=None, xml=None):
        """
        :Parameters:
          - `pathname`: optional path to the runfolder
          - `flowcell_id`: optional flowcell id; looked up lazily when None
          - `xml`: optional previously saved run xml tree to restore from
        """
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        else:
            self.pathname = None
        self._name = None
        self._flowcell_id = flowcell_id
        self.datadir = None
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
            self.set_elements(xml)

    def _get_flowcell_id(self):
        """Return the flowcell id, reading Config/FlowcellId.xml on first
        access, or guessing it from the runfolder name."""
        if self._flowcell_id is None:
            config_dir = os.path.join(self.pathname, 'Config')
            flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
            if os.path.exists(flowcell_id_path):
                flowcell_id_tree = ElementTree.parse(flowcell_id_path)
                self._flowcell_id = flowcell_id_tree.findtext('Text')
            else:
                # runfolder names conventionally end in _<flowcell id>
                path_fields = self.pathname.split('_')
                if len(path_fields) > 0:
                    # guessing last element of filename
                    self._flowcell_id = path_fields[-1]
                else:
                    self._flowcell_id = 'unknown'
                # bug fix: this warning used to live only in the (unreachable)
                # else branch above, so guessed ids were never reported
                LOGGER.warning(
                    "Flowcell id was not found, guessing %s" % (
                    self._flowcell_id))

        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def _get_runfolder_name(self):
        # the gerald component records the runfolder it was built from
        if self.gerald is None:
            return None
        else:
            return self.gerald.runfolder_name
    runfolder_name = property(_get_runfolder_name)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.

        Requires image_analysis, bustard, and gerald to have been set.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """Restore this run's components from a saved run xml tree.

        :Raises: ValueError if the root tag is not PipelineRun.
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
            tag = element.tag.lower()
            if tag == PipelineRun.FLOWCELL_ID.lower():
                self._flowcell_id = element.text
            # you should only have Firecrest or IPAR, never both of them.
            elif tag == firecrest.Firecrest.FIRECREST.lower():
                self.image_analysis = firecrest.Firecrest(xml=element)
            elif tag == ipar.IPAR.IPAR.lower():
                self.image_analysis = ipar.IPAR(xml=element)
            elif tag == bustard.Bustard.BUSTARD.lower():
                self.bustard = bustard.Bustard(xml=element)
            elif tag == gerald.Gerald.GERALD.lower():
                self.gerald = gerald.Gerald(xml=element)
            elif tag == gerald.CASAVA.GERALD.lower():
                self.gerald = gerald.CASAVA(xml=element)
            else:
                # warn() is deprecated in favor of warning()
                LOGGER.warning('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name
        """
        if self._name is None:
            tmax = max(self.image_analysis.time, self.bustard.time,
                       self.gerald.time)
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """Write the merged run xml report into destdir as self.name."""
        if destdir is None:
            destdir = ''
        LOGGER.info("Saving run report " + self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Restore this run from a previously saved run xml report."""
        LOGGER.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
152
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    root = ElementTree.parse(pathname).getroot()
    return PipelineRun(xml=root)
165
def get_runs(runfolder, flowcell_id=None):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.
    """
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    def build_run(runs, b, image_analysis, datadir, gerald_pathname):
        # Build one PipelineRun for a gerald directory; an unreadable
        # gerald directory is logged and skipped instead of aborting.
        try:
            g = gerald.gerald(gerald_pathname)
        except IOError as e:
            # bug fix: `except IOError, e` is a syntax error in Python 3
            LOGGER.error("Ignoring " + str(e))
            return
        p = PipelineRun(runfolder, flowcell_id)
        p.datadir = datadir
        p.image_analysis = image_analysis
        p.bustard = b
        p.gerald = g
        runs.append(p)

    def scan_post_image_analysis(runs, runfolder, datadir, image_analysis, pathname):
        LOGGER.info("Looking for bustard directories in %s" % (pathname,))
        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
        # RTA BaseCalls looks enough like Bustard.
        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
        for bustard_pathname in bustard_dirs:
            LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
            b = bustard.bustard(bustard_pathname)
            build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder)

            build_aligned_runs(image_analysis, runs, b, datadir, runfolder)

    def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder):
        # classic pipeline layout: GERALD* directories under the bustard dir
        gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
        LOGGER.info("Looking for gerald directories in %s" % (pathname,))
        for gerald_pathname in glob(gerald_glob):
            LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
            build_run(runs, b, image_analysis, datadir, gerald_pathname)

    def build_aligned_runs(image_analysis, runs, b, datadir, runfolder):
        # CASAVA layout: Aligned* directories directly under the runfolder
        aligned_glob = os.path.join(runfolder, 'Aligned*')
        for aligned in glob(aligned_glob):
            LOGGER.info("Found aligned directory %s" % (aligned,))
            build_run(runs, b, image_analysis, datadir, aligned)

    datadir = os.path.join(runfolder, 'Data')

    LOGGER.info('Searching for runs in ' + datadir)
    runs = []
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        LOGGER.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            LOGGER.warning(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, firecrest_pathname
            )
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        LOGGER.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            LOGGER.warning(
                "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, ipar_pathname
            )

    return runs
257
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :Returns: a PipelineRun, or None if any component is missing.
    """
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    gerald_dir = os.path.expanduser(gerald_dir)
    # walk up the conventional runfolder layout:
    # runfolder/Data/<image dir>/<bustard dir>/<gerald dir>
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    LOGGER.info('--- use-run detected options ---')
    LOGGER.info('runfolder: %s' % (runfolder_dir,))
    LOGGER.info('image_dir: %s' % (image_dir,))
    LOGGER.info('bustard_dir: %s' % (bustard_dir,))
    LOGGER.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    LOGGER.info('data_dir: %s' % (data_dir,))
    # bug fix: log message used to read 'short_iamge_dir'
    LOGGER.info('short_image_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        # RTA Intensities directories are close enough to IPAR
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        LOGGER.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
325
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters
    """
    for pipeline_run in runs:
        pipeline_run.save()
332
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.

    :Parameters:
      - `genome_map`: mapping whose keys are known genome path prefixes
      - `mapped_reads`: mapping of "prefix/chromosome" (or bare
        chromosome) names to read counts

    :Returns: dict of chromosome (or genome prefix) name -> read count;
      always contains one rolled-up genome entry ('unknown' when no
      prefixed reads were seen).
    """
    summarized_reads = {}
    genome_reads = 0
    genome = 'unknown'
    for mapped_name, count in mapped_reads.items():
        path, name = os.path.split(mapped_name)
        # bug fix: dict.has_key was removed in Python 3; `in` works in both
        if len(path) > 0 and path not in genome_map:
            # reads with an unrecognized directory prefix are rolled up
            # under that prefix as a whole-genome count
            genome = path
            genome_reads += count
        else:
            summarized_reads[name] = summarized_reads.setdefault(name, 0) + count
    summarized_reads[genome] = genome_reads
    return summarized_reads
350
def summarize_lane(gerald, lane_id):
    """Build a human-readable report (list of lines) for one lane.

    :Parameters:
      - `gerald`: gerald run object providing summary and eland results
      - `lane_id`: lane to report on

    :Returns: list of report lines, one entry per line of text.
    """
    report = []
    summary_results = gerald.summary.lane_results
    # one pass per sequencing end (single or paired)
    for end in range(len(summary_results)):
        eland_result = gerald.eland_results.results[end][lane_id]
        report.append("Sample name %s" % (eland_result.sample_name))
        report.append("Lane id %s end %s" % (eland_result.lane_id, end))
        # bug fix: dict.has_key was removed in Python 3; `in` works in both
        if end < len(summary_results) and eland_result.lane_id in summary_results[end]:
            cluster = summary_results[end][eland_result.lane_id].cluster
            report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
        report.append("Total Reads: %d" % (eland_result.reads))

        if hasattr(eland_result, 'match_codes'):
            mc = eland_result.match_codes
            reads = eland_result.reads
            nm = mc['NM']
            qc = mc['QC']
            # guard against a zero read count to avoid ZeroDivisionError
            nm_percent = float(nm) / reads * 100 if reads else 0.0
            qc_percent = float(qc) / reads * 100 if reads else 0.0

            report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
            report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
            report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                          (mc['U0'], mc['U1'], mc['U2']))
            report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                          (mc['R0'], mc['R1'], mc['R2']))

        if hasattr(eland_result, 'genome_map'):
            report.append("Mapped Reads")
            mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
            for name, counts in mapped_reads.items():
                report.append("  %s: %d" % (name, counts))

        # blank separator line between ends
        report.append('')
    return report
385
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :Parameters:
      - `runs`: iterable of PipelineRun objects

    :Returns: one newline-joined report string covering ALL runs.
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.name,))
        # sort the lane ids into human-friendly order
        eland_keys = run.gerald.eland_results.results[0].keys()
        eland_keys.sort(alphanum)

        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('---')
            report.append('')
    # bug fix: the return used to be indented inside the runs loop, so
    # only the first run was ever summarized
    return os.linesep.join(report)
403
def is_compressed(filename):
    """Return True when filename carries a recognized compression suffix."""
    return os.path.splitext(filename)[1] in ('.gz', '.bz2')
411
def save_flowcell_reports(data_dir, cycle_dir):
    """
    Save the flowcell quality reports

    Tars data_dir/reports (plus Status.xml/Status.xsl when Status.xml
    exists) into cycle_dir/flowcell-reports.tar.bz2. Does nothing when
    there is no reports directory.
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            # NOTE(review): Status.xsl is added without its own existence
            # check — assumed to always accompany Status.xml; verify
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        LOGGER.info("Saving reports from " + reports_dir)
        cwd = os.getcwd()
        os.chdir(data_dir)
        try:
            q = QueueCommands([" ".join(cmd_list)])
            q.run()
        finally:
            # bug fix: always restore the working directory, even when
            # the tar command raises
            os.chdir(cwd)
430
431
def save_summary_file(pipeline, cycle_dir):
    """Copy the run's Summary.htm into cycle_dir.

    Tries the gerald directory first, then the Status_Files fallback
    location; logs when neither exists.
    """
    gerald_object = pipeline.gerald
    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
    if os.path.exists(gerald_summary):
        LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir))
        shutil.copy(gerald_summary, cycle_dir)
    elif os.path.exists(status_files_summary):
        LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir))
        shutil.copy(status_files_summary, cycle_dir)
    else:
        # bug fix: this branch referenced an undefined name `summary_path`,
        # raising NameError whenever no summary file existed
        LOGGER.info('Summary file %s was not found' % (gerald_summary,))
445
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images
    """
    source_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    source_plot_dir = os.path.join(bustard_object.pathname, 'Plots')
    image_pattern = os.path.join(source_plot_dir, 's_?_[a-z]*.png')
    dest_plot_dir = os.path.join(cycle_dir, 'Plots')

    if not os.path.exists(source_html):
        # nothing to archive without the page itself
        LOGGER.warning('Missing IVC.html file, not archiving')
        return

    LOGGER.debug("Saving %s" % (source_html,))
    LOGGER.debug("Saving %s" % (image_pattern,))
    shutil.copy(source_html, cycle_dir)
    if not os.path.exists(dest_plot_dir):
        os.mkdir(dest_plot_dir)
    for image_file in glob(image_pattern):
        shutil.copy(image_file, dest_plot_dir)
466
467
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory

    Pipes `tar c <files>` through `bzip2 -9 -c` into
    cycle_dir/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = [f for f in os.listdir(scores_path)
                   if re.match('.*_score.txt', f)]

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    LOGGER.info("Compressing score files from %s" % (scores_path,))
    LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
    LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
    LOGGER.info("Writing to %s" % (tar_dest_name,))

    # NOTE(review): env={'BZIP': '-9'} replaces the entire environment for
    # tar; bzip2 already gets -9 explicitly — confirm the env var is needed
    env = {'BZIP': '-9'}
    # bug fixes: open the destination in binary mode, close our copy of
    # tar's stdout so bzip2 sees EOF, and wait for BOTH processes so the
    # archive is complete before returning
    with open(tar_dest_name, 'wb') as tar_dest:
        tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
                               env=env, cwd=scores_path)
        bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
        tar.stdout.close()
        tar.wait()
        bzip.wait()
498
499
def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
    """
    Compress eland result files into the archive directory

    Already-compressed files are copied as-is; the rest are queued as
    `bzip2 -9 -c src > dest.bz2` shell commands and run with up to
    num_jobs in parallel.
    """
    # copy & bzip eland files
    bz_commands = []

    for key in gerald_object.eland_results:
        eland_lane = gerald_object.eland_results[key]
        for source_name in eland_lane.pathnames:
            if source_name is None:
                LOGGER.info(
                  "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
                continue
            path, name = os.path.split(source_name)
            dest_name = os.path.join(cycle_dir, name)
            LOGGER.info("Saving eland file %s to %s" % \
                        (source_name, dest_name))

            if is_compressed(name):
                LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
                shutil.copy(source_name, dest_name)
            else:
                # not compressed; the '>' redirect means QueueCommands
                # must run these through a shell
                dest_name += '.bz2'
                args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
                bz_commands.append(" ".join(args))

    if len(bz_commands) > 0:
        q = QueueCommands(bz_commands, num_jobs)
        q.run()
536
537
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format='qseq'):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * eland_result files
      * score files
      * Summary.htm
      * srf files (raw sequence & qualities)

    :Parameters:
      - `runs`: iterable of PipelineRun objects to archive
      - `output_base_dir`: where to create per-flowcell result dirs
        (defaults to the current working directory)
      - `site`: site label passed to the srf helpers; None skips the
        raw base-call saving step entirely
      - `num_jobs`: parallelism for the queued compression/md5 commands
      - `raw_format`: one of 'fastq', 'qseq', 'srf'

    :Raises: ValueError for an unrecognized raw_format.
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
      # one result directory per flowcell, one cycle directory per run
      result_dir = os.path.join(output_base_dir, r.flowcell_id)
      LOGGER.info("Using %s as result directory" % (result_dir,))
      if not os.path.exists(result_dir):
        os.mkdir(result_dir)

      # create cycle_dir
      cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
      LOGGER.info("Filling in %s" % (cycle,))
      cycle_dir = os.path.join(result_dir, cycle)
      cycle_dir = os.path.abspath(cycle_dir)
      if os.path.exists(cycle_dir):
        # never clobber an existing extraction; skip this run
        LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
        continue
      else:
        os.mkdir(cycle_dir)

      # save run file
      r.save(cycle_dir)

      # save illumina flowcell status report
      save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'), cycle_dir)

      # save stuff from bustard
      # grab IVC plot
      save_ivc_plot(r.bustard, cycle_dir)

      # build base call saving commands
      if site is not None:
        # collect the lanes that were actually analyzed
        lanes = []
        for lane in range(1, 9):
          lane_parameters = r.gerald.lanes.get(lane, None)
          if lane_parameters is not None and lane_parameters.analysis != 'none':
            lanes.append(lane)

        run_name = srf.pathname_to_run_name(r.pathname)
        seq_cmds = []
        LOGGER.info("Raw Format is: %s" % (raw_format, ))
        if raw_format == 'fastq':
            # hiseq fastq files are copied directly, no command queue
            rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
            LOGGER.info("raw data = %s" % (rawpath,))
            srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
        elif raw_format == 'qseq':
            seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
        elif raw_format == 'srf':
            seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
        else:
            raise ValueError('Unknown --raw-format=%s' % (raw_format))
        srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)

      # save stuff from GERALD
      # copy stuff out of the main run
      g = r.gerald

      # save summary file
      save_summary_file(r, cycle_dir)

      # compress eland result files
      compress_eland_results(g, cycle_dir, num_jobs)

      # md5 all the compressed files once we're done
      md5_commands = srf.make_md5_commands(cycle_dir)
      srf.run_commands(cycle_dir, md5_commands, num_jobs)
613
def rm_list(files, dry_run=True):
    """Delete each listed path (file or directory tree).

    In dry_run mode only logs what would be removed.
    """
    for pathname in files:
        if not os.path.exists(pathname):
            LOGGER.warn("%s doesn't exist." % (pathname,))
            continue
        LOGGER.info('deleting %s' % (pathname,))
        if dry_run:
            continue
        if os.path.isdir(pathname):
            shutil.rmtree(pathname)
        else:
            os.unlink(pathname)
625
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    Removes logs, calibration data, raw images, thumbnails, and
    pipeline intermediates; with dry_run=True only reports what would
    be deleted.
    """
    if dry_run:
        LOGGER.info('In dry-run mode')

    for run in runs:
        # bug fix: log message used to read 'Cleaninging'
        LOGGER.info('Cleaning %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        LOGGER.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # rm ReadPrep
        LOGGER.info("Cleaning ReadPrep*")
        read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
        rm_list(read_prep_dirs, dry_run)
        # rm Thumbnail_Images (bug fix: log message used to read
        # 'Thubmnail_images')
        LOGGER.info("Cleaning Thumbnail_Images")
        thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
        rm_list(thumbnail_dirs, dry_run)

        # make clean_intermediate
        # consistency fix: use the module LOGGER, not the root logger
        LOGGER.info("Cleaning intermediate files")
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                             cwd=run.image_analysis.pathname,)
            clean_process.wait()
668
669
670