# Fix: don't use os.path.normpath when pathname is None in PipelineRun.
# Source file: gaworkflow/pipeline/runfolder.py (htsworkflow.git)
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import stat
9 import subprocess
10 import sys
11 import time
12
13 try:
14   from xml.etree import ElementTree
15 except ImportError, e:
16   from elementtree import ElementTree
17
18 EUROPEAN_STRPTIME = "%d-%m-%Y"
19 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
20 VERSION_RE = "([0-9\.]+)"
21 USER_RE = "([a-zA-Z0-9]+)"
22 LANES_PER_FLOWCELL = 8
23
24 from gaworkflow.util.alphanum import alphanum
25 from gaworkflow.util.ethelp import indent, flatten
26
27
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run.

    A run bundles one firecrest, one bustard and one gerald analysis
    together with the runfolder path they came from, and can be
    serialized to / restored from a single ElementTree XML document.
    """
    XML_VERSION = 1
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, firecrest=None, bustard=None, gerald=None, xml=None):
        """
        pathname: runfolder path; may be None when restoring from xml
        firecrest, bustard, gerald: sub-component analysis objects
        xml: previously saved ElementTree element to restore state from
        """
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        else:
            # normpath(None) would raise, so keep None as-is
            self.pathname = None
        self._name = None
        self._flowcell_id = None
        self.firecrest = firecrest
        self.bustard = bustard
        self.gerald = gerald

        if xml is not None:
            self.set_elements(xml)

    def _get_flowcell_id(self):
        """
        Return the flowcell id, reading Config/FlowcellId.xml when it
        exists and otherwise guessing from the runfolder name.
        """
        if self._flowcell_id is None:
            config_dir = os.path.join(self.pathname, 'Config')
            flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
            if os.path.exists(flowcell_id_path):
                flowcell_id_tree = ElementTree.parse(flowcell_id_path)
                self._flowcell_id = flowcell_id_tree.findtext('Text')
            else:
                # guess from the last '_'-separated element of the path;
                # str.split always returns at least one element, so the
                # old len() > 0 check (and its 'unknown' branch) was dead
                flowcell_id = self.pathname.split('_')[-1]
                logging.warning(
                    "Flowcell id was not found, guessing %s" % (
                        flowcell_id))
                self._flowcell_id = flowcell_id
        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.firecrest.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """
        Restore state from an ElementTree element as produced by
        get_elements().  Raises ValueError for an unexpected root tag.
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from gaworkflow.pipeline import firecrest
        from gaworkflow.pipeline import bustard
        from gaworkflow.pipeline import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
            tag = element.tag.lower()
            if tag == PipelineRun.FLOWCELL_ID.lower():
                self._flowcell_id = element.text
            #ok the xword.Xword.XWORD pattern for module.class.constant is lame
            elif tag == firecrest.Firecrest.FIRECREST.lower():
                self.firecrest = firecrest.Firecrest(xml=element)
            elif tag == bustard.Bustard.BUSTARD.lower():
                self.bustard = bustard.Bustard(xml=element)
            elif tag == gerald.Gerald.GERALD.lower():
                self.gerald = gerald.Gerald(xml=element)
            else:
                # logging.warn is a deprecated alias for warning
                logging.warning('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name
        """
        if self._name is None:
            tmax = max(self.firecrest.time, self.bustard.time, self.gerald.time)
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """Write the combined xml report into destdir (default: cwd)."""
        if destdir is None:
            destdir = ''
        logging.info("Saving run report " + self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Read a previously saved xml report back into this object."""
        logging.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
134
def get_runs(runfolder):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.
    """
    # hidden imports to avoid a cycle with the sub-component modules
    from gaworkflow.pipeline import firecrest
    from gaworkflow.pipeline import bustard
    from gaworkflow.pipeline import gerald

    datadir = os.path.join(runfolder, 'Data')

    logging.info('Searching for runs in ' + datadir)
    runs = []
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        f = firecrest.firecrest(firecrest_pathname)
        bustard_glob = os.path.join(firecrest_pathname, "Bustard*")
        for bustard_pathname in glob(bustard_glob):
            b = bustard.bustard(bustard_pathname)
            gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
            for gerald_pathname in glob(gerald_glob):
                try:
                    g = gerald.gerald(gerald_pathname)
                    runs.append(PipelineRun(runfolder, f, b, g))
                except IOError as e:
                    # skip unreadable gerald runs; report through logging
                    # (was a bare print) for consistency with the module
                    logging.error("Ignoring " + str(e))
    return runs
165                 
166     
def extract_run_parameters(runs):
    """
    Write out the xml parameter report for every run in *runs*.
    """
    for pipeline_run in runs:
        pipeline_run.save()
173
def summarize_mapped_reads(mapped_reads):
    """
    Collapse per-chromosome read counts into one genome-wide count,
    while keeping spike-in/contamination entries separate.

    Keys with a directory component are pooled under that directory
    name (NOTE(review): if several distinct directories appear, the
    last one seen labels the pooled total — presumably each lane maps
    to a single genome; verify against callers).  Bare keys keep their
    individual counts.
    """
    totals = {}
    genome_name = 'unknown'
    genome_total = 0
    for label, count in mapped_reads.items():
        directory, base = os.path.split(label)
        if directory:
            # pathful entry: fold into the genome-wide total
            genome_name = directory
            genome_total += count
        else:
            totals[base] = totals.get(base, 0) + count
    # the genome entry is always written, even when the total is zero
    totals[genome_name] = genome_total
    return totals
191
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder.

    Returns one os.linesep-joined string covering *every* run in runs.
    (The return previously sat inside the loop, so only the first run
    was ever reported and an empty list returned None.)
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.name,))
        # sort the lanes into "natural" alphanumeric order
        eland_keys = run.gerald.eland_results.results.keys()
        eland_keys.sort(alphanum)

        lane_results = run.gerald.summary.lane_results
        for lane_id in eland_keys:
            result = run.gerald.eland_results.results[lane_id]
            report.append("Sample name %s" % (result.sample_name))
            report.append("Lane id %s" % (result.lane_id,))
            cluster = lane_results[result.lane_id].cluster
            report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
            report.append("Total Reads: %d" % (result.reads))
            mc = result._match_codes
            report.append("No Match: %d" % (mc['NM']))
            report.append("QC Failed: %d" % (mc['QC']))
            report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                          (mc['U0'], mc['U1'], mc['U2']))
            report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                          (mc['R0'], mc['R1'], mc['R2']))
            report.append("Mapped Reads")
            mapped_reads = summarize_mapped_reads(result.mapped_reads)
            for name, counts in mapped_reads.items():
                report.append("  %s: %d" % (name, counts))
            report.append('---')
            report.append('')
    # bug fix: join once, after all runs have been summarized
    return os.linesep.join(report)
226
def extract_results(runs, output_base_dir=None):
    """
    Copy the interesting result files for each run into a
    per-flowcell / per-cycle directory under output_base_dir
    (default: current working directory).

    For every run this saves the run xml report, tars+bzips the
    gerald *_score.txt files into scores.tar.bz2, and bzips each
    eland result file.  An already-existing cycle directory is
    logged and skipped, not overwritten.
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
        result_dir = os.path.join(output_base_dir, r.flowcell_id)
        logging.info("Using %s as result directory" % (result_dir,))
        if not os.path.exists(result_dir):
            os.mkdir(result_dir)

        # create cycle_dir
        cycle = "C%d-%d" % (r.firecrest.start, r.firecrest.stop)
        logging.info("Filling in %s" % (cycle,))
        cycle_dir = os.path.join(result_dir, cycle)
        if os.path.exists(cycle_dir):
            logging.error("%s already exists, not overwriting" % (cycle_dir,))
            continue
        else:
            os.mkdir(cycle_dir)

        # copy stuff out of the main run
        g = r.gerald

        # save run file
        r.save(cycle_dir)

        # tar score files
        score_files = []
        for f in os.listdir(g.pathname):
            if re.match('.*_score.txt', f):
                score_files.append(f)

        tar_cmd = ['/bin/tar', 'c'] + score_files
        bzip_cmd = ['bzip2', '-9', '-c']
        tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
        # open in binary mode: bzip2 writes compressed bytes
        tar_dest = open(tar_dest_name, 'wb')
        try:
            logging.info("Compressing score files in %s" % (g.pathname,))
            logging.info("Running tar: " + " ".join(tar_cmd[:10]))
            logging.info("Running bzip2: " + " ".join(bzip_cmd))
            logging.info("Writing to %s" % (tar_dest_name))

            tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE,
                                   shell=False, cwd=g.pathname)
            bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout,
                                    stdout=tar_dest)
            # close our copy of the pipe so bzip2 sees EOF when tar exits
            tar.stdout.close()
            # bug fix: wait on *both* processes; the original never
            # waited on bzip, so the output could still be in flight
            bzip.wait()
            tar.wait()
        finally:
            tar_dest.close()

        # copy & bzip eland files
        for eland_lane in g.eland_results.values():
            source_name = eland_lane.pathname
            path, name = os.path.split(eland_lane.pathname)
            dest_name = os.path.join(cycle_dir, name + '.bz2')

            args = ['bzip2', '-9', '-c', source_name]
            logging.info('Running: %s' % (" ".join(args)))
            bzip_dest = open(dest_name, 'wb')
            try:
                bzip = subprocess.Popen(args, stdout=bzip_dest)
                logging.info('Saving to %s' % (dest_name,))
                bzip.wait()
            finally:
                bzip_dest.close()
284
285