Return a gerald version number as a number and not a cvs string.
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import tarfile
13 import time
14
# ElementTree joined the standard library in Python 2.5; fall back to the
# stand-alone "elementtree" package on older interpreters.
# (was `except ImportError, e` -- Python-2-only syntax, and `e` was unused)
try:
    from xml.etree import ElementTree
except ImportError:
    from elementtree import ElementTree
19
20 LOGGER = logging.getLogger(__name__)
21
# date format used in European-style runfolder names
EUROPEAN_STRPTIME = "%d-%m-%Y"
EUROPEAN_DATE_RE = r"([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
# regex patterns are raw strings so backslashes reach the re module intact
VERSION_RE = r"([0-9\.]+)"
USER_RE = r"([a-zA-Z0-9]+)"
LANES_PER_FLOWCELL = 8
LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
28
29 from htsworkflow.util.alphanum import alphanum
30 from htsworkflow.util.ethelp import indent, flatten
31 from htsworkflow.util.queuecommands import QueueCommands
32
33 from htsworkflow.pipelines import srf
34
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run

    A run is the combination of an image-analysis step (Firecrest or IPAR),
    a base-calling step (Bustard) and an alignment step (GERALD/CASAVA).
    """
    XML_VERSION = 1
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, flowcell_id=None, xml=None):
        """
        :Parameters:
          - `pathname`: runfolder path; normalized, may be None when the
            run is being reconstructed from xml
          - `flowcell_id`: known flowcell id; discovered lazily if None
          - `xml`: previously-saved ElementTree node to restore state from
        """
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        else:
            self.pathname = None
        self._name = None
        self._flowcell_id = flowcell_id
        self.datadir = None
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
            self.set_elements(xml)

    def _get_flowcell_id(self):
        """Return the flowcell id, discovering it from the runfolder if needed."""
        if self._flowcell_id is None:
            config_dir = os.path.join(self.pathname, 'Config')
            flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
            if os.path.exists(flowcell_id_path):
                flowcell_id_tree = ElementTree.parse(flowcell_id_path)
                self._flowcell_id = flowcell_id_tree.findtext('Text')
            else:
                # No Config/FlowcellId.xml: guess from the runfolder name,
                # whose last _-separated field is conventionally the id.
                path_fields = self.pathname.split('_')
                if len(path_fields) > 0:
                    self._flowcell_id = path_fields[-1]
                else:
                    self._flowcell_id = 'unknown'
                # Warn whenever we had to guess.  (The warning used to sit
                # inside the unreachable `else` above and never fired.)
                LOGGER.warning(
                    "Flowcell id was not found, guessing %s" % (
                        self._flowcell_id))

        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def _get_runfolder_name(self):
        """Runfolder name reported by the gerald component, or None."""
        if self.gerald is None:
            return None
        return self.gerald.runfolder_name
    runfolder_name = property(_get_runfolder_name)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """Restore the run components from a saved xml tree.

        :Raises: ValueError if the root tag is not a PipelineRun element.
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
            tag = element.tag.lower()
            if tag == PipelineRun.FLOWCELL_ID.lower():
                self._flowcell_id = element.text
            # you should only have Firecrest or IPAR, never both of them.
            elif tag == firecrest.Firecrest.FIRECREST.lower():
                self.image_analysis = firecrest.Firecrest(xml=element)
            elif tag == ipar.IPAR.IPAR.lower():
                self.image_analysis = ipar.IPAR(xml=element)
            elif tag == bustard.Bustard.BUSTARD.lower():
                self.bustard = bustard.Bustard(xml=element)
            elif tag == gerald.Gerald.GERALD.lower():
                self.gerald = gerald.Gerald(xml=element)
            elif tag == gerald.CASAVA.GERALD.lower():
                self.gerald = gerald.CASAVA(xml=element)
            else:
                # LOGGER.warn is a deprecated alias of warning
                LOGGER.warning('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name
        """
        if self._name is None:
            tmax = max(self.image_analysis.time, self.bustard.time,
                       self.gerald.time)
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """Write this run's report xml to destdir/<self.name>."""
        if destdir is None:
            destdir = ''
        LOGGER.info("Saving run report " + self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Load a previously-saved run report xml file."""
        LOGGER.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
155
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    # parse the saved report and let PipelineRun rebuild itself from it
    root = ElementTree.parse(pathname).getroot()
    return PipelineRun(xml=root)
168
def get_runs(runfolder, flowcell_id=None):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.

    :Parameters:
      - `runfolder`: path of the runfolder to scan
      - `flowcell_id`: optional flowcell id passed to each PipelineRun

    :Returns: list of PipelineRun objects
    """
    # imports hidden here to avoid cyclic imports
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    def scan_post_image_analysis(runs, runfolder, datadir, image_analysis,
                                 pathname):
        # each image-analysis directory may hold several base-call dirs
        LOGGER.info("Looking for bustard directories in %s" % (pathname,))
        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
        # RTA BaseCalls looks enough like Bustard.
        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
        for bustard_pathname in bustard_dirs:
            LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
            b = bustard.bustard(bustard_pathname)
            build_gerald_runs(runs, b, image_analysis, bustard_pathname,
                              datadir, pathname, runfolder)
            build_aligned_runs(image_analysis, runs, b, datadir, runfolder)

    def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir,
                          pathname, runfolder):
        # each base-call dir may hold several GERALD (alignment) dirs
        gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
        LOGGER.info("Looking for gerald directories in %s" % (pathname,))
        for gerald_pathname in glob(gerald_glob):
            LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
            try:
                g = gerald.gerald(gerald_pathname)
                p = PipelineRun(runfolder, flowcell_id)
                p.datadir = datadir
                p.image_analysis = image_analysis
                p.bustard = b
                p.gerald = g
                runs.append(p)
            except IOError as e:
                # an unreadable gerald dir shouldn't abort the whole scan
                LOGGER.error("Ignoring " + str(e))

    def build_aligned_runs(image_analysis, runs, b, datadir, runfolder):
        # HiSeq-style runs keep alignments in Aligned* at the runfolder root
        aligned_glob = os.path.join(runfolder, 'Aligned*')
        for aligned in glob(aligned_glob):
            LOGGER.info("Found aligned directory %s" % (aligned,))
            try:
                g = gerald.HiSeq(aligned)
                p = PipelineRun(runfolder, flowcell_id)
                p.datadir = datadir
                p.image_analysis = image_analysis
                p.bustard = b
                p.gerald = g
                runs.append(p)
            except IOError as e:
                LOGGER.error("Ignoring " + str(e))

    datadir = os.path.join(runfolder, 'Data')

    LOGGER.info('Searching for runs in ' + datadir)
    runs = []
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        LOGGER.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            LOGGER.warning(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, firecrest_pathname
            )
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        LOGGER.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            LOGGER.warning(
                "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, ipar_pathname
            )

    return runs
260
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :Returns: a PipelineRun, or None if any component can't be found.
    """
    # imports hidden here to avoid cyclic imports
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    # the gerald dir lives inside the bustard dir, which lives inside the
    # image-analysis dir, which lives under <runfolder>/Data/...
    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    LOGGER.info('--- use-run detected options ---')
    LOGGER.info('runfolder: %s' % (runfolder_dir,))
    LOGGER.info('image_dir: %s' % (image_dir,))
    LOGGER.info('bustard_dir: %s' % (bustard_dir,))
    LOGGER.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    LOGGER.info('data_dir: %s' % (data_dir,))
    # (log label was misspelled 'short_iamge_dir')
    LOGGER.info('short_image_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        # RTA Intensities directories are handled by the IPAR loader
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        LOGGER.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
328
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters
    """
    # each run knows how to persist its own parameters
    for pipeline_run in runs:
        pipeline_run.save()
335
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.

    :Parameters:
      - `genome_map`: mapping whose keys are directory paths considered part
        of the target genome
      - `mapped_reads`: mapping of chromosome (or path/chromosome) to counts

    :Returns: dict of chromosome name (or foreign genome path) to read counts
    """
    summarized_reads = {}
    genome_reads = 0
    genome = 'unknown'
    for k, v in mapped_reads.items():
        path, k = os.path.split(k)
        # reads mapped under a directory not in genome_map belong to the
        # spike-in/contamination genome and are pooled under its path
        # (`in` replaces dict.has_key, removed in Python 3)
        if len(path) > 0 and path not in genome_map:
            genome = path
            genome_reads += v
        else:
            summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
    summarized_reads[genome] = genome_reads
    return summarized_reads
353
def summarize_lane(gerald, lane_id):
    """
    Build a human-readable report for one lane across all read ends.

    :Parameters:
      - `gerald`: gerald object providing summary.lane_results and
        eland_results.results
      - `lane_id`: which lane to summarize

    :Returns: list of report line strings
    """
    report = []
    summary_results = gerald.summary.lane_results
    # one entry per read end (single or paired-end)
    for end in range(len(summary_results)):
        eland_result = gerald.eland_results.results[end][lane_id]
        report.append("Sample name %s" % (eland_result.sample_name))
        report.append("Lane id %s end %s" % (eland_result.lane_id, end))
        # `in` replaces dict.has_key, removed in Python 3
        if end < len(summary_results) and eland_result.lane_id in summary_results[end]:
            cluster = summary_results[end][eland_result.lane_id].cluster
            report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
        report.append("Total Reads: %d" % (eland_result.reads))

        if hasattr(eland_result, 'match_codes'):
            mc = eland_result.match_codes
            nm = mc['NM']
            nm_percent = float(nm) / eland_result.reads * 100
            qc = mc['QC']
            qc_percent = float(qc) / eland_result.reads * 100

            report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
            report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
            report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                          (mc['U0'], mc['U1'], mc['U2']))
            report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                          (mc['R0'], mc['R1'], mc['R2']))

        if hasattr(eland_result, 'genome_map'):
            report.append("Mapped Reads")
            mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
            for name, counts in mapped_reads.items():
                report.append("  %s: %d" % (name, counts))

        report.append('')
    return report
388
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :Parameters:
      - `runs`: list of PipelineRun objects

    :Returns: the whole report as a single newline-joined string
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.name,))
        # sort the report
        eland_keys = run.gerald.eland_results.results[0].keys()
        eland_keys.sort(alphanum)

        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('---')
            report.append('')
    # BUG FIX: the return used to sit inside the `for run` loop, so only the
    # first run was ever reported (and an empty list returned None).
    return os.linesep.join(report)
406
def is_compressed(filename):
    """Return True when filename carries a known compression extension."""
    extension = os.path.splitext(filename)[1]
    return extension in ('.gz', '.bz2')
414
def save_flowcell_reports(data_dir, cycle_dir):
    """
    Save the flowcell quality reports

    Tars Data/reports (plus Status.xml/Status.xsl when present) into
    <cycle_dir>/flowcell-reports.tar.bz2.  Does nothing when there is no
    reports directory.
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        cmd_list = ['tar', 'cjvf', reports_dest, 'reports/']
        if os.path.exists(status_file):
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        LOGGER.info("Saving reports from " + reports_dir)
        cwd = os.getcwd()
        os.chdir(data_dir)
        try:
            q = QueueCommands([" ".join(cmd_list)])
            q.run()
        finally:
            # always restore the working directory, even if tar fails
            os.chdir(cwd)
433
434
def save_summary_file(pipeline, cycle_dir):
    """
    Copy Summary.htm into cycle_dir.

    Prefers the copy in the gerald directory, falling back to
    <datadir>/Status_Files/Summary.htm; logs when neither exists.
    """
    gerald_object = pipeline.gerald
    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
    if os.path.exists(gerald_summary):
        LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir))
        shutil.copy(gerald_summary, cycle_dir)
    elif os.path.exists(status_files_summary):
        LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir))
        shutil.copy(status_files_summary, cycle_dir)
    else:
        # BUG FIX: previously referenced undefined `summary_path`, raising
        # NameError whenever no summary file was present
        LOGGER.info('Summary file %s was not found' % (gerald_summary,))
448
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images
    """
    plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
    # glob pattern matching the per-lane plot images referenced by IVC.htm
    plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')

    plot_target_path = os.path.join(cycle_dir, 'Plots')

    if os.path.exists(plot_html):
        LOGGER.debug("Saving %s" % (plot_html,))
        LOGGER.debug("Saving %s" % (plot_images,))
        shutil.copy(plot_html, cycle_dir)
        if not os.path.exists(plot_target_path):
            os.mkdir(plot_target_path)
        for plot_file in glob(plot_images):
            shutil.copy(plot_file, plot_target_path)
    else:
        # message used to say "IVC.html"; the file looked for is IVC.htm
        LOGGER.warning('Missing IVC.htm file, not archiving')
469
470
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory

    Pipes `tar c` of all *_score.txt files through bzip2 into
    <cycle_dir>/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = [f for f in os.listdir(scores_path)
                   if re.match('.*_score.txt', f)]

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = ['bzip2', '-9', '-c']
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    LOGGER.info("Compressing score files from %s" % (scores_path,))
    LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
    LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
    LOGGER.info("Writing to %s" % (tar_dest_name,))

    env = {'BZIP': '-9'}
    # binary mode -- the archive is binary data (was text-mode 'w')
    tar_dest = open(tar_dest_name, 'wb')
    try:
        tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
                               env=env, cwd=scores_path)
        bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
        # close our handle on tar's stdout so bzip2 sees EOF when tar exits
        tar.stdout.close()
        tar.wait()
        # BUG FIX: also wait for bzip2, otherwise the archive may still be
        # incomplete when we return
        bzip.wait()
    finally:
        tar_dest.close()
501
502
def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
    """
    Compress eland result files into the archive directory

    :Parameters:
      - `gerald_object`: gerald run providing eland_results
      - `cycle_dir`: destination directory for the (compressed) files
      - `num_jobs`: how many bzip2 commands to run in parallel
    """
    # copy & bzip eland files
    bz_commands = []

    for lanes_dictionary in gerald_object.eland_results.results:
        for eland_lane in lanes_dictionary.values():
            source_name = eland_lane.pathname
            if source_name is None:
                LOGGER.info(
                    "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
            else:
                path, name = os.path.split(source_name)
                dest_name = os.path.join(cycle_dir, name)
                LOGGER.info("Saving eland file %s to %s" %
                            (source_name, dest_name))

                if is_compressed(name):
                    LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
                    shutil.copy(source_name, dest_name)
                else:
                    # not compressed yet; queue a shell command so several
                    # files can be bzipped in parallel by QueueCommands
                    dest_name += '.bz2'
                    args = ['bzip2', '-9', '-c', source_name, '>', dest_name]
                    bz_commands.append(" ".join(args))

    if len(bz_commands) > 0:
        q = QueueCommands(bz_commands, num_jobs)
        q.run()
539
540
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format='qseq'):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * eland_result files
      * score files
      * Summary.htm
      * srf files (raw sequence & qualities)

    :Parameters:
      - `runs`: list of PipelineRun objects to archive
      - `output_base_dir`: where per-flowcell result directories are created
        (defaults to the current working directory)
      - `site`: site name used when naming raw base-call output; None skips
        raw base-call extraction entirely
      - `num_jobs`: number of shell commands to run in parallel
      - `raw_format`: 'qseq', 'srf' or 'fastq'; anything else raises ValueError
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
      # one result directory per flowcell, one cycle directory per run
      result_dir = os.path.join(output_base_dir, r.flowcell_id)
      LOGGER.info("Using %s as result directory" % (result_dir,))
      if not os.path.exists(result_dir):
        os.mkdir(result_dir)

      # create cycle_dir
      cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
      LOGGER.info("Filling in %s" % (cycle,))
      cycle_dir = os.path.join(result_dir, cycle)
      cycle_dir = os.path.abspath(cycle_dir)
      # never overwrite an existing cycle directory: skip the whole run
      if os.path.exists(cycle_dir):
        LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
        continue
      else:
        os.mkdir(cycle_dir)

      # save run file
      r.save(cycle_dir)

      # save illumina flowcell status report
      save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'), cycle_dir)

      # save stuff from bustard
      # grab IVC plot
      save_ivc_plot(r.bustard, cycle_dir)

      # build base call saving commands
      if site is not None:
        # collect the lanes that were actually analyzed
        lanes = []
        for lane in range(1, 9):
          lane_parameters = r.gerald.lanes.get(lane, None)
          if lane_parameters is not None and lane_parameters.analysis != 'none':
            lanes.append(lane)

        run_name = srf.pathname_to_run_name(r.pathname)
        seq_cmds = []
        LOGGER.info("Raw Format is: %s" % (raw_format, ))
        # fastq copies files directly; qseq/srf build commands run below
        if raw_format == 'fastq':
            rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
            LOGGER.info("raw data = %s" % (rawpath,))
            srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
        elif raw_format == 'qseq':
            seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
        elif raw_format == 'srf':
            seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
        else:
            raise ValueError('Unknown --raw-format=%s' % (raw_format))
        srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)

      # save stuff from GERALD
      # copy stuff out of the main run
      g = r.gerald

      # save summary file
      save_summary_file(r, cycle_dir)

      # compress eland result files
      compress_eland_results(g, cycle_dir, num_jobs)

      # md5 all the compressed files once we're done
      md5_commands = srf.make_md5_commands(cycle_dir)
      srf.run_commands(cycle_dir, md5_commands, num_jobs)
616
def rm_list(files, dry_run=True):
    """
    Delete the listed paths, recursively for directories.

    :Parameters:
      - `files`: iterable of paths to remove
      - `dry_run`: when True (the default), only log what would be removed
    """
    for f in files:
        if os.path.exists(f):
            LOGGER.info('deleting %s' % (f,))
            if not dry_run:
                if os.path.isdir(f):
                    shutil.rmtree(f)
                else:
                    os.unlink(f)
        else:
            # LOGGER.warn is a deprecated alias of warning
            LOGGER.warning("%s doesn't exist." % (f,))
628
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    :Parameters:
      - `runs`: list of PipelineRun objects whose runfolders will be cleaned
      - `dry_run`: when True (the default), only log what would be deleted
    """
    if dry_run:
        LOGGER.info('In dry-run mode')

    for run in runs:
        # (log message typo 'Cleaninging' fixed)
        LOGGER.info('Cleaning %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        LOGGER.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # rm ReadPrep
        LOGGER.info("Cleaning ReadPrep*")
        read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
        rm_list(read_prep_dirs, dry_run)
        # rm Thumbnail_Images (comment used to say ReadPrep again)
        LOGGER.info("Cleaning Thumbnail_Images")
        thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
        rm_list(thumbnail_dirs, dry_run)

        # make clean_intermediate
        # use the module LOGGER for consistency (was bare logging.info)
        LOGGER.info("Cleaning intermediate files")
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                             cwd=run.image_analysis.pathname,)
            clean_process.wait()
671
672
673