Fix bugs introduced by the improved HiSeq runfolder scanning.
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import tarfile
13 import time
14
15 try:
16     from xml.etree import ElementTree
17 except ImportError, e:
18     from elementtree import ElementTree
19
20 LOGGER = logging.getLogger(__name__)
21
22 EUROPEAN_STRPTIME = "%d-%m-%Y"
23 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
24 VERSION_RE = "([0-9\.]+)"
25 USER_RE = "([a-zA-Z0-9]+)"
26 LANES_PER_FLOWCELL = 8
27 LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
28
29 from htsworkflow.util.alphanum import alphanum
30 from htsworkflow.util.ethelp import indent, flatten
31 from htsworkflow.util.queuecommands import QueueCommands
32
33 from htsworkflow.pipelines import srf
34
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run

    Collects the image analysis (Firecrest/IPAR), base calling (Bustard)
    and alignment (GERALD) components found for a runfolder and can
    serialize them to and from a single xml summary document.
    """
    XML_VERSION = 1
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, flowcell_id=None, xml=None):
        """
        :Parameters:
          - `pathname`: location of the runfolder (may be None)
          - `flowcell_id`: explicit flowcell id, otherwise looked up lazily
          - `xml`: previously saved ElementTree node to restore state from
        """
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        else:
            self.pathname = None
        self._name = None
        self._flowcell_id = flowcell_id
        self.datadir = None
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
            self.set_elements(xml)

    def _get_flowcell_id(self):
        """
        Return the flowcell id, preferring Config/FlowcellId.xml and
        falling back to guessing from the runfolder name.
        """
        if self._flowcell_id is None:
            config_dir = os.path.join(self.pathname, 'Config')
            flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
            if os.path.exists(flowcell_id_path):
                flowcell_id_tree = ElementTree.parse(flowcell_id_path)
                self._flowcell_id = flowcell_id_tree.findtext('Text')
            else:
                # guess from the last underscore delimited field of the
                # runfolder name, e.g. 090220_HWI-EAS229_0093_30VR0AAXX
                path_fields = self.pathname.split('_')
                if len(path_fields) > 0:
                    self._flowcell_id = path_fields[-1]
                else:
                    self._flowcell_id = 'unknown'
                # bug fix: warn whenever we had to guess; previously this
                # warning sat only in the (unreachable) 'unknown' branch,
                # since str.split always returns at least one element
                LOGGER.warning(
                    "Flowcell id was not found, guessing %s" % (
                    self._flowcell_id))

        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def _get_runfolder_name(self):
        """Return the runfolder name recorded by gerald, or None."""
        if self.gerald is None:
            return None
        else:
            return self.gerald.runfolder_name
    runfolder_name = property(_get_runfolder_name)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.

        :Returns: an ElementTree Element rooted at PipelineRun
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """
        Restore our state from a previously saved xml tree.

        :Raises: ValueError if the root tag is not PipelineRun
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
            tag = element.tag.lower()
            if tag == PipelineRun.FLOWCELL_ID.lower():
                self._flowcell_id = element.text
            #ok the xword.Xword.XWORD pattern for module.class.constant is lame
            # you should only have Firecrest or IPAR, never both of them.
            elif tag == firecrest.Firecrest.FIRECREST.lower():
                self.image_analysis = firecrest.Firecrest(xml=element)
            elif tag == ipar.IPAR.IPAR.lower():
                self.image_analysis = ipar.IPAR(xml=element)
            elif tag == bustard.Bustard.BUSTARD.lower():
                self.bustard = bustard.Bustard(xml=element)
            elif tag == gerald.Gerald.GERALD.lower():
                self.gerald = gerald.Gerald(xml=element)
            else:
                LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name
        """
        if self._name is None:
            # the most recently modified component timestamp names the run
            tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """Write our xml summary to destdir (default: current directory)."""
        if destdir is None:
            destdir = ''
        LOGGER.info("Saving run report " + self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Load our state from a previously saved xml summary file."""
        LOGGER.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
153
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    root = ElementTree.parse(pathname).getroot()
    return PipelineRun(xml=root)
166
def get_runs(runfolder, flowcell_id=None):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.

    :Parameters:
      - `runfolder`: path to the runfolder to scan
      - `flowcell_id`: optional explicit flowcell id passed to each PipelineRun

    :Returns: list of PipelineRun objects, one per image-analysis /
      base-calling / alignment combination discovered
    """
    # local imports to break the cyclic dependency: the component modules
    # themselves import this module
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    def scan_post_image_analysis(runs, runfolder, datadir, image_analysis, pathname):
        # find every base-calling directory under an image-analysis directory
        LOGGER.info("Looking for bustard directories in %s" % (pathname,))
        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
        # RTA BaseCalls looks enough like Bustard.
        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
        for bustard_pathname in bustard_dirs:
            LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
            b = bustard.bustard(bustard_pathname)
            build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder)

            build_aligned_runs(image_analysis, runs, b, datadir, runfolder)

    def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder):
        # GERALD directories nested inside a bustard directory
        gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
        LOGGER.info("Looking for gerald directories in %s" % (pathname,))
        for gerald_pathname in glob(gerald_glob):
            LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
            try:
                g = gerald.gerald(gerald_pathname)
                p = PipelineRun(runfolder, flowcell_id)
                p.datadir = datadir
                p.image_analysis = image_analysis
                p.bustard = b
                p.gerald = g
                runs.append(p)
            except IOError, e:
                # an unreadable gerald directory is skipped, not fatal
                LOGGER.error("Ignoring " + str(e))


    def build_aligned_runs(image_analysis, runs, b, datadir, runfolder):
        # HiSeq/RTA style Aligned* directories at the top of the runfolder
        aligned_glob = os.path.join(runfolder, 'Aligned*')
        for aligned in glob(aligned_glob):
            LOGGER.info("Found aligned directory %s" % (aligned,))
            try:
                g = gerald.gerald(aligned)
                p = PipelineRun(runfolder, flowcell_id)
                p.datadir = datadir
                p.image_analysis = image_analysis
                p.bustard = b
                p.gerald = g
                runs.append(p)
            except IOError, e:
                # an unreadable aligned directory is skipped, not fatal
                LOGGER.error("Ignoring " + str(e))

    datadir = os.path.join(runfolder, 'Data')

    LOGGER.info('Searching for runs in ' + datadir)
    runs = []
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        LOGGER.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            LOGGER.warn(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, firecrest_pathname
            )
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        LOGGER.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            LOGGER.warn(
                "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, ipar_pathname
            )

    return runs
258
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :Parameters:
      - `gerald_dir`: path to a gerald directory; parents are assumed to
        be <runfolder>/Data/<image_dir>/<bustard_dir>/<gerald_dir>

    :Returns: a PipelineRun, or None if any component is unrecognizable
    """
    # local imports to break the cyclic dependency with component modules
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    LOGGER.info('--- use-run detected options ---')
    LOGGER.info('runfolder: %s' % (runfolder_dir,))
    LOGGER.info('image_dir: %s' % (image_dir,))
    LOGGER.info('bustard_dir: %s' % (bustard_dir,))
    LOGGER.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    LOGGER.info('data_dir: %s' % (data_dir,))
    # bug fix: message previously read 'short_iamge_dir'
    LOGGER.info('short_image_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        # the RTA Intensities directory is close enough to IPAR
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        LOGGER.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
326
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters

    Each run writes its xml summary (via run.save()) to the current
    directory.
    """
    for pipeline_run in runs:
        pipeline_run.save()
333
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.

    :Parameters:
      - `genome_map`: mapping whose keys are known spike-in/contamination
        directory names
      - `mapped_reads`: mapping of "path/chromosome" -> read count

    :Returns: dict of name -> read count, with all genome chromosomes
      collapsed into a single entry keyed by the genome directory name
    """
    summarized_reads = {}
    genome_reads = 0
    genome = 'unknown'
    for k, v in mapped_reads.items():
        path, k = os.path.split(k)
        # reads whose directory component is not a known spike-in are
        # attributed to the genome; everything else stays per-target
        # (bug fix: dict.has_key is py2-only; use the `in` operator)
        if len(path) > 0 and path not in genome_map:
            genome = path
            genome_reads += v
        else:
            summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
    summarized_reads[genome] = genome_reads
    return summarized_reads
351
def summarize_lane(gerald, lane_id):
    """
    Build a list of human-readable report lines for one lane.

    :Parameters:
      - `gerald`: gerald run object providing summary and eland results
      - `lane_id`: lane to report on

    :Returns: list of report line strings, one group per read end
    """
    report = []
    summary_results = gerald.summary.lane_results
    for end in range(len(summary_results)):
        eland_result = gerald.eland_results.results[end][lane_id]
        report.append("Sample name %s" % (eland_result.sample_name))
        report.append("Lane id %s end %s" % (eland_result.lane_id, end))
        # bug fix: dict.has_key is py2-only; use the `in` operator
        if end < len(summary_results) and eland_result.lane_id in summary_results[end]:
            cluster = summary_results[end][eland_result.lane_id].cluster
            report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
        report.append("Total Reads: %d" % (eland_result.reads))

        # match code breakdown is only present for eland-style results
        if hasattr(eland_result, 'match_codes'):
            mc = eland_result.match_codes
            nm = mc['NM']
            nm_percent = float(nm) / eland_result.reads * 100
            qc = mc['QC']
            qc_percent = float(qc) / eland_result.reads * 100

            report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
            report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
            report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                          (mc['U0'], mc['U1'], mc['U2']))
            report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                          (mc['R0'], mc['R1'], mc['R2']))

        # per-genome read counts are only present when a genome map exists
        if hasattr(eland_result, 'genome_map'):
            report.append("Mapped Reads")
            mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
            for name, counts in mapped_reads.items():
                report.append("  %s: %d" % (name, counts))

        report.append('')
    return report
386
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :Parameters:
      - `runs`: list of PipelineRun objects

    :Returns: one newline-joined report string covering every run
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.name,))
        # sort the report
        eland_keys = run.gerald.eland_results.results[0].keys()
        eland_keys.sort(alphanum)

        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('---')
            report.append('')
    # bug fix: the return was indented inside the loop, so only the
    # first run was ever summarized
    return os.linesep.join(report)
404
def is_compressed(filename):
    """Return True if filename carries a recognized compression suffix."""
    extension = os.path.splitext(filename)[1]
    return extension in ('.gz', '.bz2')
412
def save_flowcell_reports(data_dir, cycle_dir):
    """
    Save the flowcell quality reports

    Archives data_dir/reports (plus Status.xml/Status.xsl when present)
    into cycle_dir/flowcell-reports.tar.bz2. Does nothing if there is no
    reports directory.
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            # NOTE(review): Status.xsl is added without checking that it
            # exists — confirm it always accompanies Status.xml
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        LOGGER.info("Saving reports from " + reports_dir)
        cwd = os.getcwd()
        os.chdir(data_dir)
        try:
            q = QueueCommands([" ".join(cmd_list)])
            q.run()
        finally:
            # bug fix: always restore the working directory, even if the
            # tar command raises
            os.chdir(cwd)
431
432
def save_summary_file(pipeline, cycle_dir):
    """
    Copy Summary.htm into cycle_dir.

    Prefers the gerald directory's Summary.htm, falling back to the
    Status_Files copy; logs when neither exists.
    """
    gerald_object = pipeline.gerald
    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
    if os.path.exists(gerald_summary):
        LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir))
        shutil.copy(gerald_summary, cycle_dir)
    elif os.path.exists(status_files_summary):
        LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir))
        shutil.copy(status_files_summary, cycle_dir)
    else:
        # bug fix: this branch referenced an undefined summary_path,
        # raising NameError; report both searched locations instead
        LOGGER.info('Summary file %s or %s was not found' % (
            gerald_summary, status_files_summary))
446
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images

    Copies IVC.htm plus matching Plots/s_?_*.png images from the bustard
    directory into cycle_dir; warns if the page is missing.
    """
    plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
    # glob pattern matching the per-lane plot images
    plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')

    plot_target_path = os.path.join(cycle_dir, 'Plots')

    if os.path.exists(plot_html):
        LOGGER.debug("Saving %s" % (plot_html,))
        LOGGER.debug("Saving %s" % (plot_images,))
        shutil.copy(plot_html, cycle_dir)
        if not os.path.exists(plot_target_path):
            os.mkdir(plot_target_path)
        for plot_file in glob(plot_images):
            shutil.copy(plot_file, plot_target_path)
    else:
        # bug fix: the file looked for is IVC.htm, not IVC.html
        LOGGER.warning('Missing IVC.htm file, not archiving')
467
468
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory

    Pipes `tar c <score files>` through `bzip2 -9 -c` into
    cycle_dir/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = []
    for f in os.listdir(scores_path):
        if re.match('.*_score.txt', f):
            score_files.append(f)

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    # bug fix: open in binary mode for the compressed stream
    tar_dest = open(tar_dest_name, 'wb')
    try:
        LOGGER.info("Compressing score files from %s" % (scores_path,))
        LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
        LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
        LOGGER.info("Writing to %s" % (tar_dest_name,))

        env = {'BZIP': '-9'}
        tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
                               env=env, cwd=scores_path)
        bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
        # let tar receive SIGPIPE if bzip2 exits early
        tar.stdout.close()
        tar.wait()
        # bug fix: also wait for bzip2 so the archive is fully written
        # before the destination file is closed
        bzip.wait()
    finally:
        # bug fix: the destination file was never closed
        tar_dest.close()
499
500
def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
    """
    Compress eland result files into the archive directory

    :Parameters:
      - `gerald_object`: gerald run whose eland result files are archived
      - `cycle_dir`: destination directory
      - `num_jobs`: number of bzip2 commands to run concurrently
    """
    # copy & bzip eland files
    bz_commands = []

    for lanes_dictionary in gerald_object.eland_results.results:
        for eland_lane in lanes_dictionary.values():
            source_name = eland_lane.pathname
            if source_name is None:
                LOGGER.info(
                    "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
            else:
                path, name = os.path.split(source_name)
                dest_name = os.path.join(cycle_dir, name)
                LOGGER.info("Saving eland file %s to %s" % \
                            (source_name, dest_name))

                if is_compressed(name):
                    LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
                    shutil.copy(source_name, dest_name)
                else:
                    # not compressed yet; queue a shell redirection so the
                    # original file is left in place
                    dest_name += '.bz2'
                    args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
                    bz_commands.append(" ".join(args))

    if len(bz_commands) > 0:
        q = QueueCommands(bz_commands, num_jobs)
        q.run()
537
538
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format='qseq'):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * eland_result files
      * score files
      * Summary.htm
      * srf files (raw sequence & qualities)

    :Parameters:
      - `runs`: list of PipelineRun objects to extract results from
      - `output_base_dir`: root for the per-flowcell result directories
        (defaults to the current directory)
      - `site`: site name passed to the raw sequence commands; None skips
        saving raw base calls entirely
      - `num_jobs`: number of archive commands to run concurrently
      - `raw_format`: one of 'qseq', 'srf', or 'fastq'

    :Raises: ValueError for an unrecognized raw_format
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
      result_dir = os.path.join(output_base_dir, r.flowcell_id)
      LOGGER.info("Using %s as result directory" % (result_dir,))
      if not os.path.exists(result_dir):
        os.mkdir(result_dir)

      # create cycle_dir
      cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
      LOGGER.info("Filling in %s" % (cycle,))
      cycle_dir = os.path.join(result_dir, cycle)
      cycle_dir = os.path.abspath(cycle_dir)
      if os.path.exists(cycle_dir):
        # never clobber a previous extraction of the same cycle range
        LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
        continue
      else:
        os.mkdir(cycle_dir)

      # save run file
      r.save(cycle_dir)

      # save illumina flowcell status report
      save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'), cycle_dir)

      # save stuff from bustard
      # grab IVC plot
      save_ivc_plot(r.bustard, cycle_dir)

      # build base call saving commands
      if site is not None:
        lanes = []
        for lane in range(1, 9):
          # only keep lanes gerald actually analyzed
          lane_parameters = r.gerald.lanes.get(lane, None)
          if lane_parameters is not None and lane_parameters.analysis != 'none':
            lanes.append(lane)

        run_name = srf.pathname_to_run_name(r.pathname)
        seq_cmds = []
        LOGGER.info("Raw Format is: %s" % (raw_format, ))
        if raw_format == 'fastq':
            rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
            LOGGER.info("raw data = %s" % (rawpath,))
            srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
        elif raw_format == 'qseq':
            seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
        elif raw_format == 'srf':
            seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
        else:
            raise ValueError('Unknown --raw-format=%s' % (raw_format))
        srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)

      # save stuff from GERALD
      # copy stuff out of the main run
      g = r.gerald

      # save summary file
      save_summary_file(r, cycle_dir)

      # compress eland result files
      compress_eland_results(g, cycle_dir, num_jobs)

      # md5 all the compressed files once we're done
      md5_commands = srf.make_md5_commands(cycle_dir)
      srf.run_commands(cycle_dir, md5_commands, num_jobs)
614
def rm_list(files, dry_run=True):
    """
    Delete every path in files, logging each action.

    Directories are removed recursively; when dry_run is True nothing is
    actually deleted, only logged.
    """
    for pathname in files:
        if not os.path.exists(pathname):
            LOGGER.warn("%s doesn't exist." % (pathname,))
            continue
        LOGGER.info('deleting %s' % (pathname,))
        if dry_run:
            continue
        if os.path.isdir(pathname):
            shutil.rmtree(pathname)
        else:
            os.unlink(pathname)
626
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    Removes logs, calibration data, raw image tiles, read prep and
    thumbnail directories, then runs `make clean_intermediate` in the
    image analysis directory when a Makefile is present.
    """
    if dry_run:
        LOGGER.info('In dry-run mode')

    for run in runs:
        # bug fix: message previously read 'Cleaninging'
        LOGGER.info('Cleaning %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        LOGGER.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # rm ReadPrep*
        LOGGER.info("Cleaning ReadPrep*")
        read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
        rm_list(read_prep_dirs, dry_run)
        # rm Thumbnail_Images
        LOGGER.info("Cleaning Thumbnail_Images")
        thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
        rm_list(thumbnail_dirs, dry_run)

        # make clean_intermediate
        # bug fix: use the module LOGGER, not the root logging module,
        # for consistency with every other message here
        LOGGER.info("Cleaning intermediate files")
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                             cwd=run.image_analysis.pathname,)
            clean_process.wait()
669
670
671