# NOTE(review): the two lines below this file's docstring in the scrape were
# gitweb page residue (commit subject "Look in Temp directories for some of
# the files we have historically..." and the repository path
# htsworkflow.git / htsworkflow / pipelines / runfolder.py) — not module code.
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import time
13
14 try:
15   from xml.etree import ElementTree
16 except ImportError, e:
17   from elementtree import ElementTree
18
# strptime format for European-style (day-month-year) dates
EUROPEAN_STRPTIME = "%d-%m-%Y"
# regex capturing a dd-mm-yyyy date
EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
# regex capturing a dotted version number, e.g. "1.9.5"
VERSION_RE = "([0-9\.]+)"
# regex capturing an alphanumeric user name
USER_RE = "([a-zA-Z0-9]+)"
# number of lanes on an Illumina flowcell
LANES_PER_FLOWCELL = 8
24
25 from htsworkflow.util.alphanum import alphanum
26 from htsworkflow.util.ethelp import indent, flatten
27
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run.

    Holds the three analysis components (image analysis, bustard,
    gerald) plus the flowcell ID, and can round-trip the whole
    collection through a single XML tree via get_elements /
    set_elements.
    """
    XML_VERSION = 1                 # version stamp for the saved XML format
    PIPELINE_RUN = 'PipelineRun'    # root element tag
    FLOWCELL_ID = 'FlowcellID'      # flowcell id element tag

    def __init__(self, pathname=None, xml=None):
        """
        pathname: runfolder directory this run describes (optional)
        xml: previously saved ElementTree element to restore state from
        """
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        else:
            self.pathname = None
        self._name = None           # cached by _get_run_name
        self._flowcell_id = None    # cached by _get_flowcell_id
        self.image_analysis = None  # Firecrest or IPAR component
        self.bustard = None         # base-calling component
        self.gerald = None          # alignment/analysis component

        if xml is not None:
            self.set_elements(xml)

    def _get_flowcell_id(self):
        """
        Return the flowcell ID.

        Prefers Config/FlowcellId.xml inside the runfolder; otherwise
        guesses from the runfolder's directory name.
        """
        if self._flowcell_id is None:
            config_dir = os.path.join(self.pathname, 'Config')
            flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
            if os.path.exists(flowcell_id_path):
                flowcell_id_tree = ElementTree.parse(flowcell_id_path)
                self._flowcell_id = flowcell_id_tree.findtext('Text')
            else:
                # Guess from the runfolder *name* (conventionally
                # date_machine_flowcell).  Previously the whole path was
                # split on '_', so an underscore in a parent directory
                # could yield nonsense like "data/run" as the id.
                path_fields = os.path.basename(self.pathname).split('_')
                if len(path_fields) > 0:
                    # guessing last element of filename
                    flowcell_id = path_fields[-1]
                else:
                    flowcell_id = 'unknown'

                logging.warning(
                    "Flowcell id was not found, guessing %s" % (
                        flowcell_id))
                self._flowcell_id = flowcell_id
        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """
        Restore this run's components from a PipelineRun XML element.

        Raises ValueError when the root tag is not PipelineRun.
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
            tag = element.tag.lower()
            if tag == PipelineRun.FLOWCELL_ID.lower():
                self._flowcell_id = element.text
            #ok the xword.Xword.XWORD pattern for module.class.constant is lame
            # you should only have Firecrest or IPAR, never both of them.
            elif tag == firecrest.Firecrest.FIRECREST.lower():
                self.image_analysis = firecrest.Firecrest(xml=element)
            elif tag == ipar.IPAR.IPAR.lower():
                self.image_analysis = ipar.IPAR(xml=element)
            elif tag == bustard.Bustard.BUSTARD.lower():
                self.bustard = bustard.Bustard(xml=element)
            elif tag == gerald.Gerald.GERALD.lower():
                self.gerald = gerald.Gerald(xml=element)
            else:
                # warning() instead of the deprecated warn() alias
                logging.warning('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name
        """
        if self._name is None:
            tmax = max(self.image_analysis.time, self.bustard.time,
                       self.gerald.time)
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """
        Write this run's XML report into destdir (default: cwd) as self.name.
        """
        if destdir is None:
            destdir = ''
        logging.info("Saving run report " + self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """
        Replace this run's state with the contents of a saved XML report.
        """
        logging.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
138
139 def get_runs(runfolder):
140     """
141     Search through a run folder for all the various sub component runs
142     and then return a PipelineRun for each different combination.
143
144     For example if there are two different GERALD runs, this will
145     generate two different PipelineRun objects, that differ
146     in there gerald component.
147     """
148     from htsworkflow.pipelines import firecrest
149     from htsworkflow.pipelines import ipar
150     from htsworkflow.pipelines import bustard
151     from htsworkflow.pipelines import gerald
152
153     def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
154         logging.info("Looking for bustard directories in %s" % (pathname,))
155         bustard_glob = os.path.join(pathname, "Bustard*")
156         for bustard_pathname in glob(bustard_glob):
157             logging.info("Found bustard directory %s" % (bustard_pathname,))
158             b = bustard.bustard(bustard_pathname)
159             gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
160             logging.info("Looking for gerald directories in %s" % (pathname,))
161             for gerald_pathname in glob(gerald_glob):
162                 logging.info("Found gerald directory %s" % (gerald_pathname,))
163                 try:
164                     g = gerald.gerald(gerald_pathname)
165                     p = PipelineRun(runfolder)
166                     p.image_analysis = image_analysis
167                     p.bustard = b
168                     p.gerald = g
169                     runs.append(p)
170                 except IOError, e:
171                     print "Ignoring", str(e)
172
173     datadir = os.path.join(runfolder, 'Data')
174
175     logging.info('Searching for runs in ' + datadir)
176     runs = []
177     # scan for firecrest directories
178     for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
179         logging.info('Found firecrest in ' + datadir)
180         image_analysis = firecrest.firecrest(firecrest_pathname)
181         scan_post_image_analysis(runs, runfolder, image_analysis, firecrest_pathname)
182     # scan for IPAR directories
183     for ipar_pathname in glob(os.path.join(datadir,"IPAR_*")):
184         logging.info('Found ipar directories in ' + datadir)
185         image_analysis = ipar.ipar(ipar_pathname)
186         scan_post_image_analysis(runs, runfolder, image_analysis, ipar_pathname)
187
188     return runs
189
190
def extract_run_parameters(runs):
    """
    Write out the run-parameter XML report for every run given.

    Each PipelineRun saves itself (into the current directory by
    default) via its save() method.
    """
    for pipeline_run in runs:
        pipeline_run.save()
197
def summarize_mapped_reads(mapped_reads):
    """
    Collapse per-chromosome read counts into one genome total.

    Keys with a directory component (e.g. "hg18/chr1") are treated as
    genome chromosomes and summed under the directory name; bare keys
    (spike-in / contamination symlinks) are kept as individual entries.
    """
    totals = {}
    genome_name = 'unknown'
    genome_total = 0
    for target, count in mapped_reads.items():
        directory, basename = os.path.split(target)
        if directory:
            # chromosome inside a genome directory: fold into the total
            genome_name = directory
            genome_total += count
        else:
            totals[basename] = totals.get(basename, 0) + count
    totals[genome_name] = genome_total
    return totals
215
def summarize_lane(gerald, lane_id):
    """
    Build a human-readable report (list of lines) for one lane,
    covering every sequencing end present in the GERALD summary.
    """
    report = []
    summary_results = gerald.summary.lane_results
    for end in range(len(summary_results)):
        result = gerald.eland_results.results[end][lane_id]
        report.append("Sample name %s" % (result.sample_name))
        report.append("Lane id %s end %s" % (result.lane_id, end))
        cluster = summary_results[end][result.lane_id].cluster
        report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
        report.append("Total Reads: %d" % (result.reads))
        match_codes = result._match_codes
        total = float(result.reads)
        no_match = match_codes['NM']
        qc_failed = match_codes['QC']
        report.append("No Match: %d (%2.2g %%)" %
                      (no_match, no_match / total * 100))
        report.append("QC Failed: %d (%2.2g %%)" %
                      (qc_failed, qc_failed / total * 100))
        report.append('Unique (0,1,2 mismatches) %d %d %d' %
                      (match_codes['U0'], match_codes['U1'], match_codes['U2']))
        report.append('Repeat (0,1,2 mismatches) %d %d %d' %
                      (match_codes['R0'], match_codes['R1'], match_codes['R2']))
        report.append("Mapped Reads")
        for name, counts in summarize_mapped_reads(result.mapped_reads).items():
            report.append("  %s: %d" % (name, counts))
        report.append('')
    return report
244
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder.

    Returns one newline-joined string covering *every* run in runs
    (an empty string for an empty list).
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.name,))
        # sort the report
        eland_keys = run.gerald.eland_results.results[0].keys()
        eland_keys.sort(alphanum)
        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('---')
            report.append('')
    # BUG FIX: this return used to sit inside the loop, so only the
    # first run was ever reported (and [] silently returned None).
    return os.linesep.join(report)
262
def is_compressed(filename):
    """
    Return True when filename carries a recognized compression
    extension (.gz or .bz2), False otherwise.
    """
    extension = os.path.splitext(filename)[1]
    return extension in ('.gz', '.bz2')
270
def extract_results(runs, output_base_dir=None):
    """
    Copy the interesting result files for each run into a
    per-flowcell, per-cycle-range directory under output_base_dir
    (default: current working directory).

    For every run this saves the run XML report, copies Summary.htm,
    tars + bzip2s the *_score.txt files, and copies the eland result
    files (bzip2-compressing any that are not already compressed).
    Existing cycle directories are skipped with an error log message.
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
        result_dir = os.path.join(output_base_dir, r.flowcell_id)
        logging.info("Using %s as result directory" % (result_dir,))
        if not os.path.exists(result_dir):
            os.mkdir(result_dir)

        # create cycle_dir, e.g. "C1-36"
        cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
        logging.info("Filling in %s" % (cycle,))
        cycle_dir = os.path.join(result_dir, cycle)
        if os.path.exists(cycle_dir):
            logging.error("%s already exists, not overwriting" % (cycle_dir,))
            continue
        else:
            os.mkdir(cycle_dir)

        # copy stuff out of the main run
        g = r.gerald

        # save run file
        r.save(cycle_dir)

        # Copy Summary.htm
        summary_path = os.path.join(r.gerald.pathname, 'Summary.htm')
        if os.path.exists(summary_path):
            logging.info('Copying %s to %s' % (summary_path, cycle_dir))
            shutil.copy(summary_path, cycle_dir)
        else:
            logging.info('Summary file %s was not found' % (summary_path,))

        # tar score files
        score_files = []

        # check for g.pathname/Temp a new feature of 1.1rc1
        scores_path = g.pathname
        scores_path_temp = os.path.join(scores_path, 'Temp')
        if os.path.isdir(scores_path_temp):
            scores_path = scores_path_temp

        # hopefully we have a directory that contains s_*_score files
        for f in os.listdir(scores_path):
            if re.match('.*_score.txt', f):
                score_files.append(f)

        # NOTE(review): tar with an empty file list fails; assumes score
        # files are always present -- confirm for runs without them.
        tar_cmd = ['/bin/tar', 'c'] + score_files
        bzip_cmd = ['bzip2', '-9', '-c']
        tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
        # 'wb': the archive is binary data (was text mode 'w')
        tar_dest = open(tar_dest_name, 'wb')
        logging.info("Compressing score files from %s" % (scores_path,))
        logging.info("Running tar: " + " ".join(tar_cmd[:10]))
        logging.info("Running bzip2: " + " ".join(bzip_cmd))
        logging.info("Writing to %s" % (tar_dest_name,))

        tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
                               cwd=scores_path)
        bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
        # close our copy of the pipe so tar sees SIGPIPE if bzip2 dies
        tar.stdout.close()
        tar.wait()
        # previously only tar was waited on; wait for bzip2 too so the
        # archive is fully flushed before moving on, then close the file
        bzip.wait()
        tar_dest.close()

        # copy & bzip eland files
        for lanes_dictionary in g.eland_results.results:
            for eland_lane in lanes_dictionary.values():
                source_name = eland_lane.pathname
                path, name = os.path.split(eland_lane.pathname)
                dest_name = os.path.join(cycle_dir, name)
                logging.info("Saving eland file %s to %s" %
                             (source_name, dest_name))

                if is_compressed(name):
                    logging.info('Already compressed, Saving to %s' %
                                 (dest_name,))
                    shutil.copy(source_name, dest_name)
                else:
                    # not compressed: pipe through bzip2 ourselves
                    dest_name += '.bz2'
                    args = ['bzip2', '-9', '-c', source_name]
                    logging.info('Running: %s' % (" ".join(args)))
                    # 'wb' + explicit close: compressed output is binary
                    # and the handle used to leak
                    bzip_dest = open(dest_name, 'wb')
                    bzip = subprocess.Popen(args, stdout=bzip_dest)
                    logging.info('Saving to %s' % (dest_name,))
                    bzip.wait()
                    bzip_dest.close()
354
def clean_runs(runs):
    """
    Clean up run folders to optimize for compression.

    Not implemented yet.  The intended steps are:
      rm RunLog*.xml, pipeline_*.txt, gclog.txt, NetCopy.log, nfn.log
      rm Images/L*
      then run 'make clean_intermediate' inside Data/C1-*_Firecrest*
    """
    # TODO: implement the cleanup steps described above.
    pass