Don't crash if there are no lane result summary entries when rendering
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import time
13
14 try:
15   from xml.etree import ElementTree
16 except ImportError, e:
17   from elementtree import ElementTree
18
19 EUROPEAN_STRPTIME = "%d-%m-%Y"
20 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
21 VERSION_RE = "([0-9\.]+)"
22 USER_RE = "([a-zA-Z0-9]+)"
23 LANES_PER_FLOWCELL = 8
24 LANE_LIST = range(1, LANES_PER_FLOWCELL+1)
25
26 from htsworkflow.util.alphanum import alphanum
27 from htsworkflow.util.ethelp import indent, flatten
28
29 class PipelineRun(object):
30     """
31     Capture "interesting" information about a pipeline run
32     """
33     XML_VERSION = 1
34     PIPELINE_RUN = 'PipelineRun'
35     FLOWCELL_ID = 'FlowcellID'
36
37     def __init__(self, pathname=None, xml=None):
38         if pathname is not None:
39           self.pathname = os.path.normpath(pathname)
40         else:
41           self.pathname = None
42         self._name = None
43         self._flowcell_id = None
44         self.image_analysis = None
45         self.bustard = None
46         self.gerald = None
47
48         if xml is not None:
49           self.set_elements(xml)
50
51     def _get_flowcell_id(self):
52         # extract flowcell ID
53         if self._flowcell_id is None:
54           config_dir = os.path.join(self.pathname, 'Config')
55           flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
56           if os.path.exists(flowcell_id_path):
57             flowcell_id_tree = ElementTree.parse(flowcell_id_path)
58             self._flowcell_id = flowcell_id_tree.findtext('Text')
59           else:
60             path_fields = self.pathname.split('_')
61             if len(path_fields) > 0:
62               # guessing last element of filename
63               flowcell_id = path_fields[-1]
64             else:
65               flowcell_id = 'unknown'
66
67             logging.warning(
68               "Flowcell id was not found, guessing %s" % (
69                  flowcell_id))
70             self._flowcell_id = flowcell_id
71         return self._flowcell_id
72     flowcell_id = property(_get_flowcell_id)
73
74     def get_elements(self):
75         """
76         make one master xml file from all of our sub-components.
77         """
78         root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
79         flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
80         flowcell.text = self.flowcell_id
81         root.append(self.image_analysis.get_elements())
82         root.append(self.bustard.get_elements())
83         root.append(self.gerald.get_elements())
84         return root
85
86     def set_elements(self, tree):
87         # this file gets imported by all the others,
88         # so we need to hide the imports to avoid a cyclic imports
89         from htsworkflow.pipelines import firecrest
90         from htsworkflow.pipelines import ipar
91         from htsworkflow.pipelines import bustard
92         from htsworkflow.pipelines import gerald
93
94         tag = tree.tag.lower()
95         if tag != PipelineRun.PIPELINE_RUN.lower():
96           raise ValueError('Pipeline Run Expecting %s got %s' % (
97               PipelineRun.PIPELINE_RUN, tag))
98         for element in tree:
99           tag = element.tag.lower()
100           if tag == PipelineRun.FLOWCELL_ID.lower():
101             self._flowcell_id = element.text
102           #ok the xword.Xword.XWORD pattern for module.class.constant is lame
103           # you should only have Firecrest or IPAR, never both of them.
104           elif tag == firecrest.Firecrest.FIRECREST.lower():
105             self.image_analysis = firecrest.Firecrest(xml=element)
106           elif tag == ipar.IPAR.IPAR.lower():
107             self.image_analysis = ipar.IPAR(xml=element)
108           elif tag == bustard.Bustard.BUSTARD.lower():
109             self.bustard = bustard.Bustard(xml=element)
110           elif tag == gerald.Gerald.GERALD.lower():
111             self.gerald = gerald.Gerald(xml=element)
112           else:
113             logging.warn('PipelineRun unrecognized tag %s' % (tag,))
114
115     def _get_run_name(self):
116         """
117         Given a run tuple, find the latest date and use that as our name
118         """
119         if self._name is None:
120           tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
121           timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
122           self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
123         return self._name
124     name = property(_get_run_name)
125
126     def save(self, destdir=None):
127         if destdir is None:
128             destdir = ''
129         logging.info("Saving run report "+ self.name)
130         xml = self.get_elements()
131         indent(xml)
132         dest_pathname = os.path.join(destdir, self.name)
133         ElementTree.ElementTree(xml).write(dest_pathname)
134
135     def load(self, filename):
136         logging.info("Loading run report from " + filename)
137         tree = ElementTree.parse(filename).getroot()
138         self.set_elements(tree)
139
140 def load_pipeline_run_xml(pathname):
141     """
142     Load and instantiate a Pipeline run from a run xml file
143
144     :Parameters: 
145       - `pathname` : location of an run xml file
146
147     :Returns: initialized PipelineRun object
148     """
149     tree = ElementTree.parse(pathname).getroot()
150     run = PipelineRun(xml=tree)
151     return run
152
153 def get_runs(runfolder):
154     """
155     Search through a run folder for all the various sub component runs
156     and then return a PipelineRun for each different combination.
157
158     For example if there are two different GERALD runs, this will
159     generate two different PipelineRun objects, that differ
160     in there gerald component.
161     """
162     from htsworkflow.pipelines import firecrest
163     from htsworkflow.pipelines import ipar
164     from htsworkflow.pipelines import bustard
165     from htsworkflow.pipelines import gerald
166
167     def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
168         logging.info("Looking for bustard directories in %s" % (pathname,))
169         bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
170         # RTA BaseCalls looks enough like Bustard.
171         bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
172         for bustard_pathname in bustard_dirs:
173             logging.info("Found bustard directory %s" % (bustard_pathname,))
174             b = bustard.bustard(bustard_pathname)
175             gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
176             logging.info("Looking for gerald directories in %s" % (pathname,))
177             for gerald_pathname in glob(gerald_glob):
178                 logging.info("Found gerald directory %s" % (gerald_pathname,))
179                 try:
180                     g = gerald.gerald(gerald_pathname)
181                     p = PipelineRun(runfolder)
182                     p.image_analysis = image_analysis
183                     p.bustard = b
184                     p.gerald = g
185                     runs.append(p)
186                 except IOError, e:
187                     logging.error("Ignoring " + str(e))
188
189     datadir = os.path.join(runfolder, 'Data')
190
191     logging.info('Searching for runs in ' + datadir)
192     runs = []
193     # scan for firecrest directories
194     for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
195         logging.info('Found firecrest in ' + datadir)
196         image_analysis = firecrest.firecrest(firecrest_pathname)
197         if image_analysis is None:
198             logging.warn(
199                 "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
200             )
201         else:
202             scan_post_image_analysis(
203                 runs, runfolder, image_analysis, firecrest_pathname
204             )
205     # scan for IPAR directories
206     ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
207     # The Intensities directory from the RTA software looks a lot like IPAR
208     ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
209     for ipar_pathname in ipar_dirs:
210         logging.info('Found ipar directories in ' + datadir)
211         image_analysis = ipar.ipar(ipar_pathname)
212         if image_analysis is None:
213             logging.warn(
214                 "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
215             )
216         else:
217             scan_post_image_analysis(
218                 runs, runfolder, image_analysis, ipar_pathname
219             )
220
221     return runs
222
223 def get_specific_run(gerald_dir):
224     """
225     Given a gerald directory, construct a PipelineRun out of its parents
226
227     Basically this allows specifying a particular run instead of the previous
228     get_runs which scans a runfolder for various combinations of
229     firecrest/ipar/bustard/gerald runs.
230     """
231     from htsworkflow.pipelines import firecrest
232     from htsworkflow.pipelines import ipar
233     from htsworkflow.pipelines import bustard
234     from htsworkflow.pipelines import gerald
235
236     gerald_dir = os.path.expanduser(gerald_dir)
237     bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
238     image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
239
240     runfolder_dir = os.path.abspath(os.path.join(image_dir, '..','..'))
241    
242     logging.info('--- use-run detected options ---')
243     logging.info('runfolder: %s' % (runfolder_dir,))
244     logging.info('image_dir: %s' % (image_dir,))
245     logging.info('bustard_dir: %s' % (bustard_dir,))
246     logging.info('gerald_dir: %s' % (gerald_dir,))
247
248     # find our processed image dir
249     image_run = None
250     # split into parent, and leaf directory
251     # leaf directory should be an IPAR or firecrest directory
252     data_dir, short_image_dir = os.path.split(image_dir)
253     logging.info('data_dir: %s' % (data_dir,))
254     logging.info('short_iamge_dir: %s' %(short_image_dir,))
255
256     # guess which type of image processing directory we have by looking
257     # in the leaf directory name
258     if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
259         image_run = firecrest.firecrest(image_dir)
260     elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
261         image_run = ipar.ipar(image_dir)
262     elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
263         image_run = ipar.ipar(image_dir)
264
265     # if we din't find a run, report the error and return 
266     if image_run is None:
267         msg = '%s does not contain an image processing step' % (image_dir,)
268         logging.error(msg)
269         return None
270
271     # find our base calling
272     base_calling_run = bustard.bustard(bustard_dir)
273     if base_calling_run is None:
274         logging.error('%s does not contain a bustard run' % (bustard_dir,))
275         return None
276
277     # find alignments
278     gerald_run = gerald.gerald(gerald_dir)
279     if gerald_run is None:
280         logging.error('%s does not contain a gerald run' % (gerald_dir,))
281         return None
282
283     p = PipelineRun(runfolder_dir)
284     p.image_analysis = image_run
285     p.bustard = base_calling_run
286     p.gerald = gerald_run
287     
288     logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
289     return p
290
291 def extract_run_parameters(runs):
292     """
293     Search through runfolder_path for various runs and grab their parameters
294     """
295     for run in runs:
296       run.save()
297
298 def summarize_mapped_reads(genome_map, mapped_reads):
299     """
300     Summarize per chromosome reads into a genome count
301     But handle spike-in/contamination symlinks seperately.
302     """
303     summarized_reads = {}
304     genome_reads = 0
305     genome = 'unknown'
306     for k, v in mapped_reads.items():
307         path, k = os.path.split(k)
308         if len(path) > 0 and not genome_map.has_key(path):
309             genome = path
310             genome_reads += v
311         else:
312             summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
313     summarized_reads[genome] = genome_reads
314     return summarized_reads
315
316 def summarize_lane(gerald, lane_id):
317     report = []
318     summary_results = gerald.summary.lane_results
319     for end in range(len(summary_results)):  
320       eland_result = gerald.eland_results.results[end][lane_id]
321       report.append("Sample name %s" % (eland_result.sample_name))
322       report.append("Lane id %s end %s" % (eland_result.lane_id, end))
323       if end < len(summary_results) and summary_results[end].has_key(eland_result.lane_id):
324           cluster = summary_results[end][eland_result.lane_id].cluster
325           report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
326       report.append("Total Reads: %d" % (eland_result.reads))
327
328       if hasattr(eland_result, 'match_codes'):
329           mc = eland_result.match_codes
330           nm = mc['NM']
331           nm_percent = float(nm)/eland_result.reads  * 100
332           qc = mc['QC']
333           qc_percent = float(qc)/eland_result.reads * 100
334
335           report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
336           report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
337           report.append('Unique (0,1,2 mismatches) %d %d %d' % \
338                         (mc['U0'], mc['U1'], mc['U2']))
339           report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
340                         (mc['R0'], mc['R1'], mc['R2']))
341
342       if hasattr(eland_result, 'genome_map'):
343           report.append("Mapped Reads")
344           mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
345           for name, counts in mapped_reads.items():
346             report.append("  %s: %d" % (name, counts))
347
348       report.append('')
349     return report
350
351 def summary_report(runs):
352     """
353     Summarize cluster numbers and mapped read counts for a runfolder
354     """
355     report = []
356     for run in runs:
357         # print a run name?
358         report.append('Summary for %s' % (run.name,))
359         # sort the report
360         eland_keys = run.gerald.eland_results.results[0].keys()
361         eland_keys.sort(alphanum)
362
363         for lane_id in eland_keys:
364             report.extend(summarize_lane(run.gerald, lane_id))
365             report.append('---')
366             report.append('')
367         return os.linesep.join(report)
368
369 def is_compressed(filename):
370     if os.path.splitext(filename)[1] == ".gz":
371         return True
372     elif os.path.splitext(filename)[1] == '.bz2':
373         return True
374     else:
375         return False
376
377 def extract_results(runs, output_base_dir=None):
378     if output_base_dir is None:
379         output_base_dir = os.getcwd()
380
381     for r in runs:
382       result_dir = os.path.join(output_base_dir, r.flowcell_id)
383       logging.info("Using %s as result directory" % (result_dir,))
384       if not os.path.exists(result_dir):
385         os.mkdir(result_dir)
386
387       # create cycle_dir
388       cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
389       logging.info("Filling in %s" % (cycle,))
390       cycle_dir = os.path.join(result_dir, cycle)
391       if os.path.exists(cycle_dir):
392         logging.error("%s already exists, not overwriting" % (cycle_dir,))
393         continue
394       else:
395         os.mkdir(cycle_dir)
396
397       # copy stuff out of the main run
398       g = r.gerald
399
400       # save run file
401       r.save(cycle_dir)
402
403       # Copy Summary.htm
404       summary_path = os.path.join(r.gerald.pathname, 'Summary.htm')
405       if os.path.exists(summary_path):
406           logging.info('Copying %s to %s' % (summary_path, cycle_dir))
407           shutil.copy(summary_path, cycle_dir)
408       else:
409           logging.info('Summary file %s was not found' % (summary_path,))
410
411       # tar score files
412       score_files = []
413
414       # check for g.pathname/Temp a new feature of 1.1rc1
415       scores_path = g.pathname
416       scores_path_temp = os.path.join(scores_path, 'Temp')
417       if os.path.isdir(scores_path_temp):
418           scores_path = scores_path_temp
419
420       # hopefully we have a directory that contains s_*_score files
421       for f in os.listdir(scores_path):
422           if re.match('.*_score.txt', f):
423               score_files.append(f)
424
425       tar_cmd = ['/bin/tar', 'c'] + score_files
426       bzip_cmd = [ 'bzip2', '-9', '-c' ]
427       tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
428       tar_dest = open(tar_dest_name, 'w')
429       logging.info("Compressing score files from %s" % (scores_path,))
430       logging.info("Running tar: " + " ".join(tar_cmd[:10]))
431       logging.info("Running bzip2: " + " ".join(bzip_cmd))
432       logging.info("Writing to %s" %(tar_dest_name))
433
434       env = {'BZIP': '-9'}
435       tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
436                              cwd=scores_path)
437       bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
438       tar.wait()
439
440       # copy & bzip eland files
441       for lanes_dictionary in g.eland_results.results:
442           for eland_lane in lanes_dictionary.values():
443               source_name = eland_lane.pathname
444               path, name = os.path.split(eland_lane.pathname)
445               dest_name = os.path.join(cycle_dir, name)
446               logging.info("Saving eland file %s to %s" % \
447                            (source_name, dest_name))
448
449               if is_compressed(name):
450                 logging.info('Already compressed, Saving to %s' % (dest_name, ))
451                 shutil.copy(source_name, dest_name)
452               else:
453                 # not compressed
454                 dest_name += '.bz2'
455                 args = ['bzip2', '-9', '-c', source_name]
456                 logging.info('Running: %s' % ( " ".join(args) ))
457                 bzip_dest = open(dest_name, 'w')
458                 bzip = subprocess.Popen(args, stdout=bzip_dest)
459                 logging.info('Saving to %s' % (dest_name, ))
460                 bzip.wait()
461
462 def rm_list(files, dry_run=True):
463     for f in files:
464         if os.path.exists(f):
465             logging.info('deleting %s' % (f,))
466             if not dry_run:
467                 if os.path.isdir(f):
468                     shutil.rmtree(f)
469                 else:
470                     os.unlink(f)
471         else:
472             logging.warn("%s doesn't exist."% (f,))
473
474 def clean_runs(runs, dry_run=True):
475     """
476     Clean up run folders to optimize for compression.
477     """
478     if dry_run:
479         logging.info('In dry-run mode')
480
481     for run in runs:
482         logging.info('Cleaninging %s' % (run.pathname,))
483         # rm RunLog*.xml
484         runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
485         rm_list(runlogs, dry_run)
486         # rm pipeline_*.txt
487         pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
488         rm_list(pipeline_logs, dry_run)
489         # rm gclog.txt?
490         # rm NetCopy.log? Isn't this robocopy?
491         logs = glob(os.path.join(run.pathname, '*.log'))
492         rm_list(logs, dry_run)
493         # rm nfn.log?
494         # Calibration
495         calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
496         rm_list(calibration_dir, dry_run)
497         # rm Images/L*
498         logging.info("Cleaning images")
499         image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
500         rm_list(image_dirs, dry_run)
501         # cd Data/C1-*_Firecrest*
502         logging.info("Cleaning intermediate files")
503         # make clean_intermediate
504         if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
505             clean_process = subprocess.Popen(['make', 'clean_intermediate'], 
506                                              cwd=run.image_analysis.pathname,)
507             clean_process.wait()
508
509
510