Rename the "pipeline" package to "pipelines" to indicate that it can process data from more than just the Illumina pipeline.
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import time
13
14 try:
15   from xml.etree import ElementTree
16 except ImportError, e:
17   from elementtree import ElementTree
18
# strptime format for the day-month-year date stamps used in runfolder names.
EUROPEAN_STRPTIME = "%d-%m-%Y"
EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
# Raw string so the regex backslash escape is not interpreted by Python.
VERSION_RE = r"([0-9\.]+)"
USER_RE = "([a-zA-Z0-9]+)"
# An Illumina flowcell holds 8 lanes.
LANES_PER_FLOWCELL = 8
24
25 from htsworkflow.util.alphanum import alphanum
26 from htsworkflow.util.ethelp import indent, flatten
27
28
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run.

    Aggregates the firecrest, bustard, and gerald analysis components
    for one run of the pipeline, and can serialize the combined state
    to or from a single XML summary file.
    """
    XML_VERSION = 1
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, firecrest=None, bustard=None, gerald=None, xml=None):
        """
        pathname: path to the runfolder; used to discover the flowcell id.
        firecrest, bustard, gerald: component objects for each analysis step.
        xml: a previously-saved ElementTree element to restore state from.
        """
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        else:
            self.pathname = None
        self._name = None
        self._flowcell_id = None
        self.firecrest = firecrest
        self.bustard = bustard
        self.gerald = gerald

        if xml is not None:
            self.set_elements(xml)

    def _get_flowcell_id(self):
        """
        Return the flowcell id, caching the result.

        Reads Config/FlowcellId.xml under the runfolder when it exists;
        otherwise guesses from the last '_'-delimited field of the
        runfolder name and logs a warning.
        """
        if self._flowcell_id is None:
            config_dir = os.path.join(self.pathname, 'Config')
            flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
            if os.path.exists(flowcell_id_path):
                flowcell_id_tree = ElementTree.parse(flowcell_id_path)
                self._flowcell_id = flowcell_id_tree.findtext('Text')
            else:
                # str.split always returns at least one field, so in
                # practice we always guess the last name component.
                path_fields = self.pathname.split('_')
                if len(path_fields) > 0:
                    # guessing last element of filename
                    flowcell_id = path_fields[-1]
                else:
                    flowcell_id = 'unknown'

                logging.warning(
                  "Flowcell id was not found, guessing %s" % (
                     flowcell_id))
                self._flowcell_id = flowcell_id
        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.

        Returns an ElementTree element containing the flowcell id plus
        the serialized firecrest, bustard, and gerald components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.firecrest.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """
        Restore this run's state from an xml tree built by get_elements.

        Raises ValueError if the root tag is not a PipelineRun element.
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
            tag = element.tag.lower()
            if tag == PipelineRun.FLOWCELL_ID.lower():
                self._flowcell_id = element.text
            #ok the xword.Xword.XWORD pattern for module.class.constant is lame
            elif tag == firecrest.Firecrest.FIRECREST.lower():
                self.firecrest = firecrest.Firecrest(xml=element)
            elif tag == bustard.Bustard.BUSTARD.lower():
                self.bustard = bustard.Bustard(xml=element)
            elif tag == gerald.Gerald.GERALD.lower():
                self.gerald = gerald.Gerald(xml=element)
            else:
                # consistency fix: logging.warn is a deprecated alias;
                # the rest of this file uses logging.warning
                logging.warning('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name

        The name is cached after the first computation.
        """
        if self._name is None:
            # the most recently updated component dates the whole run
            tmax = max(self.firecrest.time, self.bustard.time, self.gerald.time)
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """
        Write the combined xml report to destdir (default: current dir).
        """
        if destdir is None:
            destdir = ''
        logging.info("Saving run report "+ self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """
        Restore this run's state from a previously saved xml report.
        """
        logging.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
135
def get_runs(runfolder):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.
    """
    # imported here to avoid a cyclic import with the component modules
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    datadir = os.path.join(runfolder, 'Data')

    logging.info('Searching for runs in ' + datadir)
    runs = []
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        f = firecrest.firecrest(firecrest_pathname)
        bustard_glob = os.path.join(firecrest_pathname, "Bustard*")
        for bustard_pathname in glob(bustard_glob):
            b = bustard.bustard(bustard_pathname)
            gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
            for gerald_pathname in glob(gerald_glob):
                try:
                    g = gerald.gerald(gerald_pathname)
                    runs.append(PipelineRun(runfolder, f, b, g))
                except IOError as e:
                    # an unreadable gerald run shouldn't abort the scan;
                    # ("as e" / call-style print replace the Python-2-only
                    # comma syntax and work on 2.6+ and 3.x)
                    print("Ignoring " + str(e))
    return runs
166                 
167     
def extract_run_parameters(runs):
    """
    Write the xml summary report for each run in the given sequence.
    """
    for pipeline_run in runs:
        pipeline_run.save()
174
def summarize_mapped_reads(mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.

    mapped_reads: dict mapping chromosome names (optionally qualified as
                  "genome/chromosome") to read counts.
    Returns a dict where path-qualified entries are collapsed into one
    total per genome, while bare names are passed through as-is.
    """
    summarized_reads = {}
    for k, v in mapped_reads.items():
        path, k = os.path.split(k)
        if len(path) > 0:
            # bug fix: accumulate per genome instead of crediting every
            # path-qualified read to whichever genome the (unordered)
            # dict iteration happened to see last; this also avoids
            # emitting a spurious {'unknown': 0} entry when there are no
            # path-qualified reads at all.
            summarized_reads[path] = summarized_reads.setdefault(path, 0) + v
        else:
            summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
    return summarized_reads
192
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    Returns a single newline-joined string covering every run in runs.
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.name,))
        # sort the report
        eland_keys = run.gerald.eland_results.results.keys()
        eland_keys.sort(alphanum)

        lane_results = run.gerald.summary.lane_results
        for lane_id in eland_keys:
            result = run.gerald.eland_results.results[lane_id]
            report.append("Sample name %s" % (result.sample_name))
            report.append("Lane id %s" % (result.lane_id,))
            cluster = lane_results[result.lane_id].cluster
            report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
            report.append("Total Reads: %d" % (result.reads))
            mc = result._match_codes
            nm = mc['NM']
            qc = mc['QC']
            # guard against a ZeroDivisionError on a lane with no reads
            if result.reads > 0:
                nm_percent = float(nm) / result.reads * 100
                qc_percent = float(qc) / result.reads * 100
            else:
                nm_percent = 0.0
                qc_percent = 0.0

            report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
            report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
            report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                          (mc['U0'], mc['U1'], mc['U2']))
            report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                          (mc['R0'], mc['R1'], mc['R2']))
            report.append("Mapped Reads")
            mapped_reads = summarize_mapped_reads(result.mapped_reads)
            for name, counts in mapped_reads.items():
                report.append("  %s: %d" % (name, counts))
            report.append('---')
            report.append('')
    # bug fix: the return was indented inside the run loop, so only the
    # first run was ever summarized (and an empty runs list returned None)
    return os.linesep.join(report)
232
def extract_results(runs, output_base_dir=None):
    """
    Copy the summary, compressed score, and compressed eland files for
    each run into output_base_dir/<flowcell id>/C<start>-<stop>.

    output_base_dir defaults to the current working directory.
    Cycle directories that already exist are skipped with an error log.
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
      result_dir = os.path.join(output_base_dir, r.flowcell_id)
      logging.info("Using %s as result directory" % (result_dir,))
      if not os.path.exists(result_dir):
        os.mkdir(result_dir)

      # create cycle_dir
      cycle = "C%d-%d" % (r.firecrest.start, r.firecrest.stop)
      logging.info("Filling in %s" % (cycle,))
      cycle_dir = os.path.join(result_dir, cycle)
      if os.path.exists(cycle_dir):
        logging.error("%s already exists, not overwriting" % (cycle_dir,))
        continue
      else:
        os.mkdir(cycle_dir)

      # copy stuff out of the main run
      g = r.gerald

      # save run file
      r.save(cycle_dir)

      # Copy Summary.htm
      summary_path = os.path.join(r.gerald.pathname, 'Summary.htm')
      if os.path.exists(summary_path):
          logging.info('Copying %s to %s' % (summary_path, cycle_dir))
          shutil.copy(summary_path, cycle_dir)
      else:
          logging.info('Summary file %s was not found' % (summary_path,))

      # tar score files
      score_files = []
      for f in os.listdir(g.pathname):
          if re.match('.*_score.txt', f):
              score_files.append(f)

      tar_cmd = ['/bin/tar', 'c'] + score_files
      bzip_cmd = [ 'bzip2', '-9', '-c' ]
      tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
      logging.info("Compressing score files in %s" % (g.pathname,))
      logging.info("Running tar: " + " ".join(tar_cmd[:10]))
      logging.info("Running bzip2: " + " ".join(bzip_cmd))
      logging.info("Writing to %s" %(tar_dest_name))

      # bug fixes: the archive is binary so open in 'wb' (not 'w');
      # close the handle instead of leaking it; close tar.stdout in the
      # parent so tar gets SIGPIPE if bzip2 dies; and wait for bzip2 so
      # the archive is complete before moving on.
      tar_dest = open(tar_dest_name, 'wb')
      try:
          tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE,
                                 shell=False, cwd=g.pathname)
          bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
          tar.stdout.close()
          tar.wait()
          bzip.wait()
      finally:
          tar_dest.close()

      # copy & bzip eland files
      for eland_lane in g.eland_results.values():
          source_name = eland_lane.pathname
          path, name = os.path.split(eland_lane.pathname)
          dest_name = os.path.join(cycle_dir, name+'.bz2')

          args = ['bzip2', '-9', '-c', source_name]
          logging.info('Running: %s' % ( " ".join(args) ))
          # bug fix: binary output, and close the handle when done
          bzip_dest = open(dest_name, 'wb')
          try:
              bzip = subprocess.Popen(args, stdout=bzip_dest)
              logging.info('Saving to %s' % (dest_name, ))
              bzip.wait()
          finally:
              bzip_dest.close()
298
def clean_runs(runs):
    """
    Clean up run folders to optimize for compression.
    """
    # TODO: not implemented yet. The intended cleanup steps are:
    #   remove RunLog*.xml, pipeline_*.txt, gclog.txt,
    #          NetCopy.log, nfn.log, and Images/L*
    #   then run "make clean_intermediate" in Data/C1-*_Firecrest*
    pass