For pipeline 1.1rc1 or 1.3.2, look for the matrix files in the bustard dir
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import time
13
14 try:
15   from xml.etree import ElementTree
16 except ImportError, e:
17   from elementtree import ElementTree
18
19 EUROPEAN_STRPTIME = "%d-%m-%Y"
20 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
21 VERSION_RE = "([0-9\.]+)"
22 USER_RE = "([a-zA-Z0-9]+)"
23 LANES_PER_FLOWCELL = 8
24
25 from htsworkflow.util.alphanum import alphanum
26 from htsworkflow.util.ethelp import indent, flatten
27
28 class PipelineRun(object):
29     """
30     Capture "interesting" information about a pipeline run
31     """
32     XML_VERSION = 1
33     PIPELINE_RUN = 'PipelineRun'
34     FLOWCELL_ID = 'FlowcellID'
35
36     def __init__(self, pathname=None, xml=None):
37         if pathname is not None:
38           self.pathname = os.path.normpath(pathname)
39         else:
40           self.pathname = None
41         self._name = None
42         self._flowcell_id = None
43         self.image_analysis = None
44         self.bustard = None
45         self.gerald = None
46
47         if xml is not None:
48           self.set_elements(xml)
49
50     def _get_flowcell_id(self):
51         # extract flowcell ID
52         if self._flowcell_id is None:
53           config_dir = os.path.join(self.pathname, 'Config')
54           flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
55           if os.path.exists(flowcell_id_path):
56             flowcell_id_tree = ElementTree.parse(flowcell_id_path)
57             self._flowcell_id = flowcell_id_tree.findtext('Text')
58           else:
59             path_fields = self.pathname.split('_')
60             if len(path_fields) > 0:
61               # guessing last element of filename
62               flowcell_id = path_fields[-1]
63             else:
64               flowcell_id = 'unknown'
65
66             logging.warning(
67               "Flowcell id was not found, guessing %s" % (
68                  flowcell_id))
69             self._flowcell_id = flowcell_id
70         return self._flowcell_id
71     flowcell_id = property(_get_flowcell_id)
72
73     def get_elements(self):
74         """
75         make one master xml file from all of our sub-components.
76         """
77         root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
78         flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
79         flowcell.text = self.flowcell_id
80         root.append(self.image_analysis.get_elements())
81         root.append(self.bustard.get_elements())
82         root.append(self.gerald.get_elements())
83         return root
84
85     def set_elements(self, tree):
86         # this file gets imported by all the others,
87         # so we need to hide the imports to avoid a cyclic imports
88         from htsworkflow.pipelines import firecrest
89         from htsworkflow.pipelines import ipar
90         from htsworkflow.pipelines import bustard
91         from htsworkflow.pipelines import gerald
92
93         tag = tree.tag.lower()
94         if tag != PipelineRun.PIPELINE_RUN.lower():
95           raise ValueError('Pipeline Run Expecting %s got %s' % (
96               PipelineRun.PIPELINE_RUN, tag))
97         for element in tree:
98           tag = element.tag.lower()
99           if tag == PipelineRun.FLOWCELL_ID.lower():
100             self._flowcell_id = element.text
101           #ok the xword.Xword.XWORD pattern for module.class.constant is lame
102           # you should only have Firecrest or IPAR, never both of them.
103           elif tag == firecrest.Firecrest.FIRECREST.lower():
104             self.image_analysis = firecrest.Firecrest(xml=element)
105           elif tag == ipar.IPAR.IPAR.lower():
106             self.image_analysis = ipar.IPAR(xml=element)
107           elif tag == bustard.Bustard.BUSTARD.lower():
108             self.bustard = bustard.Bustard(xml=element)
109           elif tag == gerald.Gerald.GERALD.lower():
110             self.gerald = gerald.Gerald(xml=element)
111           else:
112             logging.warn('PipelineRun unrecognized tag %s' % (tag,))
113
114     def _get_run_name(self):
115         """
116         Given a run tuple, find the latest date and use that as our name
117         """
118         if self._name is None:
119           tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
120           timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
121           self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
122         return self._name
123     name = property(_get_run_name)
124
125     def save(self, destdir=None):
126         if destdir is None:
127             destdir = ''
128         logging.info("Saving run report "+ self.name)
129         xml = self.get_elements()
130         indent(xml)
131         dest_pathname = os.path.join(destdir, self.name)
132         ElementTree.ElementTree(xml).write(dest_pathname)
133
134     def load(self, filename):
135         logging.info("Loading run report from " + filename)
136         tree = ElementTree.parse(filename).getroot()
137         self.set_elements(tree)
138
139 def load_pipeline_run_xml(pathname):
140     """
141     Load and instantiate a Pipeline run from a run xml file
142
143     :Parameters: 
144       - `pathname` : location of an run xml file
145
146     :Returns: initialized PipelineRun object
147     """
148     tree = ElementTree.parse(pathname).getroot()
149     run = PipelineRun(xml=tree)
150     return run
151
152 def get_runs(runfolder):
153     """
154     Search through a run folder for all the various sub component runs
155     and then return a PipelineRun for each different combination.
156
157     For example if there are two different GERALD runs, this will
158     generate two different PipelineRun objects, that differ
159     in there gerald component.
160     """
161     from htsworkflow.pipelines import firecrest
162     from htsworkflow.pipelines import ipar
163     from htsworkflow.pipelines import bustard
164     from htsworkflow.pipelines import gerald
165
166     def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
167         logging.info("Looking for bustard directories in %s" % (pathname,))
168         bustard_glob = os.path.join(pathname, "Bustard*")
169         for bustard_pathname in glob(bustard_glob):
170             logging.info("Found bustard directory %s" % (bustard_pathname,))
171             b = bustard.bustard(bustard_pathname)
172             gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
173             logging.info("Looking for gerald directories in %s" % (pathname,))
174             for gerald_pathname in glob(gerald_glob):
175                 logging.info("Found gerald directory %s" % (gerald_pathname,))
176                 try:
177                     g = gerald.gerald(gerald_pathname)
178                     p = PipelineRun(runfolder)
179                     p.image_analysis = image_analysis
180                     p.bustard = b
181                     p.gerald = g
182                     runs.append(p)
183                 except IOError, e:
184                     logging.error("Ignoring " + str(e))
185
186     datadir = os.path.join(runfolder, 'Data')
187
188     logging.info('Searching for runs in ' + datadir)
189     runs = []
190     # scan for firecrest directories
191     for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
192         logging.info('Found firecrest in ' + datadir)
193         image_analysis = firecrest.firecrest(firecrest_pathname)
194         if image_analysis is None:
195             logging.warn(
196                 "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
197             )
198         else:
199             scan_post_image_analysis(
200                 runs, runfolder, image_analysis, firecrest_pathname
201             )
202     # scan for IPAR directories
203     for ipar_pathname in glob(os.path.join(datadir,"IPAR_*")):
204         logging.info('Found ipar directories in ' + datadir)
205         image_analysis = ipar.ipar(ipar_pathname)
206         if image_analysis is None:
207             logging.warn(
208                 "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
209             )
210         else:
211             scan_post_image_analysis(
212                 runs, runfolder, image_analysis, ipar_pathname
213             )
214
215     return runs
216
217
218 def extract_run_parameters(runs):
219     """
220     Search through runfolder_path for various runs and grab their parameters
221     """
222     for run in runs:
223       run.save()
224
225 def summarize_mapped_reads(genome_map, mapped_reads):
226     """
227     Summarize per chromosome reads into a genome count
228     But handle spike-in/contamination symlinks seperately.
229     """
230     summarized_reads = {}
231     genome_reads = 0
232     genome = 'unknown'
233     for k, v in mapped_reads.items():
234         path, k = os.path.split(k)
235         if len(path) > 0 and not genome_map.has_key(path):
236             genome = path
237             genome_reads += v
238         else:
239             summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
240     summarized_reads[genome] = genome_reads
241     return summarized_reads
242
243 def summarize_lane(gerald, lane_id):
244     report = []
245     summary_results = gerald.summary.lane_results
246     for end in range(len(summary_results)):  
247       eland_result = gerald.eland_results.results[end][lane_id]
248       report.append("Sample name %s" % (eland_result.sample_name))
249       report.append("Lane id %s end %s" % (eland_result.lane_id, end))
250       cluster = summary_results[end][eland_result.lane_id].cluster
251       report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
252       report.append("Total Reads: %d" % (eland_result.reads))
253       mc = eland_result._match_codes
254       nm = mc['NM']
255       nm_percent = float(nm)/eland_result.reads  * 100
256       qc = mc['QC']
257       qc_percent = float(qc)/eland_result.reads * 100
258
259       report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
260       report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
261       report.append('Unique (0,1,2 mismatches) %d %d %d' % \
262                     (mc['U0'], mc['U1'], mc['U2']))
263       report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
264                     (mc['R0'], mc['R1'], mc['R2']))
265       report.append("Mapped Reads")
266       mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
267       for name, counts in mapped_reads.items():
268         report.append("  %s: %d" % (name, counts))
269       report.append('')
270     return report
271
272 def summary_report(runs):
273     """
274     Summarize cluster numbers and mapped read counts for a runfolder
275     """
276     report = []
277     for run in runs:
278         # print a run name?
279         report.append('Summary for %s' % (run.name,))
280         # sort the report
281         eland_keys = run.gerald.eland_results.results[0].keys()
282         eland_keys.sort(alphanum)
283
284         for lane_id in eland_keys:
285             report.extend(summarize_lane(run.gerald, lane_id))
286             report.append('---')
287             report.append('')
288         return os.linesep.join(report)
289
290 def is_compressed(filename):
291     if os.path.splitext(filename)[1] == ".gz":
292         return True
293     elif os.path.splitext(filename)[1] == '.bz2':
294         return True
295     else:
296         return False
297
298 def extract_results(runs, output_base_dir=None):
299     if output_base_dir is None:
300         output_base_dir = os.getcwd()
301
302     for r in runs:
303       result_dir = os.path.join(output_base_dir, r.flowcell_id)
304       logging.info("Using %s as result directory" % (result_dir,))
305       if not os.path.exists(result_dir):
306         os.mkdir(result_dir)
307
308       # create cycle_dir
309       cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
310       logging.info("Filling in %s" % (cycle,))
311       cycle_dir = os.path.join(result_dir, cycle)
312       if os.path.exists(cycle_dir):
313         logging.error("%s already exists, not overwriting" % (cycle_dir,))
314         continue
315       else:
316         os.mkdir(cycle_dir)
317
318       # copy stuff out of the main run
319       g = r.gerald
320
321       # save run file
322       r.save(cycle_dir)
323
324       return
325
326       # Copy Summary.htm
327       summary_path = os.path.join(r.gerald.pathname, 'Summary.htm')
328       if os.path.exists(summary_path):
329           logging.info('Copying %s to %s' % (summary_path, cycle_dir))
330           shutil.copy(summary_path, cycle_dir)
331       else:
332           logging.info('Summary file %s was not found' % (summary_path,))
333
334       # tar score files
335       score_files = []
336
337       # check for g.pathname/Temp a new feature of 1.1rc1
338       scores_path = g.pathname
339       scores_path_temp = os.path.join(scores_path, 'Temp')
340       if os.path.isdir(scores_path_temp):
341           scores_path = scores_path_temp
342
343       # hopefully we have a directory that contains s_*_score files
344       for f in os.listdir(scores_path):
345           if re.match('.*_score.txt', f):
346               score_files.append(f)
347
348       tar_cmd = ['/bin/tar', 'c'] + score_files
349       bzip_cmd = [ 'bzip2', '-9', '-c' ]
350       tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
351       tar_dest = open(tar_dest_name, 'w')
352       logging.info("Compressing score files from %s" % (scores_path,))
353       logging.info("Running tar: " + " ".join(tar_cmd[:10]))
354       logging.info("Running bzip2: " + " ".join(bzip_cmd))
355       logging.info("Writing to %s" %(tar_dest_name))
356
357       tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, 
358                              cwd=scores_path)
359       bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
360       tar.wait()
361
362       # copy & bzip eland files
363       for lanes_dictionary in g.eland_results.results:
364           for eland_lane in lanes_dictionary.values():
365               source_name = eland_lane.pathname
366               path, name = os.path.split(eland_lane.pathname)
367               dest_name = os.path.join(cycle_dir, name)
368               logging.info("Saving eland file %s to %s" % \
369                            (source_name, dest_name))
370
371               if is_compressed(name):
372                 logging.info('Already compressed, Saving to %s' % (dest_name, ))
373                 shutil.copy(source_name, dest_name)
374               else:
375                 # not compressed
376                 dest_name += '.bz2'
377                 args = ['bzip2', '-9', '-c', source_name]
378                 logging.info('Running: %s' % ( " ".join(args) ))
379                 bzip_dest = open(dest_name, 'w')
380                 bzip = subprocess.Popen(args, stdout=bzip_dest)
381                 logging.info('Saving to %s' % (dest_name, ))
382                 bzip.wait()
383
384 def clean_runs(runs):
385     """
386     Clean up run folders to optimize for compression.
387     """
388     # TODO: implement this.
389     # rm RunLog*.xml
390     # rm pipeline_*.txt
391     # rm gclog.txt
392     # rm NetCopy.log
393     # rm nfn.log
394     # rm Images/L*
395     # cd Data/C1-*_Firecrest*
396     # make clean_intermediate
397
398     pass