5d663b3d9924890097cd4219b254fdf8339f0f9c
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import time
13
14 try:
15   from xml.etree import ElementTree
16 except ImportError, e:
17   from elementtree import ElementTree
18
19 EUROPEAN_STRPTIME = "%d-%m-%Y"
20 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
21 VERSION_RE = "([0-9\.]+)"
22 USER_RE = "([a-zA-Z0-9]+)"
23 LANES_PER_FLOWCELL = 8
24
25 from htsworkflow.util.alphanum import alphanum
26 from htsworkflow.util.ethelp import indent, flatten
27
28 class PipelineRun(object):
29     """
30     Capture "interesting" information about a pipeline run
31     """
32     XML_VERSION = 1
33     PIPELINE_RUN = 'PipelineRun'
34     FLOWCELL_ID = 'FlowcellID'
35
36     def __init__(self, pathname=None, xml=None):
37         if pathname is not None:
38           self.pathname = os.path.normpath(pathname)
39         else:
40           self.pathname = None
41         self._name = None
42         self._flowcell_id = None
43         self.image_analysis = None
44         self.bustard = None
45         self.gerald = None
46
47         if xml is not None:
48           self.set_elements(xml)
49
50     def _get_flowcell_id(self):
51         # extract flowcell ID
52         if self._flowcell_id is None:
53           config_dir = os.path.join(self.pathname, 'Config')
54           flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
55           if os.path.exists(flowcell_id_path):
56             flowcell_id_tree = ElementTree.parse(flowcell_id_path)
57             self._flowcell_id = flowcell_id_tree.findtext('Text')
58           else:
59             path_fields = self.pathname.split('_')
60             if len(path_fields) > 0:
61               # guessing last element of filename
62               flowcell_id = path_fields[-1]
63             else:
64               flowcell_id = 'unknown'
65
66             logging.warning(
67               "Flowcell id was not found, guessing %s" % (
68                  flowcell_id))
69             self._flowcell_id = flowcell_id
70         return self._flowcell_id
71     flowcell_id = property(_get_flowcell_id)
72
73     def get_elements(self):
74         """
75         make one master xml file from all of our sub-components.
76         """
77         root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
78         flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
79         flowcell.text = self.flowcell_id
80         root.append(self.image_analysis.get_elements())
81         root.append(self.bustard.get_elements())
82         root.append(self.gerald.get_elements())
83         return root
84
85     def set_elements(self, tree):
86         # this file gets imported by all the others,
87         # so we need to hide the imports to avoid a cyclic imports
88         from htsworkflow.pipelines import firecrest
89         from htsworkflow.pipelines import ipar
90         from htsworkflow.pipelines import bustard
91         from htsworkflow.pipelines import gerald
92
93         tag = tree.tag.lower()
94         if tag != PipelineRun.PIPELINE_RUN.lower():
95           raise ValueError('Pipeline Run Expecting %s got %s' % (
96               PipelineRun.PIPELINE_RUN, tag))
97         for element in tree:
98           tag = element.tag.lower()
99           if tag == PipelineRun.FLOWCELL_ID.lower():
100             self._flowcell_id = element.text
101           #ok the xword.Xword.XWORD pattern for module.class.constant is lame
102           # you should only have Firecrest or IPAR, never both of them.
103           elif tag == firecrest.Firecrest.FIRECREST.lower():
104             self.image_analysis = firecrest.Firecrest(xml=element)
105           elif tag == ipar.IPAR.IPAR.lower():
106             self.image_analysis = ipar.IPAR(xml=element)
107           elif tag == bustard.Bustard.BUSTARD.lower():
108             self.bustard = bustard.Bustard(xml=element)
109           elif tag == gerald.Gerald.GERALD.lower():
110             self.gerald = gerald.Gerald(xml=element)
111           else:
112             logging.warn('PipelineRun unrecognized tag %s' % (tag,))
113
114     def _get_run_name(self):
115         """
116         Given a run tuple, find the latest date and use that as our name
117         """
118         if self._name is None:
119           tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
120           timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
121           self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
122         return self._name
123     name = property(_get_run_name)
124
125     def save(self, destdir=None):
126         if destdir is None:
127             destdir = ''
128         logging.info("Saving run report "+ self.name)
129         xml = self.get_elements()
130         indent(xml)
131         dest_pathname = os.path.join(destdir, self.name)
132         ElementTree.ElementTree(xml).write(dest_pathname)
133
134     def load(self, filename):
135         logging.info("Loading run report from " + filename)
136         tree = ElementTree.parse(filename).getroot()
137         self.set_elements(tree)
138
139 def load_pipeline_run_xml(pathname):
140     """
141     Load and instantiate a Pipeline run from a run xml file
142
143     :Parameters: 
144       - `pathname` : location of an run xml file
145
146     :Returns: initialized PipelineRun object
147     """
148     tree = ElementTree.parse(pathname).getroot()
149     run = PipelineRun(xml=tree)
150     return run
151
152 def get_runs(runfolder):
153     """
154     Search through a run folder for all the various sub component runs
155     and then return a PipelineRun for each different combination.
156
157     For example if there are two different GERALD runs, this will
158     generate two different PipelineRun objects, that differ
159     in there gerald component.
160     """
161     from htsworkflow.pipelines import firecrest
162     from htsworkflow.pipelines import ipar
163     from htsworkflow.pipelines import bustard
164     from htsworkflow.pipelines import gerald
165
166     def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
167         logging.info("Looking for bustard directories in %s" % (pathname,))
168         bustard_glob = os.path.join(pathname, "Bustard*")
169         for bustard_pathname in glob(bustard_glob):
170             logging.info("Found bustard directory %s" % (bustard_pathname,))
171             b = bustard.bustard(bustard_pathname)
172             gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
173             logging.info("Looking for gerald directories in %s" % (pathname,))
174             for gerald_pathname in glob(gerald_glob):
175                 logging.info("Found gerald directory %s" % (gerald_pathname,))
176                 try:
177                     g = gerald.gerald(gerald_pathname)
178                     p = PipelineRun(runfolder)
179                     p.image_analysis = image_analysis
180                     p.bustard = b
181                     p.gerald = g
182                     runs.append(p)
183                 except IOError, e:
184                     logging.error("Ignoring " + str(e))
185
186     datadir = os.path.join(runfolder, 'Data')
187
188     logging.info('Searching for runs in ' + datadir)
189     runs = []
190     # scan for firecrest directories
191     for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
192         logging.info('Found firecrest in ' + datadir)
193         image_analysis = firecrest.firecrest(firecrest_pathname)
194         if image_analysis is None:
195             logging.warn(
196                 "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
197             )
198         else:
199             scan_post_image_analysis(
200                 runs, runfolder, image_analysis, firecrest_pathname
201             )
202     # scan for IPAR directories
203     for ipar_pathname in glob(os.path.join(datadir,"IPAR_*")):
204         logging.info('Found ipar directories in ' + datadir)
205         image_analysis = ipar.ipar(ipar_pathname)
206         if image_analysis is None:
207             logging.warn(
208                 "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
209             )
210         else:
211             scan_post_image_analysis(
212                 runs, runfolder, image_analysis, ipar_pathname
213             )
214
215     return runs
216
217 def get_specific_run(gerald_dir):
218     """
219     Given a gerald directory, construct a PipelineRun out of its parents
220
221     Basically this allows specifying a particular run instead of the previous
222     get_runs which scans a runfolder for various combinations of
223     firecrest/ipar/bustard/gerald runs.
224     """
225     from htsworkflow.pipelines import firecrest
226     from htsworkflow.pipelines import ipar
227     from htsworkflow.pipelines import bustard
228     from htsworkflow.pipelines import gerald
229
230     bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
231     image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
232
233     runfolder_dir = os.path.abspath(os.path.join(image_dir, '..','..'))
234    
235     logging.debug('--- use-run detected options ---')
236     logging.debug('runfolder: %s' % (runfolder_dir,))
237     logging.debug('image_dir: %s' % (image_dir,))
238     logging.debug('bustard_dir: %s' % (bustard_dir,))
239     logging.debug('gerald_dir: %s' % (gerald_dir,))
240
241     # find our processed image dir
242     image_run = ipar.ipar(image_dir)
243     if image_run is None:
244         image_run = firecrest.firecrest(image_dir)
245     if image_run is None:
246         msg = '%s does not contain an image processing step' % (image_dir,)
247         logging.error(msg)
248         return None
249
250     # find our base calling
251     base_calling_run = bustard.bustard(bustard_dir)
252     if base_calling_run is None:
253         logging.error('%s does not contain a bustard run' % (bustard_dir,))
254         return None
255
256     # find alignments
257     gerald_run = gerald.gerald(gerald_dir)
258     if gerald_run is None:
259         logging.error('%s does not contain a gerald run' % (gerald_dir,))
260         return None
261
262     p = PipelineRun(runfolder_dir)
263     p.image_analysis = image_run
264     p.bustard = base_calling_run
265     p.gerald = gerald_run
266     
267     logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
268     return p
269
270 def extract_run_parameters(runs):
271     """
272     Search through runfolder_path for various runs and grab their parameters
273     """
274     for run in runs:
275       run.save()
276
277 def summarize_mapped_reads(genome_map, mapped_reads):
278     """
279     Summarize per chromosome reads into a genome count
280     But handle spike-in/contamination symlinks seperately.
281     """
282     summarized_reads = {}
283     genome_reads = 0
284     genome = 'unknown'
285     for k, v in mapped_reads.items():
286         path, k = os.path.split(k)
287         if len(path) > 0 and not genome_map.has_key(path):
288             genome = path
289             genome_reads += v
290         else:
291             summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
292     summarized_reads[genome] = genome_reads
293     return summarized_reads
294
295 def summarize_lane(gerald, lane_id):
296     report = []
297     summary_results = gerald.summary.lane_results
298     for end in range(len(summary_results)):  
299       eland_result = gerald.eland_results.results[end][lane_id]
300       report.append("Sample name %s" % (eland_result.sample_name))
301       report.append("Lane id %s end %s" % (eland_result.lane_id, end))
302       cluster = summary_results[end][eland_result.lane_id].cluster
303       report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
304       report.append("Total Reads: %d" % (eland_result.reads))
305       mc = eland_result._match_codes
306       nm = mc['NM']
307       nm_percent = float(nm)/eland_result.reads  * 100
308       qc = mc['QC']
309       qc_percent = float(qc)/eland_result.reads * 100
310
311       report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
312       report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
313       report.append('Unique (0,1,2 mismatches) %d %d %d' % \
314                     (mc['U0'], mc['U1'], mc['U2']))
315       report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
316                     (mc['R0'], mc['R1'], mc['R2']))
317       report.append("Mapped Reads")
318       mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
319       for name, counts in mapped_reads.items():
320         report.append("  %s: %d" % (name, counts))
321       report.append('')
322     return report
323
324 def summary_report(runs):
325     """
326     Summarize cluster numbers and mapped read counts for a runfolder
327     """
328     report = []
329     for run in runs:
330         # print a run name?
331         report.append('Summary for %s' % (run.name,))
332         # sort the report
333         eland_keys = run.gerald.eland_results.results[0].keys()
334         eland_keys.sort(alphanum)
335
336         for lane_id in eland_keys:
337             report.extend(summarize_lane(run.gerald, lane_id))
338             report.append('---')
339             report.append('')
340         return os.linesep.join(report)
341
342 def is_compressed(filename):
343     if os.path.splitext(filename)[1] == ".gz":
344         return True
345     elif os.path.splitext(filename)[1] == '.bz2':
346         return True
347     else:
348         return False
349
350 def extract_results(runs, output_base_dir=None):
351     if output_base_dir is None:
352         output_base_dir = os.getcwd()
353
354     for r in runs:
355       result_dir = os.path.join(output_base_dir, r.flowcell_id)
356       logging.info("Using %s as result directory" % (result_dir,))
357       if not os.path.exists(result_dir):
358         os.mkdir(result_dir)
359
360       # create cycle_dir
361       cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
362       logging.info("Filling in %s" % (cycle,))
363       cycle_dir = os.path.join(result_dir, cycle)
364       if os.path.exists(cycle_dir):
365         logging.error("%s already exists, not overwriting" % (cycle_dir,))
366         continue
367       else:
368         os.mkdir(cycle_dir)
369
370       # copy stuff out of the main run
371       g = r.gerald
372
373       # save run file
374       r.save(cycle_dir)
375
376       return
377
378       # Copy Summary.htm
379       summary_path = os.path.join(r.gerald.pathname, 'Summary.htm')
380       if os.path.exists(summary_path):
381           logging.info('Copying %s to %s' % (summary_path, cycle_dir))
382           shutil.copy(summary_path, cycle_dir)
383       else:
384           logging.info('Summary file %s was not found' % (summary_path,))
385
386       # tar score files
387       score_files = []
388
389       # check for g.pathname/Temp a new feature of 1.1rc1
390       scores_path = g.pathname
391       scores_path_temp = os.path.join(scores_path, 'Temp')
392       if os.path.isdir(scores_path_temp):
393           scores_path = scores_path_temp
394
395       # hopefully we have a directory that contains s_*_score files
396       for f in os.listdir(scores_path):
397           if re.match('.*_score.txt', f):
398               score_files.append(f)
399
400       tar_cmd = ['/bin/tar', 'c'] + score_files
401       bzip_cmd = [ 'bzip2', '-9', '-c' ]
402       tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
403       tar_dest = open(tar_dest_name, 'w')
404       logging.info("Compressing score files from %s" % (scores_path,))
405       logging.info("Running tar: " + " ".join(tar_cmd[:10]))
406       logging.info("Running bzip2: " + " ".join(bzip_cmd))
407       logging.info("Writing to %s" %(tar_dest_name))
408
409       tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, 
410                              cwd=scores_path)
411       bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
412       tar.wait()
413
414       # copy & bzip eland files
415       for lanes_dictionary in g.eland_results.results:
416           for eland_lane in lanes_dictionary.values():
417               source_name = eland_lane.pathname
418               path, name = os.path.split(eland_lane.pathname)
419               dest_name = os.path.join(cycle_dir, name)
420               logging.info("Saving eland file %s to %s" % \
421                            (source_name, dest_name))
422
423               if is_compressed(name):
424                 logging.info('Already compressed, Saving to %s' % (dest_name, ))
425                 shutil.copy(source_name, dest_name)
426               else:
427                 # not compressed
428                 dest_name += '.bz2'
429                 args = ['bzip2', '-9', '-c', source_name]
430                 logging.info('Running: %s' % ( " ".join(args) ))
431                 bzip_dest = open(dest_name, 'w')
432                 bzip = subprocess.Popen(args, stdout=bzip_dest)
433                 logging.info('Saving to %s' % (dest_name, ))
434                 bzip.wait()
435
436 def clean_runs(runs):
437     """
438     Clean up run folders to optimize for compression.
439     """
440     # TODO: implement this.
441     # rm RunLog*.xml
442     # rm pipeline_*.txt
443     # rm gclog.txt
444     # rm NetCopy.log
445     # rm nfn.log
446     # rm Images/L*
447     # cd Data/C1-*_Firecrest*
448     # make clean_intermediate
449
450     pass