Add support for extracting data out of Illumina's new RTA runfolder.
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import time
13
# Prefer the standard-library ElementTree (available since Python 2.5);
# fall back to the standalone "elementtree" package on older interpreters.
# The exception instance was previously bound to an unused name with the
# Python-2-only "except ImportError, e" form; a bare handler is equivalent
# and valid on every Python version.
try:
  from xml.etree import ElementTree
except ImportError:
  from elementtree import ElementTree
18
# strptime format / regex for the DD-MM-YYYY datestamps found in runfolders.
EUROPEAN_STRPTIME = "%d-%m-%Y"
EUROPEAN_DATE_RE = r"([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
# Dotted version number, e.g. "1.3.2".  Raw string avoids the invalid
# "\." escape warning (inside a character class a plain "." is literal).
VERSION_RE = r"([0-9.]+)"
USER_RE = "([a-zA-Z0-9]+)"
# An Illumina flowcell always has 8 lanes, numbered 1..8.
LANES_PER_FLOWCELL = 8
# list() keeps this a real list (as it was under Python 2) even on
# Python 3, where range() is lazy.
LANE_LIST = list(range(1, LANES_PER_FLOWCELL + 1))
25
26 from htsworkflow.util.alphanum import alphanum
27 from htsworkflow.util.ethelp import indent, flatten
28
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run

    Aggregates one image-analysis component (Firecrest or IPAR), one
    base-caller (Bustard) and one aligner (GERALD) and can serialize
    the whole combination to/from a single XML document.
    """
    # schema version written into saved run xml files
    XML_VERSION = 1
    # XML tag names used by get_elements/set_elements
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, xml=None):
        """
        Create a run description.

        :Parameters:
          - `pathname`: runfolder path on disk (normalized), or None
          - `xml`: previously saved ElementTree element to restore from
        """
        if pathname is not None:
          self.pathname = os.path.normpath(pathname)
        else:
          self.pathname = None
        # lazily computed by the name/flowcell_id properties
        self._name = None
        self._flowcell_id = None
        # components filled in by get_runs/get_specific_run or set_elements
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
          self.set_elements(xml)

    def _get_flowcell_id(self):
        # extract flowcell ID
        # Preferred source is Config/FlowcellId.xml inside the runfolder;
        # failing that, guess from the runfolder name, whose last
        # '_'-separated field is conventionally the flowcell id
        # (e.g. 090220_HWI-EAS229_0093_30VR0AAXX).
        if self._flowcell_id is None:
          config_dir = os.path.join(self.pathname, 'Config')
          flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
          if os.path.exists(flowcell_id_path):
            flowcell_id_tree = ElementTree.parse(flowcell_id_path)
            self._flowcell_id = flowcell_id_tree.findtext('Text')
          else:
            path_fields = self.pathname.split('_')
            if len(path_fields) > 0:
              # guessing last element of filename
              flowcell_id = path_fields[-1]
            else:
              flowcell_id = 'unknown'

            logging.warning(
              "Flowcell id was not found, guessing %s" % (
                 flowcell_id))
            self._flowcell_id = flowcell_id
        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.
        """
        # NOTE(review): assumes image_analysis, bustard and gerald are all
        # set; raises AttributeError if any component is still None.
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """
        Restore component state from an ElementTree produced by get_elements.

        :Parameters:
          - `tree`: root element; its tag must be PIPELINE_RUN
        :Raises: ValueError if the root tag is wrong
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
          raise ValueError('Pipeline Run Expecting %s got %s' % (
              PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
          tag = element.tag.lower()
          if tag == PipelineRun.FLOWCELL_ID.lower():
            self._flowcell_id = element.text
          #ok the xword.Xword.XWORD pattern for module.class.constant is lame
          # you should only have Firecrest or IPAR, never both of them.
          elif tag == firecrest.Firecrest.FIRECREST.lower():
            self.image_analysis = firecrest.Firecrest(xml=element)
          elif tag == ipar.IPAR.IPAR.lower():
            self.image_analysis = ipar.IPAR(xml=element)
          elif tag == bustard.Bustard.BUSTARD.lower():
            self.bustard = bustard.Bustard(xml=element)
          elif tag == gerald.Gerald.GERALD.lower():
            self.gerald = gerald.Gerald(xml=element)
          else:
            logging.warn('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name
        """
        # Name looks like run_<flowcell>_<YYYY-MM-DD>.xml, dated by the
        # newest of the three component timestamps.
        if self._name is None:
          tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
          timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
          self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """
        Serialize this run to <destdir>/<self.name> as XML.

        :Parameters:
          - `destdir`: target directory; defaults to the current directory
        """
        if destdir is None:
            destdir = ''
        logging.info("Saving run report "+ self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """
        Populate this run from a previously saved run xml file.
        """
        logging.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
139
def load_pipeline_run_xml(pathname):
    """
    Build a PipelineRun object from a previously saved run xml file.

    :Parameters:
      - `pathname` : location of a run xml file

    :Returns: initialized PipelineRun object
    """
    root = ElementTree.parse(pathname).getroot()
    return PipelineRun(xml=root)
152
def get_runs(runfolder):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.

    :Parameters:
      - `runfolder`: path of the runfolder to scan

    :Returns: list of PipelineRun objects (may be empty)
    """
    # hide the imports to avoid cyclic imports (this module is imported
    # by the component modules)
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
        # For one image-analysis directory, find every bustard/gerald
        # combination beneath it and append a PipelineRun for each.
        logging.info("Looking for bustard directories in %s" % (pathname,))
        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
        # RTA BaseCalls looks enough like Bustard.
        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
        for bustard_pathname in bustard_dirs:
            logging.info("Found bustard directory %s" % (bustard_pathname,))
            b = bustard.bustard(bustard_pathname)
            gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
            logging.info("Looking for gerald directories in %s" % (pathname,))
            for gerald_pathname in glob(gerald_glob):
                logging.info("Found gerald directory %s" % (gerald_pathname,))
                try:
                    g = gerald.gerald(gerald_pathname)
                    p = PipelineRun(runfolder)
                    p.image_analysis = image_analysis
                    p.bustard = b
                    p.gerald = g
                    runs.append(p)
                # "except IOError as e" replaces the Python-2-only
                # "except IOError, e" form; a broken gerald directory
                # shouldn't prevent reporting the other runs.
                except IOError as e:
                    logging.error("Ignoring " + str(e))

    datadir = os.path.join(runfolder, 'Data')

    logging.info('Searching for runs in ' + datadir)
    runs = []
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        logging.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            logging.warn(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, image_analysis, firecrest_pathname
            )
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        logging.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            logging.warn(
                "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, image_analysis, ipar_pathname
            )

    return runs
222
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :Parameters:
      - `gerald_dir`: path to a GERALD directory, expected to live at
        <runfolder>/Data/<image analysis>/<bustard>/<gerald>

    :Returns: a PipelineRun, or None if any component can't be loaded
    """
    # hide the imports to avoid cyclic imports
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    # walk up the directory tree from gerald to recover the other layers
    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    logging.info('--- use-run detected options ---')
    logging.info('runfolder: %s' % (runfolder_dir,))
    logging.info('image_dir: %s' % (image_dir,))
    logging.info('bustard_dir: %s' % (bustard_dir,))
    logging.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    logging.info('data_dir: %s' % (data_dir,))
    # fixed log label typo (was 'short_iamge_dir')
    logging.info('short_image_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        # RTA's Intensities directory is close enough to IPAR output
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        logging.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        logging.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        logging.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
290
def extract_run_parameters(runs):
    """
    Write the run xml report for each PipelineRun in runs.
    """
    for run in runs:
        run.save()
297
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.

    :Parameters:
      - `genome_map`: mapping whose keys are known genome directory names
      - `mapped_reads`: mapping of (possibly path-qualified) chromosome
        name -> read count

    :Returns: dict of name -> summed read count; reads whose path prefix
      is not in genome_map are pooled under that prefix, and the pooled
      total always appears in the result (as 'unknown' with count 0 when
      no such reads were seen)
    """
    summarized_reads = {}
    genome_reads = 0
    genome = 'unknown'
    for k, v in mapped_reads.items():
        path, k = os.path.split(k)
        # "has_key" was removed in Python 3; "not in" is the equivalent
        # membership test on every version.
        if len(path) > 0 and path not in genome_map:
            # path-qualified read whose directory isn't a known genome:
            # pool everything under the directory name
            genome = path
            genome_reads += v
        else:
            summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
    summarized_reads[genome] = genome_reads
    return summarized_reads
315
def summarize_lane(gerald, lane_id):
    """
    Build a human-readable report (list of lines) for one lane of a
    gerald run, covering every read end present in the summary.
    """
    report = []
    lane_summaries = gerald.summary.lane_results
    for end in range(len(lane_summaries)):
        result = gerald.eland_results.results[end][lane_id]
        report.append("Sample name %s" % (result.sample_name))
        report.append("Lane id %s end %s" % (result.lane_id, end))
        cluster = lane_summaries[end][result.lane_id].cluster
        report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
        report.append("Total Reads: %d" % (result.reads))

        if hasattr(result, 'match_codes'):
            codes = result.match_codes
            total = result.reads
            no_match = codes['NM']
            qc_fail = codes['QC']
            report.append("No Match: %d (%2.2g %%)" % (
                no_match, float(no_match) / total * 100))
            report.append("QC Failed: %d (%2.2g %%)" % (
                qc_fail, float(qc_fail) / total * 100))
            report.append('Unique (0,1,2 mismatches) %d %d %d' % (
                codes['U0'], codes['U1'], codes['U2']))
            report.append('Repeat (0,1,2 mismatches) %d %d %d' % (
                codes['R0'], codes['R1'], codes['R2']))

        if hasattr(result, 'genome_map'):
            report.append("Mapped Reads")
            genome_counts = summarize_mapped_reads(
                result.genome_map, result.mapped_reads)
            for name, counts in genome_counts.items():
                report.append("  %s: %d" % (name, counts))

        report.append('')
    return report
349
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :Parameters:
      - `runs`: list of PipelineRun objects

    :Returns: the combined report for ALL runs as a single string
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.name,))
        # sort the report
        eland_keys = run.gerald.eland_results.results[0].keys()
        eland_keys.sort(alphanum)
        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('---')
            report.append('')
    # BUG FIX: the return used to be inside the for loop, so only the
    # first run ever made it into the report; it now joins every run.
    return os.linesep.join(report)
367
def is_compressed(filename):
    """
    Report whether filename carries a recognized compression extension
    (.gz or .bz2).
    """
    return os.path.splitext(filename)[1] in ('.gz', '.bz2')
375
def extract_results(runs, output_base_dir=None):
    """
    Copy the interesting result files for each run into a per-flowcell,
    per-cycle-range directory under output_base_dir.

    For each run this saves the run xml, copies Summary.htm, tars+bzips
    the gerald score files, and copies (compressing if needed) the eland
    result files.

    :Parameters:
      - `runs`: list of PipelineRun objects to extract
      - `output_base_dir`: destination root; defaults to the current
        working directory
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
      # results land in <output_base_dir>/<flowcell>/C<start>-<stop>
      result_dir = os.path.join(output_base_dir, r.flowcell_id)
      logging.info("Using %s as result directory" % (result_dir,))
      if not os.path.exists(result_dir):
        os.mkdir(result_dir)

      # create cycle_dir
      cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
      logging.info("Filling in %s" % (cycle,))
      cycle_dir = os.path.join(result_dir, cycle)
      if os.path.exists(cycle_dir):
        # never clobber a previous extraction; skip to the next run
        logging.error("%s already exists, not overwriting" % (cycle_dir,))
        continue
      else:
        os.mkdir(cycle_dir)

      # copy stuff out of the main run
      g = r.gerald

      # save run file
      r.save(cycle_dir)

      # Copy Summary.htm
      summary_path = os.path.join(r.gerald.pathname, 'Summary.htm')
      if os.path.exists(summary_path):
          logging.info('Copying %s to %s' % (summary_path, cycle_dir))
          shutil.copy(summary_path, cycle_dir)
      else:
          logging.info('Summary file %s was not found' % (summary_path,))

      # tar score files
      score_files = []

      # check for g.pathname/Temp a new feature of 1.1rc1
      scores_path = g.pathname
      scores_path_temp = os.path.join(scores_path, 'Temp')
      if os.path.isdir(scores_path_temp):
          scores_path = scores_path_temp

      # hopefully we have a directory that contains s_*_score files
      for f in os.listdir(scores_path):
          if re.match('.*_score.txt', f):
              score_files.append(f)

      # pipe: tar c <score files> | bzip2 -9 -c > cycle_dir/scores.tar.bz2
      tar_cmd = ['/bin/tar', 'c'] + score_files
      bzip_cmd = [ 'bzip2', '-9', '-c' ]
      tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
      # NOTE(review): opened in text mode and never explicitly closed;
      # 'wb' plus a close after the pipe finishes would be safer.
      tar_dest = open(tar_dest_name, 'w')
      logging.info("Compressing score files from %s" % (scores_path,))
      logging.info("Running tar: " + " ".join(tar_cmd[:10]))
      logging.info("Running bzip2: " + " ".join(bzip_cmd))
      logging.info("Writing to %s" %(tar_dest_name))

      # NOTE(review): this env replaces the entire environment (no PATH);
      # tar works because its path is absolute, and bzip2 inherits the
      # normal environment.  BZIP=-9 is redundant with the -9 flag.
      env = {'BZIP': '-9'}
      tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
                             cwd=scores_path)
      bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
      # NOTE(review): only tar is waited on here; bzip may still be
      # flushing scores.tar.bz2 when we move on — confirm intended.
      tar.wait()

      # copy & bzip eland files
      for lanes_dictionary in g.eland_results.results:
          for eland_lane in lanes_dictionary.values():
              source_name = eland_lane.pathname
              path, name = os.path.split(eland_lane.pathname)
              dest_name = os.path.join(cycle_dir, name)
              logging.info("Saving eland file %s to %s" % \
                           (source_name, dest_name))

              if is_compressed(name):
                logging.info('Already compressed, Saving to %s' % (dest_name, ))
                shutil.copy(source_name, dest_name)
              else:
                # not compressed
                dest_name += '.bz2'
                args = ['bzip2', '-9', '-c', source_name]
                logging.info('Running: %s' % ( " ".join(args) ))
                # NOTE(review): bzip_dest is never closed (text mode too)
                bzip_dest = open(dest_name, 'w')
                bzip = subprocess.Popen(args, stdout=bzip_dest)
                logging.info('Saving to %s' % (dest_name, ))
                bzip.wait()
460
def rm_list(files, dry_run=True):
    """
    Delete every path in files; directories are removed recursively.

    When dry_run is true (the default), only log what would be deleted.
    Paths that don't exist are reported with a warning.
    """
    for target in files:
        if not os.path.exists(target):
            logging.warn("%s doesn't exist."% (target,))
            continue
        logging.info('deleting %s' % (target,))
        if dry_run:
            continue
        if os.path.isdir(target):
            shutil.rmtree(target)
        else:
            os.unlink(target)
472
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    Removes run logs, pipeline logs, calibration directories, raw image
    tiles, and the image-analysis intermediate files for each run.

    :Parameters:
      - `runs`: list of PipelineRun objects to clean
      - `dry_run`: when True (the default) only log what would be removed
    """
    if dry_run:
        logging.info('In dry-run mode')

    for run in runs:
        # fixed log message typo (was 'Cleaninging')
        logging.info('Cleaning %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        logging.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # cd Data/C1-*_Firecrest*
        logging.info("Cleaning intermediate files")
        # make clean_intermediate
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            # BUG FIX: previously 'make clean_intermediate' ran (and
            # deleted files) even in dry-run mode; it is now skipped.
            if dry_run:
                logging.info('would run make clean_intermediate in %s' % (
                    run.image_analysis.pathname,))
            else:
                clean_process = subprocess.Popen(
                    ['make', 'clean_intermediate'],
                    cwd=run.image_analysis.pathname,)
                clean_process.wait()
507
508
509