Merge branch 'django1.4' of mus.cacr.caltech.edu:htsworkflow into django1.4
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import tarfile
13 import time
14
15 import lxml.etree as ElementTree
16
LOGGER = logging.getLogger(__name__)

# Date format embedded in older runfolder names (DD-MM-YYYY).
EUROPEAN_STRPTIME = "%d-%m-%Y"
EUROPEAN_DATE_RE = r"([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
# raw strings so the \. in the pattern is not treated as an
# (invalid) string escape sequence; the pattern text is unchanged
VERSION_RE = r"([0-9\.]+)"
USER_RE = r"([a-zA-Z0-9]+)"
# a standard GA flowcell has 8 lanes, numbered 1..8
LANES_PER_FLOWCELL = 8
LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
25
26 from htsworkflow.util.alphanum import alphanum
27 from htsworkflow.util.ethelp import indent, flatten
28 from htsworkflow.util.queuecommands import QueueCommands
29
30 from htsworkflow.pipelines import srf
31
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run

    Aggregates the image analysis (Firecrest/IPAR), base calling
    (Bustard) and alignment (GERALD) components of one run and can
    serialize/restore them as a single xml tree.
    """
    # version number for the saved run xml format
    XML_VERSION = 1
    # xml tag names used by get_elements/set_elements
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, flowcell_id=None, xml=None):
        """Initialize a run, optionally restoring state from saved xml.

        :Parameters:
          - `pathname`: runfolder path (normalized); may be None
          - `flowcell_id`: flowcell id, or None to discover it lazily
          - `xml`: previously saved element tree to restore from
        """
        if pathname is not None:
          self.pathname = os.path.normpath(pathname)
        else:
          self.pathname = None
        self._name = None
        self._flowcell_id = flowcell_id
        # these are filled in by the scanning helpers (get_runs and friends)
        self.datadir = None
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
          self.set_elements(xml)

    def _get_flowcell_id(self):
        """Return the flowcell id, trying several sources in priority order.

        Order: RunInfo.xml, Config/FlowcellId.xml, the runfolder path,
        then the literal 'unknown' (with a warning).
        """
        # extract flowcell ID
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_runinfo()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_flowcellid()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_path()
        if self._flowcell_id is None:
            self._flowcell_id = 'unknown'

            LOGGER.warning(
                "Flowcell id was not found, guessing %s" % (
                    self._flowcell_id))

        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def _get_flowcell_id_from_flowcellid(self):
        """Extract flowcell id from a Config/FlowcellId.xml file
        """
        config_dir = os.path.join(self.pathname, 'Config')
        flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
        if os.path.exists(flowcell_id_path):
            flowcell_id_tree = ElementTree.parse(flowcell_id_path)
            # returns None implicitly when the file is missing
            return flowcell_id_tree.findtext('Text')

    def _get_flowcell_id_from_runinfo(self):
        """Read RunInfo file for flowcell id
        """
        runinfo = os.path.join(self.pathname, 'RunInfo.xml')
        if os.path.exists(runinfo):
            tree = ElementTree.parse(runinfo)
            root = tree.getroot()
            # xpath() is lxml-specific; ElementTree here is lxml.etree
            fc_nodes = root.xpath('/RunInfo/Run/Flowcell')
            if len(fc_nodes) == 1:
                return fc_nodes[0].text


    def _get_flowcell_id_from_path(self):
        """Guess a flowcell name from the path
        """
        # runfolder names are underscore-separated; the flowcell id is
        # assumed to be the last field -- best-effort guess only
        path_fields = self.pathname.split('_')
        if len(path_fields) > 0:
            # guessing last element of filename
            return path_fields[-1]

    def _get_runfolder_name(self):
        """Return the runfolder name recorded by gerald, or None."""
        if self.gerald is None:
            return None
        else:
            return self.gerald.runfolder_name
    runfolder_name = property(_get_runfolder_name)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.

        Returns the root Element; requires image_analysis, bustard and
        gerald to all be set.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """Restore component state from an xml tree built by get_elements.

        Raises ValueError if the root tag is not PipelineRun.
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
          raise ValueError('Pipeline Run Expecting %s got %s' % (
              PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
          tag = element.tag.lower()
          if tag == PipelineRun.FLOWCELL_ID.lower():
            self._flowcell_id = element.text
          #ok the xword.Xword.XWORD pattern for module.class.constant is lame
          # you should only have Firecrest or IPAR, never both of them.
          elif tag == firecrest.Firecrest.FIRECREST.lower():
            self.image_analysis = firecrest.Firecrest(xml=element)
          elif tag == ipar.IPAR.IPAR.lower():
            self.image_analysis = ipar.IPAR(xml=element)
          elif tag == bustard.Bustard.BUSTARD.lower():
            self.bustard = bustard.Bustard(xml=element)
          elif tag == gerald.Gerald.GERALD.lower():
            self.gerald = gerald.Gerald(xml=element)
          elif tag == gerald.CASAVA.GERALD.lower():
            self.gerald = gerald.CASAVA(xml=element)
          else:
            LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name

        The result is cached in self._name after the first call.
        """
        if self._name is None:
          tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
          timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
          self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """Write the combined run xml to destdir (default: cwd)."""
        if destdir is None:
            destdir = ''
        LOGGER.info("Saving run report " + self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Restore run state from a previously saved run xml file."""
        LOGGER.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
176
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    root = ElementTree.parse(pathname).getroot()
    return PipelineRun(xml=root)
189
def get_runs(runfolder, flowcell_id=None):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.
    """
    # deferred imports to avoid cyclic imports between pipeline modules
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    datadir = os.path.join(runfolder, 'Data')

    LOGGER.info('Searching for runs in ' + datadir)
    runs = []

    # scan for firecrest image-analysis directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        LOGGER.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            LOGGER.warn(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            )
            continue
        scan_post_image_analysis(
            runs, runfolder, datadir, image_analysis, firecrest_pathname, flowcell_id
        )

    # scan for IPAR directories; the RTA Intensities directory
    # looks a lot like IPAR so it is scanned the same way
    candidate_dirs = glob(os.path.join(datadir, "IPAR_*"))
    candidate_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in candidate_dirs:
        LOGGER.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            LOGGER.warn(
                "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
            )
            continue
        scan_post_image_analysis(
            runs, runfolder, datadir, image_analysis, ipar_pathname, flowcell_id
        )

    return runs
237
def scan_post_image_analysis(runs, runfolder, datadir, image_analysis,
                             pathname, flowcell_id):
    """Find the base-calling / alignment steps below an image analysis dir.

    Tries the HiSeq Aligned/Unaligned layout first; only if nothing was
    added does it fall back to the older Bustard/GERALD layout.
    Appends PipelineRun objects to `runs` in place.
    """
    # deferred import to avoid a cyclic import; previously `bustard`
    # was never imported here, so this function raised NameError
    from htsworkflow.pipelines import bustard

    added = build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id)
    # If we're a multiplexed run, don't look for older run type.
    if added > 0:
        return

    LOGGER.info("Looking for bustard directories in %s" % (pathname,))
    bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
    # RTA BaseCalls looks enough like Bustard.
    bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
    for bustard_pathname in bustard_dirs:
        LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
        b = bustard.bustard(bustard_pathname)
        build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname,
                          runfolder, flowcell_id)
254
255
def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder,
                      flowcell_id):
    """Append one PipelineRun per GERALD directory under bustard_pathname.

    :Returns: number of runs added to `runs`.
    """
    # deferred import to avoid a cyclic import; previously `gerald`
    # was never imported here, so this function raised NameError
    from htsworkflow.pipelines import gerald

    start = len(runs)
    gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
    LOGGER.info("Looking for gerald directories in %s" % (pathname,))
    for gerald_pathname in glob(gerald_glob):
        LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
        try:
            g = gerald.gerald(gerald_pathname)
            p = PipelineRun(runfolder, flowcell_id)
            p.datadir = datadir
            p.image_analysis = image_analysis
            p.bustard = b
            p.gerald = g
            runs.append(p)
        except IOError as e:
            # a partially written gerald directory is skipped, not fatal
            LOGGER.error("Ignoring " + str(e))
    return len(runs) - start
274
275
def build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id):
    """Find HiSeq-style Aligned*/Unaligned* pairs and append PipelineRuns.

    :Returns: number of runs added to `runs`.
    """
    # deferred imports to avoid cyclic imports; previously neither
    # `gerald` nor `bustard` was imported here (NameError at runtime)
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    start = len(runs)
    aligned_glob = os.path.join(runfolder, 'Aligned*')
    unaligned_glob = os.path.join(runfolder, 'Unaligned*')

    aligned_paths = glob(aligned_glob)
    unaligned_paths = glob(unaligned_glob)

    matched_paths = hiseq_match_aligned_unaligned(aligned_paths, unaligned_paths)
    LOGGER.debug("Matched HiSeq analysis: %s", str(matched_paths))

    for aligned, unaligned in matched_paths:
        if unaligned is None:
            LOGGER.warn("Aligned directory %s without matching unaligned, skipping", aligned)
            continue

        try:
            p = PipelineRun(runfolder, flowcell_id)
            p.datadir = datadir
            p.image_analysis = image_analysis
            p.bustard = bustard.bustard(unaligned)
            if aligned:
                # previously gerald.gerald(aligned) was also called once
                # *outside* the try block (result unused), parsing the
                # directory twice and bypassing the IOError handler
                p.gerald = gerald.gerald(aligned)
            runs.append(p)
        except IOError as e:
            LOGGER.error("Ignoring " + str(e))
    return len(runs) - start
305
def hiseq_match_aligned_unaligned(aligned, unaligned):
    """Match aligned and unaligned folders from separate lists

    Pairs are matched by the suffix after the Aligned/Unaligned prefix.

    :Returns: list of (aligned, unaligned) path tuples; either member is
              None when no directory with that suffix was found.
    """
    # (a compiled 'Unaligned' regex used to be built here but was never
    # used; the suffix extraction happens in build_dir_dict_by_suffix)
    aligned_by_suffix = build_dir_dict_by_suffix('Aligned', aligned)
    unaligned_by_suffix = build_dir_dict_by_suffix('Unaligned', unaligned)

    keys = set(aligned_by_suffix.keys()).union(set(unaligned_by_suffix.keys()))

    matches = []
    for key in keys:
        a = aligned_by_suffix.get(key)
        u = unaligned_by_suffix.get(key)
        matches.append((a, u))
    return matches
322
def build_dir_dict_by_suffix(prefix, dirnames):
    """Build a dictionary indexed by suffix of last directory name.

    It assumes a constant prefix

    :Parameters:
      - `prefix`: leading text of the directory basename, e.g. 'Aligned'
      - `dirnames`: iterable of directory paths

    :Returns: dict mapping suffix (text after prefix, may be '') to the
              original path; non-matching paths are skipped.
    """
    # raw string avoids an invalid-escape warning for \w
    regex = re.compile(r'%s(?P<suffix>[\w]*)' % (prefix,))

    by_suffix = {}
    for absname in dirnames:
        basename = os.path.basename(absname)
        match = regex.match(basename)
        if match:
            by_suffix[match.group('suffix')] = absname
    return by_suffix
337
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :Parameters:
      - `gerald_dir`: path of a gerald directory (~ is expanded)

    :Returns: a PipelineRun, or None when any component is missing.
    """
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    gerald_dir = os.path.expanduser(gerald_dir)
    # the layout is assumed to be .../Data/<image>/<bustard>/<gerald>
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    LOGGER.info('--- use-run detected options ---')
    LOGGER.info('runfolder: %s' % (runfolder_dir,))
    LOGGER.info('image_dir: %s' % (image_dir,))
    LOGGER.info('bustard_dir: %s' % (bustard_dir,))
    LOGGER.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    LOGGER.info('data_dir: %s' % (data_dir,))
    # fixed typo: this used to log 'short_iamge_dir'
    LOGGER.info('short_image_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        LOGGER.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
405
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters

    Calls save() on each run, writing its run xml to the current directory.
    """
    for pipeline_run in runs:
        pipeline_run.save()
412
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Collapse per-chromosome read counts into one total for the genome,
    while keeping spike-in/contamination entries (path-prefixed keys that
    are not listed in genome_map) under their own path name.
    """
    summary = {}
    genome_total = 0
    genome_name = 'unknown'
    for target, count in mapped_reads.items():
        prefix, leaf = os.path.split(target)
        if prefix and prefix not in genome_map:
            # unrecognized path prefix: treat as the genome bucket
            genome_name = prefix
            genome_total += count
        else:
            summary[leaf] = summary.get(leaf, 0) + count
    summary[genome_name] = genome_total
    return summary
430
def summarize_lane(gerald, lane_id):
    """
    Build a list of human-readable report lines for one lane/end of a
    GERALD run: sample, cluster counts, read totals, match codes and
    (when available) per-genome mapped read counts.
    """
    lines = []
    per_read_results = gerald.summary.lane_results
    eland = gerald.eland_results[lane_id]
    lines.append("Sample name %s" % (eland.sample_name))
    lines.append("Lane id %s end %s" % (lane_id.lane, lane_id.read))

    have_summary = (lane_id.read < len(per_read_results) and
                    lane_id.lane in per_read_results[lane_id.read])
    if have_summary:
        cluster = per_read_results[lane_id.read][lane_id.lane].cluster
        lines.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
    lines.append("Total Reads: %d" % (eland.reads))

    if hasattr(eland, 'match_codes'):
        codes = eland.match_codes
        no_match_pct = float(codes['NM']) / eland.reads * 100
        qc_pct = float(codes['QC']) / eland.reads * 100

        lines.append("No Match: %d (%2.2g %%)" % (codes['NM'], no_match_pct))
        lines.append("QC Failed: %d (%2.2g %%)" % (codes['QC'], qc_pct))
        lines.append('Unique (0,1,2 mismatches) %d %d %d' % \
                     (codes['U0'], codes['U1'], codes['U2']))
        lines.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                     (codes['R0'], codes['R1'], codes['R2']))

    if hasattr(eland, 'genome_map'):
        lines.append("Mapped Reads")
        mapped = summarize_mapped_reads(eland.genome_map, eland.mapped_reads)
        for name, counts in mapped.items():
            lines.append("  %s: %d" % (name, counts))

        lines.append('')
    return lines
468
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :Parameters:
      - `runs`: list of PipelineRun objects

    :Returns: one newline-joined report string covering every run.
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.name,))
        # sort the report
        eland_keys = sorted(run.gerald.eland_results.keys())
        # NOTE: this loop was previously dedented out of the run loop,
        # so only the *last* run's lanes were ever summarized
        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('---')
            report.append('')
    return os.linesep.join(report)
484
def is_compressed(filename):
    """Return True when filename ends with a known compression suffix
    (.gz or .bz2), False otherwise.
    """
    extension = os.path.splitext(filename)[1]
    return extension in ('.gz', '.bz2')
492
def save_flowcell_reports(data_dir, cycle_dir):
    """
    Save the flowcell quality reports

    Tars data_dir/reports (plus Status.xml/Status.xsl when present) into
    cycle_dir/flowcell-reports.tar.bz2. No-op if there is no reports dir.
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        LOGGER.info("Saving reports from " + reports_dir)
        cwd = os.getcwd()
        os.chdir(data_dir)
        try:
            q = QueueCommands([" ".join(cmd_list)])
            q.run()
        finally:
            # restore the working directory even if the tar command fails;
            # previously an exception left the process chdir'd to data_dir
            os.chdir(cwd)
511
512
def save_summary_file(pipeline, cycle_dir):
    """Copy the run's Summary.htm into cycle_dir.

    Prefers the GERALD copy, falling back to Data/Status_Files; logs
    when neither exists.
    """
    # Copy Summary.htm
    gerald_object = pipeline.gerald
    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
    if os.path.exists(gerald_summary):
        LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir))
        shutil.copy(gerald_summary, cycle_dir)
    elif os.path.exists(status_files_summary):
        LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir))
        shutil.copy(status_files_summary, cycle_dir)
    else:
        # previously logged an undefined name (summary_path), raising
        # NameError instead of the intended message
        LOGGER.info('Summary file %s was not found' % (gerald_summary,))
526
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images

    Copies IVC.htm and its Plots/s_?_*.png images into cycle_dir.
    """
    plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
    plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')

    plot_target_path = os.path.join(cycle_dir, 'Plots')

    if os.path.exists(plot_html):
        LOGGER.debug("Saving %s" % (plot_html,))
        LOGGER.debug("Saving %s" % (plot_images,))
        shutil.copy(plot_html, cycle_dir)
        if not os.path.exists(plot_target_path):
            os.mkdir(plot_target_path)
        for plot_file in glob(plot_images):
            shutil.copy(plot_file, plot_target_path)
    else:
        # message now names the file actually looked for (was "IVC.html")
        LOGGER.warning('Missing IVC.htm file, not archiving')
547
548
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory

    Pipes `tar c` of the s_*_score.txt files through bzip2 into
    cycle_dir/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = []
    for f in os.listdir(scores_path):
        if re.match('.*_score.txt', f):
            score_files.append(f)

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    LOGGER.info("Compressing score files from %s" % (scores_path,))
    LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
    LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
    LOGGER.info("Writing to %s" % (tar_dest_name,))

    env = {'BZIP': '-9'}
    # binary mode: the bzip2 output stream is binary data, and the file
    # must be closed after the pipeline finishes (was mode 'w', never closed)
    with open(tar_dest_name, 'wb') as tar_dest:
        tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
                               cwd=scores_path)
        bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
        # close our copy of the pipe so bzip2 sees EOF when tar exits
        tar.stdout.close()
        tar.wait()
        # previously bzip was never waited on, so the archive could still
        # be incomplete when this function returned
        bzip.wait()
579
580
def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
    """
    Compress eland result files into the archive directory

    Already-compressed files are copied as-is; everything else is queued
    for bzip2 compression, run num_jobs at a time.
    """
    # copy & bzip eland files
    bz_commands = []

    for key in gerald_object.eland_results:
        eland_lane = gerald_object.eland_results[key]
        for source_name in eland_lane.pathnames:
            if source_name is None:
                LOGGER.info(
                  "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
                continue
            path, name = os.path.split(source_name)
            dest_name = os.path.join(cycle_dir, name)
            LOGGER.info("Saving eland file %s to %s" % \
                       (source_name, dest_name))

            if is_compressed(name):
                LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
                shutil.copy(source_name, dest_name)
            else:
                # not compressed: queue a shell bzip2 command writing
                # the compressed copy next to the other results
                dest_name += '.bz2'
                args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
                bz_commands.append(" ".join(args))

    if len(bz_commands) > 0:
        q = QueueCommands(bz_commands, num_jobs)
        q.run()
617
618
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format=None):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * eland_result files
      * score files
      * Summary.htm
      * srf files (raw sequence & qualities)

    :Parameters:
      - `runs`: list of PipelineRun objects to archive
      - `output_base_dir`: where result directories are created (default cwd)
      - `site`: site label passed to the raw-data savers; None skips raw data
      - `num_jobs`: how many compression/md5 commands to run in parallel
      - `raw_format`: force 'fastq'/'qseq'/'srf'; None autodetects from bustard
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
        result_dir = os.path.join(output_base_dir, r.flowcell_id)
        LOGGER.info("Using %s as result directory" % (result_dir,))
        if not os.path.exists(result_dir):
            os.mkdir(result_dir)

        # create cycle_dir named for the image-analysis cycle range
        cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
        LOGGER.info("Filling in %s" % (cycle,))
        cycle_dir = os.path.join(result_dir, cycle)
        cycle_dir = os.path.abspath(cycle_dir)
        if os.path.exists(cycle_dir):
            # never clobber an existing archive; skip this run entirely
            LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
            continue
        else:
            os.mkdir(cycle_dir)

        # save run file
        r.save(cycle_dir)

        # save illumina flowcell status report
        save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'),
                              cycle_dir)

        # save stuff from bustard
        # grab IVC plot
        save_ivc_plot(r.bustard, cycle_dir)

        # build base call saving commands
        if site is not None:
            save_raw_data(num_jobs, r, site, raw_format, cycle_dir)

        # save stuff from GERALD
        # copy stuff out of the main run
        g = r.gerald

        # save summary file
        save_summary_file(r, cycle_dir)

        # compress eland result files
        compress_eland_results(g, cycle_dir, num_jobs)

        # md5 all the compressed files once we're done
        md5_commands = srf.make_md5_commands(cycle_dir)
        srf.run_commands(cycle_dir, md5_commands, num_jobs)
676
def save_raw_data(num_jobs, r, site, raw_format, cycle_dir):
    """
    Archive the raw base-call data for one run into cycle_dir, using the
    format given by raw_format (or the bustard sequence format when None).
    Raises ValueError for an unrecognized format.
    """
    # lanes that have GERALD parameters recorded
    lanes = [lane for lane in r.gerald.lanes
             if r.gerald.lanes.get(lane, None) is not None]

    run_name = srf.pathname_to_run_name(r.pathname)
    seq_cmds = []
    if raw_format is None:
        raw_format = r.bustard.sequence_format

    LOGGER.info("Raw Format is: %s" % (raw_format, ))
    if raw_format == 'fastq':
        rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
        LOGGER.info("raw data = %s" % (rawpath,))
        srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
    elif raw_format == 'qseq':
        seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
    elif raw_format == 'srf':
        seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
    else:
        raise ValueError('Unknown --raw-format=%s' % (raw_format))
    srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
701
def rm_list(files, dry_run=True):
    """
    Delete every path in files (directories recursively), logging each
    one; with dry_run=True only log what would be removed.
    """
    for f in files:
        if not os.path.exists(f):
            LOGGER.warn("%s doesn't exist." % (f,))
            continue
        LOGGER.info('deleting %s' % (f,))
        if dry_run:
            continue
        if os.path.isdir(f):
            shutil.rmtree(f)
        else:
            os.unlink(f)
713
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    :Parameters:
      - `runs`: PipelineRun objects whose runfolders get cleaned
      - `dry_run`: when True, only log what would be removed
    """
    if dry_run:
        LOGGER.info('In dry-run mode')

    for run in runs:
        # fixed log typo (was "Cleaninging")
        LOGGER.info('Cleaning %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        LOGGER.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # rm ReadPrep*
        LOGGER.info("Cleaning ReadPrep*")
        read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
        rm_list(read_prep_dirs, dry_run)
        # rm Thumbnail_Images (fixed log typo, was "Thubmnail_images")
        LOGGER.info("Cleaning Thumbnail_Images")
        thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
        rm_list(thumbnail_dirs, dry_run)

        # make clean_intermediate
        # use the module LOGGER for consistency (was logging.info, which
        # bypassed this module's logger)
        LOGGER.info("Cleaning intermediate files")
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            # honor dry_run: previously `make clean_intermediate` executed
            # (and deleted files) even in dry-run mode
            if not dry_run:
                clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                                 cwd=run.image_analysis.pathname,)
                clean_process.wait()
756
757
758