Handle lanes that were only sequenced.
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import time
13
14 try:
15   from xml.etree import ElementTree
16 except ImportError, e:
17   from elementtree import ElementTree
18
19 EUROPEAN_STRPTIME = "%d-%m-%Y"
20 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
21 VERSION_RE = "([0-9\.]+)"
22 USER_RE = "([a-zA-Z0-9]+)"
23 LANES_PER_FLOWCELL = 8
24
25 from htsworkflow.util.alphanum import alphanum
26 from htsworkflow.util.ethelp import indent, flatten
27
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run

    Tracks the flowcell ID plus the three analysis stages
    (image analysis, bustard base calling, gerald alignment) and can
    round-trip that state through an XML report file.
    """
    XML_VERSION = 1
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, xml=None):
        """
        :Parameters:
          - `pathname`: location of the runfolder on disk (optional)
          - `xml`: ElementTree element to restore a saved run from (optional)
        """
        if pathname is not None:
          self.pathname = os.path.normpath(pathname)
        else:
          self.pathname = None
        self._name = None          # cached by _get_run_name
        self._flowcell_id = None   # cached by _get_flowcell_id
        # analysis stage objects, normally attached by get_runs() /
        # get_specific_run() or restored from xml by set_elements()
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
          self.set_elements(xml)

    def _get_flowcell_id(self):
        """
        Return the flowcell ID from Config/FlowcellId.xml, or guess it
        from the runfolder name if that file is missing.
        """
        if self._flowcell_id is None:
          config_dir = os.path.join(self.pathname, 'Config')
          flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
          if os.path.exists(flowcell_id_path):
            flowcell_id_tree = ElementTree.parse(flowcell_id_path)
            self._flowcell_id = flowcell_id_tree.findtext('Text')
          else:
            # runfolders are conventionally named
            # <date>_<machine>_<run#>_<flowcell>; split only the basename
            # so underscores in parent directories can't skew the guess
            # (previously the whole pathname was split)
            path_fields = os.path.basename(self.pathname).split('_')
            if len(path_fields) > 0:
              # guessing last element of filename
              flowcell_id = path_fields[-1]
            else:
              flowcell_id = 'unknown'

            logging.warning(
              "Flowcell id was not found, guessing %s" % (
                 flowcell_id))
            self._flowcell_id = flowcell_id
        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """
        Restore flowcell id and analysis stage objects from an XML tree.

        :Raises: ValueError if tree is not a PipelineRun element
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
          raise ValueError('Pipeline Run Expecting %s got %s' % (
              PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
          tag = element.tag.lower()
          if tag == PipelineRun.FLOWCELL_ID.lower():
            self._flowcell_id = element.text
          #ok the xword.Xword.XWORD pattern for module.class.constant is lame
          # you should only have Firecrest or IPAR, never both of them.
          elif tag == firecrest.Firecrest.FIRECREST.lower():
            self.image_analysis = firecrest.Firecrest(xml=element)
          elif tag == ipar.IPAR.IPAR.lower():
            self.image_analysis = ipar.IPAR(xml=element)
          elif tag == bustard.Bustard.BUSTARD.lower():
            self.bustard = bustard.Bustard(xml=element)
          elif tag == gerald.Gerald.GERALD.lower():
            self.gerald = gerald.Gerald(xml=element)
          else:
            # logging.warn is a deprecated alias for warning
            logging.warning('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name
        """
        if self._name is None:
          # the newest of the three stage timestamps names the run
          tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
          timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
          self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """Write the combined XML report to destdir/<run name>."""
        if destdir is None:
            destdir = ''
        logging.info("Saving run report "+ self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Replace our state with the contents of a saved report file."""
        logging.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
138
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters: 
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    root = ElementTree.parse(pathname).getroot()
    return PipelineRun(xml=root)
151
def get_runs(runfolder):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in there gerald component.
    """
    # imports are hidden in here to avoid a cyclic import with the
    # stage modules, which all import this file
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
        # every Bustard*/GERALD* pair under an image analysis directory
        # becomes one PipelineRun
        logging.info("Looking for bustard directories in %s" % (pathname,))
        for bustard_dir in glob(os.path.join(pathname, "Bustard*")):
            logging.info("Found bustard directory %s" % (bustard_dir,))
            base_calls = bustard.bustard(bustard_dir)
            logging.info("Looking for gerald directories in %s" % (pathname,))
            for gerald_dir in glob(os.path.join(bustard_dir, 'GERALD*')):
                logging.info("Found gerald directory %s" % (gerald_dir,))
                try:
                    alignment = gerald.gerald(gerald_dir)
                    run = PipelineRun(runfolder)
                    run.image_analysis = image_analysis
                    run.bustard = base_calls
                    run.gerald = alignment
                    runs.append(run)
                except IOError as e:
                    logging.error("Ignoring " + str(e))

    datadir = os.path.join(runfolder, 'Data')
    logging.info('Searching for runs in ' + datadir)
    runs = []

    # the image analysis step is either Firecrest or IPAR, scan for both
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        logging.info('Found firecrest in ' + datadir)
        analysis = firecrest.firecrest(firecrest_pathname)
        if analysis is None:
            logging.warn(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            )
            continue
        scan_post_image_analysis(runs, runfolder, analysis, firecrest_pathname)

    for ipar_pathname in glob(os.path.join(datadir, "IPAR_*")):
        logging.info('Found ipar directories in ' + datadir)
        analysis = ipar.ipar(ipar_pathname)
        if analysis is None:
            logging.warn(
                "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
            )
            continue
        scan_post_image_analysis(runs, runfolder, analysis, ipar_pathname)

    return runs
216
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :Parameters:
      - `gerald_dir`: path of a GERALD alignment directory; its parent is
        assumed to be the bustard dir and its grandparent the image dir

    :Returns: an initialized PipelineRun, or None if any stage is invalid
    """
    # imports hidden here to avoid cyclic imports with the stage modules
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..','..'))

    logging.info('--- use-run detected options ---')
    logging.info('runfolder: %s' % (runfolder_dir,))
    logging.info('image_dir: %s' % (image_dir,))
    logging.info('bustard_dir: %s' % (bustard_dir,))
    logging.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    logging.info('data_dir: %s' % (data_dir,))
    # fixed typo: was 'short_iamge_dir'
    logging.info('short_image_dir: %s' %(short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    # if we din't find a run, report the error and return 
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        logging.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        logging.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        logging.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
280
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters

    Writes each run's xml report (via PipelineRun.save) to the
    current directory.
    """
    for run in runs:
        run.save()
287
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.

    :Parameters:
      - `genome_map`: dict whose keys include the known genome path prefixes
      - `mapped_reads`: dict mapping (possibly path-qualified) chromosome
        names to read counts

    :Returns: dict of chromosome (or foreign genome) name -> summed count;
      always contains one genome-total entry ('unknown' if no foreign
      genome was seen)
    """
    summarized_reads = {}
    genome_reads = 0
    genome = 'unknown'
    for k, v in mapped_reads.items():
        path, k = os.path.split(k)
        # a path prefix that isn't one of our known genomes is treated as
        # spike-in/contamination and its reads are totalled as one genome
        # ('has_key' was replaced with 'in'; has_key is gone in python 3)
        if len(path) > 0 and path not in genome_map:
            genome = path
            genome_reads += v
        else:
            summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
    summarized_reads[genome] = genome_reads
    return summarized_reads
305
def summarize_lane(gerald, lane_id):
    """
    Build a list of human-readable report lines for one lane,
    covering every sequencing end present in the gerald summary.
    """
    report = []
    summary_results = gerald.summary.lane_results
    for end, end_summaries in enumerate(summary_results):
        result = gerald.eland_results.results[end][lane_id]
        report.append("Sample name %s" % (result.sample_name))
        report.append("Lane id %s end %s" % (result.lane_id, end))
        clusters = end_summaries[result.lane_id].cluster
        report.append("Clusters %d +/- %d" % (clusters[0], clusters[1]))
        report.append("Total Reads: %d" % (result.reads))

        # match code breakdown is only present for some eland outputs
        if hasattr(result, 'match_codes'):
            codes = result.match_codes
            total = float(result.reads)
            no_match = codes['NM']
            qc_failed = codes['QC']
            report.append("No Match: %d (%2.2g %%)" % (
                no_match, no_match / total * 100))
            report.append("QC Failed: %d (%2.2g %%)" % (
                qc_failed, qc_failed / total * 100))
            report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                          (codes['U0'], codes['U1'], codes['U2']))
            report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                          (codes['R0'], codes['R1'], codes['R2']))

        # per-genome mapping counts, when available
        if hasattr(result, 'genome_map'):
            report.append("Mapped Reads")
            mapped = summarize_mapped_reads(result.genome_map,
                                            result.mapped_reads)
            for name, counts in mapped.items():
                report.append("  %s: %d" % (name, counts))

        report.append('')
    return report
339
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :Parameters:
      - `runs`: list of PipelineRun objects

    :Returns: the report for all runs joined into a single string
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.name,))
        # sort the lanes into a human-friendly order
        eland_keys = run.gerald.eland_results.results[0].keys()
        eland_keys.sort(alphanum)

        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('---')
            report.append('')
    # the return was previously indented inside the loop, so only the
    # first run was ever reported; it now runs after all runs are summarized
    return os.linesep.join(report)
357
def is_compressed(filename):
    """
    Return whether filename carries a known compression extension
    (.gz or .bz2).
    """
    extension = os.path.splitext(filename)[1]
    return extension in ('.gz', '.bz2')
365
def _copy_summary(gerald_pathname, cycle_dir):
    """Copy GERALD's Summary.htm into cycle_dir if it exists."""
    summary_path = os.path.join(gerald_pathname, 'Summary.htm')
    if os.path.exists(summary_path):
        logging.info('Copying %s to %s' % (summary_path, cycle_dir))
        shutil.copy(summary_path, cycle_dir)
    else:
        logging.info('Summary file %s was not found' % (summary_path,))

def _tar_score_files(gerald_pathname, cycle_dir):
    """Write a bzip2-compressed tar of GERALD's *_score.txt files."""
    # check for gerald_pathname/Temp, a new feature of 1.1rc1
    scores_path = gerald_pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = [f for f in os.listdir(scores_path)
                   if re.match('.*_score.txt', f)]

    tar_cmd = ['/bin/tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    logging.info("Compressing score files from %s" % (scores_path,))
    logging.info("Running tar: " + " ".join(tar_cmd[:10]))
    logging.info("Running bzip2: " + " ".join(bzip_cmd))
    logging.info("Writing to %s" %(tar_dest_name))

    # binary mode: bzip2 emits bytes (the file was previously opened 'w'
    # and never closed)
    tar_dest = open(tar_dest_name, 'wb')
    try:
        env = {'BZIP': '-9'}
        tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
                               env=env, cwd=scores_path)
        bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
        tar.wait()
        # previously only tar was waited on, so the archive could still be
        # mid-write when we moved on
        bzip.wait()
    finally:
        tar_dest.close()

def _copy_eland_results(gerald, cycle_dir):
    """Copy eland result files into cycle_dir, bzip2-ing uncompressed ones."""
    for lanes_dictionary in gerald.eland_results.results:
        for eland_lane in lanes_dictionary.values():
            source_name = eland_lane.pathname
            path, name = os.path.split(eland_lane.pathname)
            dest_name = os.path.join(cycle_dir, name)
            logging.info("Saving eland file %s to %s" % \
                         (source_name, dest_name))

            if is_compressed(name):
                logging.info('Already compressed, Saving to %s' % (dest_name, ))
                shutil.copy(source_name, dest_name)
            else:
                # not compressed
                dest_name += '.bz2'
                args = ['bzip2', '-9', '-c', source_name]
                logging.info('Running: %s' % ( " ".join(args) ))
                # binary mode + explicit close (was 'w' and leaked)
                bzip_dest = open(dest_name, 'wb')
                try:
                    bzip = subprocess.Popen(args, stdout=bzip_dest)
                    logging.info('Saving to %s' % (dest_name, ))
                    bzip.wait()
                finally:
                    bzip_dest.close()

def extract_results(runs, output_base_dir=None):
    """
    Copy each run's interesting result files into
    output_base_dir/<flowcell id>/C<start>-<stop>/: the run xml report,
    GERALD's Summary.htm, a scores tarball, and the eland result files.

    :Parameters:
      - `runs`: list of PipelineRun objects to extract
      - `output_base_dir`: destination root (defaults to the cwd)
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
        result_dir = os.path.join(output_base_dir, r.flowcell_id)
        logging.info("Using %s as result directory" % (result_dir,))
        if not os.path.exists(result_dir):
            os.mkdir(result_dir)

        # create cycle_dir
        cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
        logging.info("Filling in %s" % (cycle,))
        cycle_dir = os.path.join(result_dir, cycle)
        if os.path.exists(cycle_dir):
            logging.error("%s already exists, not overwriting" % (cycle_dir,))
            continue
        os.mkdir(cycle_dir)

        # save run file
        r.save(cycle_dir)

        _copy_summary(r.gerald.pathname, cycle_dir)
        _tar_score_files(r.gerald.pathname, cycle_dir)
        _copy_eland_results(r.gerald, cycle_dir)
450
def rm_list(files, dry_run=True):
    """
    Delete every path in files, recursing into directories.

    When dry_run is true, only log what would have been removed.
    """
    for path in files:
        if not os.path.exists(path):
            logging.warn("%s doesn't exist."% (path,))
            continue
        logging.info('deleting %s' % (path,))
        if dry_run:
            continue
        if os.path.isdir(path):
            shutil.rmtree(path)
        else:
            os.unlink(path)
462
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    Removes run/pipeline logs, calibration directories and raw image
    directories, and runs ``make clean_intermediate`` in the image
    analysis directory.

    :Parameters:
      - `runs`: list of PipelineRun objects to clean
      - `dry_run`: when true, only log what would be removed
    """
    if dry_run:
        logging.info('In dry-run mode')

    for run in runs:
        # fixed typo: message previously read 'Cleaninging'
        logging.info('Cleaning %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        logging.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # cd Data/C1-*_Firecrest*
        logging.info("Cleaning intermediate files")
        # make clean_intermediate
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            if dry_run:
                # previously clean_intermediate ran even in dry-run mode,
                # deleting intermediates during a supposedly safe preview
                logging.info('would run make clean_intermediate in %s' % (
                    run.image_analysis.pathname,))
            else:
                clean_process = subprocess.Popen(
                    ['make', 'clean_intermediate'],
                    cwd=run.image_analysis.pathname,)
                clean_process.wait()
497
498
499