25d933110877ad3bfd853bfdf1e3456f020f90ee
[htsworkflow.git] / gaworkflow / pipeline / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import stat
9 import subprocess
10 import sys
11 import time
12
13 try:
14   from xml.etree import ElementTree
15 except ImportError, e:
16   from elementtree import ElementTree
17
18 EUROPEAN_STRPTIME = "%d-%m-%Y"
19 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
20 VERSION_RE = "([0-9\.]+)"
21 USER_RE = "([a-zA-Z0-9]+)"
22 LANES_PER_FLOWCELL = 8
23
24 from gaworkflow.util.alphanum import alphanum
25 from gaworkflow.util.ethelp import indent, flatten
26
27
28 class PipelineRun(object):
29     """
30     Capture "interesting" information about a pipeline run
31     """
32     XML_VERSION = 1
33     PIPELINE_RUN = 'PipelineRun'
34     FLOWCELL_ID = 'FlowcellID'
35
36     def __init__(self, pathname=None, firecrest=None, bustard=None, gerald=None, xml=None):
37         self.pathname = os.path.normpath(pathname)
38         self._name = None
39         self._flowcell_id = None
40         self.firecrest = firecrest
41         self.bustard = bustard
42         self.gerald = gerald
43
44         if xml is not None:
45           self.set_elements(xml)
46     
47     def _get_flowcell_id(self):
48         # extract flowcell ID
49         if self._flowcell_id is None:
50           config_dir = os.path.join(self.pathname, 'Config')
51           flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
52           if os.path.exists(flowcell_id_path):
53             flowcell_id_tree = ElementTree.parse(flowcell_id_path)
54             self._flowcell_id = flowcell_id_tree.findtext('Text')
55           else:
56             path_fields = self.pathname.split('_')
57             if len(path_fields) > 0:
58               # guessing last element of filename
59               flowcell_id = path_fields[-1]
60             else:
61               flowcell_id = 'unknown'
62               
63             logging.warning(
64               "Flowcell id was not found, guessing %s" % (
65                  flowcell_id))
66             self._flowcell_id = flowcell_id
67         return self._flowcell_id
68     flowcell_id = property(_get_flowcell_id)
69
70     def get_elements(self):
71         """
72         make one master xml file from all of our sub-components.
73         """
74         root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
75         flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
76         flowcell.text = self.flowcell_id
77         root.append(self.firecrest.get_elements())
78         root.append(self.bustard.get_elements())
79         root.append(self.gerald.get_elements())
80         return root
81
82     def set_elements(self, tree):
83         # this file gets imported by all the others,
84         # so we need to hide the imports to avoid a cyclic imports
85         from gaworkflow.pipeline import firecrest
86         from gaworkflow.pipeline import bustard
87         from gaworkflow.pipeline import gerald
88
89         tag = tree.tag.lower()
90         if tag != PipelineRun.PIPELINE_RUN.lower():
91           raise ValueError('Pipeline Run Expecting %s got %s' % (
92               PipelineRun.PIPELINE_RUN, tag))
93         for element in tree:
94           tag = element.tag.lower()
95           if tag == PipelineRun.FLOWCELL_ID.lower():
96             self._flowcell_id = element.text
97           #ok the xword.Xword.XWORD pattern for module.class.constant is lame
98           elif tag == firecrest.Firecrest.FIRECREST.lower():
99             self.firecrest = firecrest.Firecrest(xml=element)
100           elif tag == bustard.Bustard.BUSTARD.lower():
101             self.bustard = bustard.Bustard(xml=element)
102           elif tag == gerald.Gerald.GERALD.lower():
103             self.gerald = gerald.Gerald(xml=element)
104           else:
105             logging.warn('PipelineRun unrecognized tag %s' % (tag,))
106
107     def _get_run_name(self):
108         """
109         Given a run tuple, find the latest date and use that as our name
110         """
111         if self._name is None:
112           tmax = max(self.firecrest.time, self.bustard.time, self.gerald.time)
113           timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
114           self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
115         return self._name
116     name = property(_get_run_name)
117
118     def save(self, destdir=None):
119         if destdir is None:
120             destdir = ''
121         logging.info("Saving run report "+ self.name)
122         xml = self.get_elements()
123         indent(xml)
124         dest_pathname = os.path.join(destdir, self.name)
125         ElementTree.ElementTree(xml).write(dest_pathname)
126
127     def load(self, filename):
128         logging.info("Loading run report from " + filename)
129         tree = ElementTree.parse(filename).getroot()
130         self.set_elements(tree)
131
132 def get_runs(runfolder):
133     """
134     Search through a run folder for all the various sub component runs
135     and then return a PipelineRun for each different combination.
136
137     For example if there are two different GERALD runs, this will
138     generate two different PipelineRun objects, that differ
139     in there gerald component.
140     """
141     from gaworkflow.pipeline import firecrest
142     from gaworkflow.pipeline import bustard
143     from gaworkflow.pipeline import gerald
144
145     datadir = os.path.join(runfolder, 'Data')
146
147     logging.info('Searching for runs in ' + datadir)
148     runs = []
149     for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
150         f = firecrest.firecrest(firecrest_pathname)
151         bustard_glob = os.path.join(firecrest_pathname, "Bustard*")
152         for bustard_pathname in glob(bustard_glob):
153             b = bustard.bustard(bustard_pathname)
154             gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
155             for gerald_pathname in glob(gerald_glob):
156                 try:
157                     g = gerald.gerald(gerald_pathname)
158                     runs.append(PipelineRun(runfolder, f, b, g))
159                 except IOError, e:
160                     print "Ignoring", str(e)
161     return runs
162                 
163     
164 def extract_run_parameters(runs):
165     """
166     Search through runfolder_path for various runs and grab their parameters
167     """
168     for run in runs:
169       run.save()
170
171 def summarize_mapped_reads(mapped_reads):
172     """
173     Summarize per chromosome reads into a genome count
174     But handle spike-in/contamination symlinks seperately.
175     """
176     summarized_reads = {}
177     genome_reads = 0
178     genome = 'unknown'
179     for k, v in mapped_reads.items():
180         path, k = os.path.split(k)
181         if len(path) > 0:
182             genome = path
183             genome_reads += v
184         else:
185             summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
186     summarized_reads[genome] = genome_reads
187     return summarized_reads
188
189 def summary_report(runs):
190     """
191     Summarize cluster numbers and mapped read counts for a runfolder
192     """
193     report = []
194     for run in runs:
195         # print a run name?
196         report.append('Summary for %s' % (run.name,))
197         # sort the report
198         eland_keys = run.gerald.eland_results.results.keys()
199         eland_keys.sort(alphanum)
200
201         lane_results = run.gerald.summary.lane_results
202         for lane_id in eland_keys:
203             result = run.gerald.eland_results.results[lane_id]
204             report.append("Sample name %s" % (result.sample_name))
205             report.append("Lane id %s" % (result.lane_id,))
206             cluster = lane_results[result.lane_id].cluster
207             report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
208             report.append("Total Reads: %d" % (result.reads))
209             mc = result._match_codes
210             report.append("No Match: %d" % (mc['NM']))
211             report.append("QC Failed: %d" % (mc['QC']))
212             report.append('Unique (0,1,2 mismatches) %d %d %d' % \
213                           (mc['U0'], mc['U1'], mc['U2']))
214             report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
215                           (mc['R0'], mc['R1'], mc['R2']))
216             report.append("Mapped Reads")
217             mapped_reads = summarize_mapped_reads(result.mapped_reads)
218             for name, counts in mapped_reads.items():
219               report.append("  %s: %d" % (name, counts))
220             report.append('---')
221             report.append('')
222         return os.linesep.join(report)
223
224 def extract_results(runs, output_base_dir=None):
225     if output_base_dir is None:
226         output_base_dir = os.getcwd()
227
228     for r in runs:
229       result_dir = os.path.join(output_base_dir, r.flowcell_id)
230       logging.info("Using %s as result directory" % (result_dir,))
231       if not os.path.exists(result_dir):
232         os.mkdir(result_dir)
233       
234       # create cycle_dir
235       cycle = "C%d-%d" % (r.firecrest.start, r.firecrest.stop)
236       logging.info("Filling in %s" % (cycle,))
237       cycle_dir = os.path.join(result_dir, cycle)
238       if os.path.exists(cycle_dir):
239         logging.error("%s already exists, not overwriting" % (cycle_dir,))
240         continue
241       else:
242         os.mkdir(cycle_dir)
243
244       # copy stuff out of the main run
245       g = r.gerald
246
247       # save run file
248       r.save(cycle_dir)
249
250       # tar score files
251       score_files = []
252       for f in os.listdir(g.pathname):
253           if re.match('.*_score.txt', f):
254               score_files.append(f)
255
256       tar_cmd = ['/bin/tar', 'c'] + score_files
257       bzip_cmd = [ 'bzip2', '-9', '-c' ]
258       tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
259       tar_dest = open(tar_dest_name, 'w')
260       logging.info("Compressing score files in %s" % (g.pathname,))
261       logging.info("Running tar: " + " ".join(tar_cmd[:10]))
262       logging.info("Running bzip2: " + " ".join(bzip_cmd))
263       logging.info("Writing to %s" %(tar_dest_name))
264       
265       tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, cwd=g.pathname)
266       bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
267       tar.wait()
268
269       # copy & bzip eland files
270       for eland_lane in g.eland_results.values():
271           source_name = eland_lane.pathname
272           path, name = os.path.split(eland_lane.pathname)
273           dest_name = os.path.join(cycle_dir, name+'.bz2')
274
275           args = ['bzip2', '-9', '-c', source_name]
276           logging.info('Running: %s' % ( " ".join(args) ))
277           bzip_dest = open(dest_name, 'w')
278           bzip = subprocess.Popen(args, stdout=bzip_dest)
279           logging.info('Saving to %s' % (dest_name, ))
280           bzip.wait()
281
282