Merge branch 'master' of mus.cacr.caltech.edu:htsworkflow
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import tarfile
13 import time
14
15 import lxml.etree as ElementTree
16
17 LOGGER = logging.getLogger(__name__)
18
19 EUROPEAN_STRPTIME = "%d-%m-%Y"
20 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
21 VERSION_RE = "([0-9\.]+)"
22 USER_RE = "([a-zA-Z0-9]+)"
23 LANES_PER_FLOWCELL = 8
24 LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
25
26 from htsworkflow.util.alphanum import alphanum
27 from htsworkflow.util.ethelp import indent, flatten
28 from htsworkflow.util.queuecommands import QueueCommands
29
30 from htsworkflow.pipelines import srf
31
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run

    A run combines an image analysis step (Firecrest or IPAR), a base
    calling step (Bustard) and an alignment step (GERALD/CASAVA).  The
    state can be filled in by the runfolder scanning functions in this
    module, or restored from a previously saved run xml file.
    """
    # version stamp for the saved run xml format
    XML_VERSION = 1
    # xml tag names used by get_elements/set_elements
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, flowcell_id=None, xml=None):
        """Initialize a run.

        :Parameters:
          - `pathname`: location of the runfolder on disk (optional)
          - `flowcell_id`: known flowcell id; if None it is discovered
            lazily by the flowcell_id property
          - `xml`: previously saved run xml tree to restore state from
        """
        if pathname is not None:
          self.pathname = os.path.normpath(pathname)
        else:
          self.pathname = None
        self._name = None
        self._flowcell_id = flowcell_id
        # the following are filled in by the runfolder scanning code
        self.datadir = None
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
          self.set_elements(xml)

    def _get_flowcell_id(self):
        """Return the flowcell id, discovering it on first access.

        Tries RunInfo.xml, then Config/FlowcellId.xml, then the
        runfolder path, finally falling back to 'unknown' (with a
        warning).  The discovered value is cached.
        """
        # extract flowcell ID
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_runinfo()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_flowcellid()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_path()
        if self._flowcell_id is None:
            self._flowcell_id = 'unknown'

            LOGGER.warning(
                "Flowcell id was not found, guessing %s" % (
                    self._flowcell_id))

        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def _get_flowcell_id_from_flowcellid(self):
        """Extract flowcell id from a Config/FlowcellId.xml file

        Returns None when the file does not exist.
        """
        config_dir = os.path.join(self.pathname, 'Config')
        flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
        if os.path.exists(flowcell_id_path):
            flowcell_id_tree = ElementTree.parse(flowcell_id_path)
            return flowcell_id_tree.findtext('Text')

    def _get_flowcell_id_from_runinfo(self):
        """Read RunInfo file for flowcell id

        Returns None when the file is missing or it does not contain
        exactly one /RunInfo/Run/Flowcell node.
        """
        runinfo = os.path.join(self.pathname, 'RunInfo.xml')
        if os.path.exists(runinfo):
            tree = ElementTree.parse(runinfo)
            root = tree.getroot()
            fc_nodes = root.xpath('/RunInfo/Run/Flowcell')
            if len(fc_nodes) == 1:
                return fc_nodes[0].text


    def _get_flowcell_id_from_path(self):
        """Guess a flowcell name from the path

        Assumes the runfolder name ends in _<flowcell id>.
        """
        path_fields = self.pathname.split('_')
        if len(path_fields) > 0:
            # guessing last element of filename
            return path_fields[-1]

    def _get_runfolder_name(self):
        """Return the runfolder name recorded by gerald, or None."""
        if self.gerald is None:
            return None
        else:
            return self.gerald.runfolder_name
    runfolder_name = property(_get_runfolder_name)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.

        Returns an lxml Element containing the flowcell id plus the
        image analysis, bustard, and gerald sub-trees.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """Restore run state from a previously saved run xml tree.

        :Raises: ValueError if the root tag is not PipelineRun.
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
          raise ValueError('Pipeline Run Expecting %s got %s' % (
              PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
          tag = element.tag.lower()
          if tag == PipelineRun.FLOWCELL_ID.lower():
            self._flowcell_id = element.text
          #ok the xword.Xword.XWORD pattern for module.class.constant is lame
          # you should only have Firecrest or IPAR, never both of them.
          elif tag == firecrest.Firecrest.FIRECREST.lower():
            self.image_analysis = firecrest.Firecrest(xml=element)
          elif tag == ipar.IPAR.IPAR.lower():
            self.image_analysis = ipar.IPAR(xml=element)
          elif tag == bustard.Bustard.BUSTARD.lower():
            self.bustard = bustard.Bustard(xml=element)
          elif tag == gerald.Gerald.GERALD.lower():
            self.gerald = gerald.Gerald(xml=element)
          elif tag == gerald.CASAVA.GERALD.lower():
            self.gerald = gerald.CASAVA(xml=element)
          else:
            LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name

        The name has the form run_<flowcell id>_<YYYY-MM-DD>.xml and is
        cached after the first computation.
        """
        if self._name is None:
          tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
          timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
          self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """Write this run's xml report into destdir (default: cwd)."""
        if destdir is None:
            destdir = ''
        LOGGER.info("Saving run report " + self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Load run state from a previously saved run xml file."""
        LOGGER.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
176
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    root = ElementTree.parse(pathname).getroot()
    return PipelineRun(xml=root)
189
def get_runs(runfolder, flowcell_id=None):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.

    :Parameters:
      - `runfolder`: path to the runfolder to scan
      - `flowcell_id`: optional flowcell id passed to each PipelineRun

    :Returns: list of PipelineRun objects (possibly empty)
    """
    # hide the imports to avoid cyclic imports (this module is imported
    # by the pipeline step modules)
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    def scan_post_image_analysis(runs, runfolder, datadir, image_analysis,
                                 pathname):
        """Find base-calling + alignment steps under one image analysis."""
        added = build_aligned_runs(image_analysis, runs, datadir, runfolder)
        # If we're a multiplexed run, don't look for older run type.
        if added > 0:
            return

        LOGGER.info("Looking for bustard directories in %s" % (pathname,))
        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
        # RTA BaseCalls looks enough like Bustard.
        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
        for bustard_pathname in bustard_dirs:
            LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
            b = bustard.bustard(bustard_pathname)
            build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder)


    def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder):
        """Append a PipelineRun to runs for each GERALD* directory found.

        Returns the number of runs added.
        """
        start = len(runs)
        gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
        LOGGER.info("Looking for gerald directories in %s" % (pathname,))
        for gerald_pathname in glob(gerald_glob):
            LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
            try:
                g = gerald.gerald(gerald_pathname)
                p = PipelineRun(runfolder, flowcell_id)
                p.datadir = datadir
                p.image_analysis = image_analysis
                p.bustard = b
                p.gerald = g
                runs.append(p)
            except IOError, e:
                # an unreadable gerald directory shouldn't stop the scan
                LOGGER.error("Ignoring " + str(e))
        return len(runs) - start


    def build_aligned_runs(image_analysis, runs, datadir, runfolder):
        """Append a PipelineRun for each CASAVA-style Aligned* directory.

        Returns the number of runs added.
        """
        start = len(runs)
        aligned_glob = os.path.join(runfolder, 'Aligned*')
        for aligned in glob(aligned_glob):
            LOGGER.info("Found aligned directory %s" % (aligned,))
            try:
                g = gerald.gerald(aligned)
                p = PipelineRun(runfolder, flowcell_id)
                bustard_pathname = os.path.join(runfolder, g.runfolder_name)

                p.datadir = datadir
                p.image_analysis = image_analysis
                p.bustard = bustard.bustard(bustard_pathname)
                p.gerald = g
                runs.append(p)
            except IOError, e:
                # an unreadable aligned directory shouldn't stop the scan
                LOGGER.error("Ignoring " + str(e))
        return len(runs) - start
    datadir = os.path.join(runfolder, 'Data')

    LOGGER.info('Searching for runs in ' + datadir)
    runs = []
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        LOGGER.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            LOGGER.warn(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, firecrest_pathname
            )
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        LOGGER.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            LOGGER.warn(
                "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, ipar_pathname
            )

    return runs
291
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :Parameters:
      - `gerald_dir`: path to a gerald (alignment) directory; its parent
        is taken as the base-calling directory and its grandparent as
        the image-analysis directory.

    :Returns: an initialized PipelineRun, or None when any of the three
        component steps could not be recognized.
    """
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    LOGGER.info('--- use-run detected options ---')
    LOGGER.info('runfolder: %s' % (runfolder_dir,))
    LOGGER.info('image_dir: %s' % (image_dir,))
    LOGGER.info('bustard_dir: %s' % (bustard_dir,))
    LOGGER.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    LOGGER.info('data_dir: %s' % (data_dir,))
    # fixed: this log label was misspelled 'short_iamge_dir'
    LOGGER.info('short_image_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        # RTA Intensities directories are close enough to IPAR
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        LOGGER.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
359
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters

    Writes each run's xml report via PipelineRun.save().
    """
    for pipeline_run in runs:
        pipeline_run.save()
366
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks seperately.
    """
    totals = {}
    # reads whose target lives outside genome_map are lumped together
    # under the (last seen) foreign path prefix
    foreign_name = 'unknown'
    foreign_count = 0
    for pathname, count in mapped_reads.items():
        prefix, target = os.path.split(pathname)
        if len(prefix) > 0 and prefix not in genome_map:
            foreign_name = prefix
            foreign_count += count
        else:
            totals[target] = totals.setdefault(target, 0) + count
    totals[foreign_name] = foreign_count
    return totals
384
def summarize_lane(gerald, lane_id):
    """Build a list of human-readable summary lines for one lane/end."""
    lane_results = gerald.summary.lane_results
    eland_result = gerald.eland_results[lane_id]
    lines = ["Sample name %s" % (eland_result.sample_name),
             "Lane id %s end %s" % (lane_id.lane, lane_id.read)]

    # cluster counts exist only when the summary covers this end and lane
    have_clusters = (lane_id.read < len(lane_results)
                     and lane_id.lane in lane_results[lane_id.read])
    if have_clusters:
        cluster = lane_results[lane_id.read][lane_id.lane].cluster
        lines.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
    lines.append("Total Reads: %d" % (eland_result.reads))

    if hasattr(eland_result, 'match_codes'):
        mc = eland_result.match_codes
        total = eland_result.reads
        no_match = mc['NM']
        qc_failed = mc['QC']
        lines.append("No Match: %d (%2.2g %%)" %
                     (no_match, float(no_match) / total * 100))
        lines.append("QC Failed: %d (%2.2g %%)" %
                     (qc_failed, float(qc_failed) / total * 100))
        lines.append('Unique (0,1,2 mismatches) %d %d %d' %
                     (mc['U0'], mc['U1'], mc['U2']))
        lines.append('Repeat (0,1,2 mismatches) %d %d %d' %
                     (mc['R0'], mc['R1'], mc['R2']))

    if hasattr(eland_result, 'genome_map'):
        lines.append("Mapped Reads")
        counts_by_name = summarize_mapped_reads(eland_result.genome_map,
                                                eland_result.mapped_reads)
        for name, count in counts_by_name.items():
            lines.append("  %s: %d" % (name, count))
        lines.append('')
    return lines
422
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :Parameters:
      - `runs`: sequence of PipelineRun objects

    :Returns: a single newline-joined report string
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.name,))
        # sort the report
        eland_keys = sorted(run.gerald.eland_results.keys())
        # Bug fix: this loop was previously dedented outside the run
        # loop, so only the LAST run's lanes were ever summarized even
        # though every run's header was printed.
        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('---')
            report.append('')
    return os.linesep.join(report)
438
def is_compressed(filename):
    """Return True when *filename* ends in a .gz or .bz2 suffix."""
    extension = os.path.splitext(filename)[1]
    return extension in ('.gz', '.bz2')
446
def save_flowcell_reports(data_dir, cycle_dir):
    """
    Save the flowcell quality reports

    Tars the reports/ directory (plus Status.xml/Status.xsl when
    present) from data_dir into cycle_dir/flowcell-reports.tar.bz2.
    A no-op when data_dir has no reports directory.
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        LOGGER.info("Saving reports from " + reports_dir)
        cwd = os.getcwd()
        os.chdir(data_dir)
        try:
            q = QueueCommands([" ".join(cmd_list)])
            q.run()
        finally:
            # Bug fix: restore the working directory even when the tar
            # command raises; previously an exception left the whole
            # process chdir'd into data_dir.
            os.chdir(cwd)
466
def save_summary_file(pipeline, cycle_dir):
    """
    Copy the run's Summary.htm into cycle_dir.

    Prefers the GERALD directory's Summary.htm and falls back to the
    Status_Files copy; logs when neither exists.
    """
    # Copy Summary.htm
    gerald_object = pipeline.gerald
    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
    if os.path.exists(gerald_summary):
        LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir))
        shutil.copy(gerald_summary, cycle_dir)
    elif os.path.exists(status_files_summary):
        LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir))
        shutil.copy(status_files_summary, cycle_dir)
    else:
        # Bug fix: this branch referenced an undefined name
        # `summary_path`, raising NameError whenever no summary file
        # was found.  Report both locations that were checked instead.
        LOGGER.info('Summary file %s (or %s) was not found' % (
            gerald_summary, status_files_summary))
480
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images

    Copies IVC.htm into cycle_dir and its Plots/*.png files into
    cycle_dir/Plots; warns and does nothing when IVC.htm is missing.
    """
    plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
    plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')

    plot_target_path = os.path.join(cycle_dir, 'Plots')

    if not os.path.exists(plot_html):
        # Bug fix: message previously said "IVC.html" (the file is
        # IVC.htm) and did not say where it looked.
        LOGGER.warning('Missing %s, not archiving the IVC plots' % (plot_html,))
        return

    LOGGER.debug("Saving %s" % (plot_html,))
    LOGGER.debug("Saving %s" % (plot_images,))
    shutil.copy(plot_html, cycle_dir)
    if not os.path.exists(plot_target_path):
        os.mkdir(plot_target_path)
    for plot_file in glob(plot_images):
        shutil.copy(plot_file, plot_target_path)
501
502
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory

    Collects *_score.txt files from the bustard directory (or its Temp
    subdirectory for pipeline 1.1rc1+) and pipes them through
    tar | bzip2 into cycle_dir/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = [f for f in os.listdir(scores_path)
                   if re.match('.*_score.txt', f)]

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    LOGGER.info("Compressing score files from %s" % (scores_path,))
    LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
    LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
    LOGGER.info("Writing to %s" % (tar_dest_name,))

    # Bug fixes relative to the previous version:
    #  - open the destination in binary mode and close it when done
    #    (it was opened 'w' and leaked)
    #  - wait for bzip2 as well as tar, so the archive is complete
    #    before we return (only tar was waited on before)
    #  - the BZIP=-9 environment was being handed to *tar* (which
    #    ignores it) while also wiping tar's PATH; bzip2 already gets
    #    -9 on its command line, so no env override is needed.
    tar_dest = open(tar_dest_name, 'wb')
    try:
        tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
                               cwd=scores_path)
        bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
        # drop our reference to tar's stdout so tar gets SIGPIPE if
        # bzip2 exits early
        tar.stdout.close()
        tar.wait()
        bzip.wait()
    finally:
        tar_dest.close()
533
534
def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
    """
    Compress eland result files into the archive directory

    Already-compressed files are copied as-is; the rest are queued as
    bzip2 shell commands and executed num_jobs at a time via
    QueueCommands.
    """
    # copy & bzip eland files
    bz_commands = []

    for key in gerald_object.eland_results:
        eland_lane = gerald_object.eland_results[key]
        for source_name in eland_lane.pathnames:
            if source_name is None:
                LOGGER.info(
                    "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
                continue
            path, name = os.path.split(source_name)
            dest_name = os.path.join(cycle_dir, name)
            LOGGER.info("Saving eland file %s to %s" %
                        (source_name, dest_name))

            if is_compressed(name):
                LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
                shutil.copy(source_name, dest_name)
            else:
                # not compressed; queue a shell command (the '>'
                # redirection relies on QueueCommands running through a
                # shell) so several files compress in parallel
                dest_name += '.bz2'
                args = ['bzip2', '-9', '-c', source_name, '>', dest_name]
                bz_commands.append(" ".join(args))

    if len(bz_commands) > 0:
        q = QueueCommands(bz_commands, num_jobs)
        q.run()
571
572
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format=None):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * eland_result files
      * score files
      * Summary.htm
      * srf files (raw sequence & qualities)

    :Parameters:
      - `runs`: sequence of PipelineRun objects to extract
      - `output_base_dir`: where to create per-flowcell result
        directories (defaults to the current directory)
      - `site`: site name passed to the raw-data export; when None the
        raw-data step is skipped entirely
      - `num_jobs`: how many compression/md5 commands to run in parallel
      - `raw_format`: 'fastq', 'qseq', or 'srf'; when None the format is
        taken from the run's bustard object
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
        result_dir = os.path.join(output_base_dir, r.flowcell_id)
        LOGGER.info("Using %s as result directory" % (result_dir,))
        if not os.path.exists(result_dir):
            os.mkdir(result_dir)

        # create cycle_dir named after the image-analysis cycle range
        cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
        LOGGER.info("Filling in %s" % (cycle,))
        cycle_dir = os.path.join(result_dir, cycle)
        cycle_dir = os.path.abspath(cycle_dir)
        if os.path.exists(cycle_dir):
            # never clobber a previous extraction; skip this run
            LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
            continue
        else:
            os.mkdir(cycle_dir)

        # save run file
        r.save(cycle_dir)

        # save illumina flowcell status report
        save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'),
                              cycle_dir)

        # save stuff from bustard
        # grab IVC plot
        save_ivc_plot(r.bustard, cycle_dir)

        # build base call saving commands
        if site is not None:
            save_raw_data(num_jobs, r, site, raw_format, cycle_dir)

        # save stuff from GERALD
        # copy stuff out of the main run
        g = r.gerald

        # save summary file
        save_summary_file(r, cycle_dir)

        # compress eland result files
        compress_eland_results(g, cycle_dir, num_jobs)

        # md5 all the compressed files once we're done
        md5_commands = srf.make_md5_commands(cycle_dir)
        srf.run_commands(cycle_dir, md5_commands, num_jobs)
630
def save_raw_data(num_jobs, r, site, raw_format, cycle_dir):
    """Export raw sequence data (fastq, qseq, or srf) for run *r*.

    When raw_format is None the format recorded by bustard is used.
    Raises ValueError for an unrecognized format.
    """
    # lanes that have gerald parameters are the ones worth exporting
    lanes = [lane for lane in r.gerald.lanes
             if r.gerald.lanes.get(lane, None) is not None]

    run_name = srf.pathname_to_run_name(r.pathname)
    if raw_format is None:
        raw_format = r.bustard.sequence_format
    LOGGER.info("Raw Format is: %s" % (raw_format, ))

    seq_cmds = []
    if raw_format == 'fastq':
        rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
        LOGGER.info("raw data = %s" % (rawpath,))
        srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
    elif raw_format == 'qseq':
        seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
    elif raw_format == 'srf':
        seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
    else:
        raise ValueError('Unknown --raw-format=%s' % (raw_format))
    srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
655
def rm_list(files, dry_run=True):
    """Delete every path in *files*, logging each one.

    Directories are removed recursively.  With dry_run (the default)
    the deletions are only logged, never performed.
    """
    for pathname in files:
        if not os.path.exists(pathname):
            LOGGER.warn("%s doesn't exist." % (pathname,))
            continue
        LOGGER.info('deleting %s' % (pathname,))
        if dry_run:
            continue
        if os.path.isdir(pathname):
            shutil.rmtree(pathname)
        else:
            os.unlink(pathname)
667
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    Removes logs, calibration data, images, and regenerable
    intermediates from each run's folder.

    :Parameters:
      - `runs`: sequence of PipelineRun objects to clean
      - `dry_run`: when True (the default) the file removals are only
        logged, not performed
    """
    if dry_run:
        LOGGER.info('In dry-run mode')

    for run in runs:
        # fixed log typo: was 'Cleaninging'
        LOGGER.info('Cleaning %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        LOGGER.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # rm ReadPrep*
        LOGGER.info("Cleaning ReadPrep*")
        read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
        rm_list(read_prep_dirs, dry_run)
        # rm Thumbnail_Images (fixed log typo: was 'Thubmnail_images')
        LOGGER.info("Cleaning Thumbnail_Images")
        thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
        rm_list(thumbnail_dirs, dry_run)

        # make clean_intermediate
        # consistency fix: was logging.info (root logger); every other
        # message in this module goes through LOGGER
        LOGGER.info("Cleaning intermediate files")
        # NOTE(review): `make clean_intermediate` runs even in dry-run
        # mode, unlike the rm_list deletions above — confirm intended.
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                             cwd=run.image_analysis.pathname,)
            clean_process.wait()
710
711
712