Try to make runfolder results extraction more robust
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import time
13
14 try:
15   from xml.etree import ElementTree
16 except ImportError, e:
17   from elementtree import ElementTree
18
19 EUROPEAN_STRPTIME = "%d-%m-%Y"
20 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
21 VERSION_RE = "([0-9\.]+)"
22 USER_RE = "([a-zA-Z0-9]+)"
23 LANES_PER_FLOWCELL = 8
24
25 from htsworkflow.util.alphanum import alphanum
26 from htsworkflow.util.ethelp import indent, flatten
27
28 class PipelineRun(object):
29     """
30     Capture "interesting" information about a pipeline run
31     """
32     XML_VERSION = 1
33     PIPELINE_RUN = 'PipelineRun'
34     FLOWCELL_ID = 'FlowcellID'
35
36     def __init__(self, pathname=None, xml=None):
37         if pathname is not None:
38           self.pathname = os.path.normpath(pathname)
39         else:
40           self.pathname = None
41         self._name = None
42         self._flowcell_id = None
43         self.image_analysis = None
44         self.bustard = None
45         self.gerald = None
46
47         if xml is not None:
48           self.set_elements(xml)
49
50     def _get_flowcell_id(self):
51         # extract flowcell ID
52         if self._flowcell_id is None:
53           config_dir = os.path.join(self.pathname, 'Config')
54           flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
55           if os.path.exists(flowcell_id_path):
56             flowcell_id_tree = ElementTree.parse(flowcell_id_path)
57             self._flowcell_id = flowcell_id_tree.findtext('Text')
58           else:
59             path_fields = self.pathname.split('_')
60             if len(path_fields) > 0:
61               # guessing last element of filename
62               flowcell_id = path_fields[-1]
63             else:
64               flowcell_id = 'unknown'
65
66             logging.warning(
67               "Flowcell id was not found, guessing %s" % (
68                  flowcell_id))
69             self._flowcell_id = flowcell_id
70         return self._flowcell_id
71     flowcell_id = property(_get_flowcell_id)
72
73     def get_elements(self):
74         """
75         make one master xml file from all of our sub-components.
76         """
77         root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
78         flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
79         flowcell.text = self.flowcell_id
80         root.append(self.image_analysis.get_elements())
81         root.append(self.bustard.get_elements())
82         root.append(self.gerald.get_elements())
83         return root
84
85     def set_elements(self, tree):
86         # this file gets imported by all the others,
87         # so we need to hide the imports to avoid a cyclic imports
88         from htsworkflow.pipelines import firecrest
89         from htsworkflow.pipelines import ipar
90         from htsworkflow.pipelines import bustard
91         from htsworkflow.pipelines import gerald
92
93         tag = tree.tag.lower()
94         if tag != PipelineRun.PIPELINE_RUN.lower():
95           raise ValueError('Pipeline Run Expecting %s got %s' % (
96               PipelineRun.PIPELINE_RUN, tag))
97         for element in tree:
98           tag = element.tag.lower()
99           if tag == PipelineRun.FLOWCELL_ID.lower():
100             self._flowcell_id = element.text
101           #ok the xword.Xword.XWORD pattern for module.class.constant is lame
102           # you should only have Firecrest or IPAR, never both of them.
103           elif tag == firecrest.Firecrest.FIRECREST.lower():
104             self.image_analysis = firecrest.Firecrest(xml=element)
105           elif tag == ipar.IPAR.IPAR.lower():
106             self.image_analysis = ipar.IPAR(xml=element)
107           elif tag == bustard.Bustard.BUSTARD.lower():
108             self.bustard = bustard.Bustard(xml=element)
109           elif tag == gerald.Gerald.GERALD.lower():
110             self.gerald = gerald.Gerald(xml=element)
111           else:
112             logging.warn('PipelineRun unrecognized tag %s' % (tag,))
113
114     def _get_run_name(self):
115         """
116         Given a run tuple, find the latest date and use that as our name
117         """
118         if self._name is None:
119           tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
120           timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
121           self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
122         return self._name
123     name = property(_get_run_name)
124
125     def save(self, destdir=None):
126         if destdir is None:
127             destdir = ''
128         logging.info("Saving run report "+ self.name)
129         xml = self.get_elements()
130         indent(xml)
131         dest_pathname = os.path.join(destdir, self.name)
132         ElementTree.ElementTree(xml).write(dest_pathname)
133
134     def load(self, filename):
135         logging.info("Loading run report from " + filename)
136         tree = ElementTree.parse(filename).getroot()
137         self.set_elements(tree)
138
139 def get_runs(runfolder):
140     """
141     Search through a run folder for all the various sub component runs
142     and then return a PipelineRun for each different combination.
143
144     For example if there are two different GERALD runs, this will
145     generate two different PipelineRun objects, that differ
146     in there gerald component.
147     """
148     from htsworkflow.pipelines import firecrest
149     from htsworkflow.pipelines import ipar
150     from htsworkflow.pipelines import bustard
151     from htsworkflow.pipelines import gerald
152
153     def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
154         logging.info("Looking for bustard directories in %s" % (pathname,))
155         bustard_glob = os.path.join(pathname, "Bustard*")
156         for bustard_pathname in glob(bustard_glob):
157             logging.info("Found bustard directory %s" % (bustard_pathname,))
158             b = bustard.bustard(bustard_pathname)
159             gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
160             logging.info("Looking for gerald directories in %s" % (pathname,))
161             for gerald_pathname in glob(gerald_glob):
162                 logging.info("Found gerald directory %s" % (gerald_pathname,))
163                 try:
164                     g = gerald.gerald(gerald_pathname)
165                     p = PipelineRun(runfolder)
166                     p.image_analysis = image_analysis
167                     p.bustard = b
168                     p.gerald = g
169                     runs.append(p)
170                 except IOError, e:
171                     print "Ignoring", str(e)
172
173     datadir = os.path.join(runfolder, 'Data')
174
175     logging.info('Searching for runs in ' + datadir)
176     runs = []
177     # scan for firecrest directories
178     for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
179         logging.info('Found firecrest in ' + datadir)
180         image_analysis = firecrest.firecrest(firecrest_pathname)
181         if image_analysis is None:
182             logging.warn(
183                 "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
184             )
185         else:
186             scan_post_image_analysis(
187                 runs, runfolder, image_analysis, firecrest_pathname
188             )
189     # scan for IPAR directories
190     for ipar_pathname in glob(os.path.join(datadir,"IPAR_*")):
191         logging.info('Found ipar directories in ' + datadir)
192         image_analysis = ipar.ipar(ipar_pathname)
193         if image_analysis is None:
194             logging.warn(
195                 "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
196             )
197         else:
198             scan_post_image_analysis(
199                 runs, runfolder, image_analysis, ipar_pathname
200             )
201
202     return runs
203
204
205 def extract_run_parameters(runs):
206     """
207     Search through runfolder_path for various runs and grab their parameters
208     """
209     for run in runs:
210       run.save()
211
212 def summarize_mapped_reads(mapped_reads):
213     """
214     Summarize per chromosome reads into a genome count
215     But handle spike-in/contamination symlinks seperately.
216     """
217     summarized_reads = {}
218     genome_reads = 0
219     genome = 'unknown'
220     for k, v in mapped_reads.items():
221         path, k = os.path.split(k)
222         if len(path) > 0:
223             genome = path
224             genome_reads += v
225         else:
226             summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
227     summarized_reads[genome] = genome_reads
228     return summarized_reads
229
230 def summarize_lane(gerald, lane_id):
231     report = []
232     summary_results = gerald.summary.lane_results
233     for end in range(len(summary_results)):  
234       eland_result = gerald.eland_results.results[end][lane_id]
235       report.append("Sample name %s" % (eland_result.sample_name))
236       report.append("Lane id %s end %s" % (eland_result.lane_id, end))
237       cluster = summary_results[end][eland_result.lane_id].cluster
238       report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
239       report.append("Total Reads: %d" % (eland_result.reads))
240       mc = eland_result._match_codes
241       nm = mc['NM']
242       nm_percent = float(nm)/eland_result.reads  * 100
243       qc = mc['QC']
244       qc_percent = float(qc)/eland_result.reads * 100
245
246       report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
247       report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
248       report.append('Unique (0,1,2 mismatches) %d %d %d' % \
249                     (mc['U0'], mc['U1'], mc['U2']))
250       report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
251                     (mc['R0'], mc['R1'], mc['R2']))
252       report.append("Mapped Reads")
253       mapped_reads = summarize_mapped_reads(eland_result.mapped_reads)
254       for name, counts in mapped_reads.items():
255         report.append("  %s: %d" % (name, counts))
256       report.append('')
257     return report
258
259 def summary_report(runs):
260     """
261     Summarize cluster numbers and mapped read counts for a runfolder
262     """
263     report = []
264     for run in runs:
265         # print a run name?
266         report.append('Summary for %s' % (run.name,))
267         # sort the report
268         eland_keys = run.gerald.eland_results.results[0].keys()
269         eland_keys.sort(alphanum)
270
271         for lane_id in eland_keys:
272             report.extend(summarize_lane(run.gerald, lane_id))
273             report.append('---')
274             report.append('')
275         return os.linesep.join(report)
276
277 def is_compressed(filename):
278     if os.path.splitext(filename)[1] == ".gz":
279         return True
280     elif os.path.splitext(filename)[1] == '.bz2':
281         return True
282     else:
283         return False
284
285 def extract_results(runs, output_base_dir=None):
286     if output_base_dir is None:
287         output_base_dir = os.getcwd()
288
289     for r in runs:
290       result_dir = os.path.join(output_base_dir, r.flowcell_id)
291       logging.info("Using %s as result directory" % (result_dir,))
292       if not os.path.exists(result_dir):
293         os.mkdir(result_dir)
294
295       # create cycle_dir
296       cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
297       logging.info("Filling in %s" % (cycle,))
298       cycle_dir = os.path.join(result_dir, cycle)
299       if os.path.exists(cycle_dir):
300         logging.error("%s already exists, not overwriting" % (cycle_dir,))
301         continue
302       else:
303         os.mkdir(cycle_dir)
304
305       # copy stuff out of the main run
306       g = r.gerald
307
308       # save run file
309       r.save(cycle_dir)
310
311       # Copy Summary.htm
312       summary_path = os.path.join(r.gerald.pathname, 'Summary.htm')
313       if os.path.exists(summary_path):
314           logging.info('Copying %s to %s' % (summary_path, cycle_dir))
315           shutil.copy(summary_path, cycle_dir)
316       else:
317           logging.info('Summary file %s was not found' % (summary_path,))
318
319       # tar score files
320       score_files = []
321
322       # check for g.pathname/Temp a new feature of 1.1rc1
323       scores_path = g.pathname
324       scores_path_temp = os.path.join(scores_path, 'Temp')
325       if os.path.isdir(scores_path_temp):
326           scores_path = scores_path_temp
327
328       # hopefully we have a directory that contains s_*_score files
329       for f in os.listdir(scores_path):
330           if re.match('.*_score.txt', f):
331               score_files.append(f)
332
333       tar_cmd = ['/bin/tar', 'c'] + score_files
334       bzip_cmd = [ 'bzip2', '-9', '-c' ]
335       tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
336       tar_dest = open(tar_dest_name, 'w')
337       logging.info("Compressing score files from %s" % (scores_path,))
338       logging.info("Running tar: " + " ".join(tar_cmd[:10]))
339       logging.info("Running bzip2: " + " ".join(bzip_cmd))
340       logging.info("Writing to %s" %(tar_dest_name))
341
342       tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, 
343                              cwd=scores_path)
344       bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
345       tar.wait()
346
347       # copy & bzip eland files
348       for lanes_dictionary in g.eland_results.results:
349           for eland_lane in lanes_dictionary.values():
350               source_name = eland_lane.pathname
351               path, name = os.path.split(eland_lane.pathname)
352               dest_name = os.path.join(cycle_dir, name)
353               logging.info("Saving eland file %s to %s" % \
354                            (source_name, dest_name))
355
356               if is_compressed(name):
357                 logging.info('Already compressed, Saving to %s' % (dest_name, ))
358                 shutil.copy(source_name, dest_name)
359               else:
360                 # not compressed
361                 dest_name += '.bz2'
362                 args = ['bzip2', '-9', '-c', source_name]
363                 logging.info('Running: %s' % ( " ".join(args) ))
364                 bzip_dest = open(dest_name, 'w')
365                 bzip = subprocess.Popen(args, stdout=bzip_dest)
366                 logging.info('Saving to %s' % (dest_name, ))
367                 bzip.wait()
368
369 def clean_runs(runs):
370     """
371     Clean up run folders to optimize for compression.
372     """
373     # TODO: implement this.
374     # rm RunLog*.xml
375     # rm pipeline_*.txt
376     # rm gclog.txt
377     # rm NetCopy.log
378     # rm nfn.log
379     # rm Images/L*
380     # cd Data/C1-*_Firecrest*
381     # make clean_intermediate
382
383     pass