cfa68a4a462d9377e2eefabd1d9c42590a5f66ee
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import tarfile
13 import time
14
try:
  from xml.etree import ElementTree
except ImportError:
  # Python < 2.5 has no xml.etree; fall back to the standalone package.
  # (The bound exception variable was unused, so it is not captured.)
  from elementtree import ElementTree
19
# strptime format / regex for dates embedded in European-style runfolder names
EUROPEAN_STRPTIME = "%d-%m-%Y"
EUROPEAN_DATE_RE = r"([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
# raw strings so the regex escapes are not (mis)interpreted by Python
VERSION_RE = r"([0-9\.]+)"
USER_RE = r"([a-zA-Z0-9]+)"
LANES_PER_FLOWCELL = 8
LANE_LIST = range(1, LANES_PER_FLOWCELL+1)
26
27 from htsworkflow.util.alphanum import alphanum
28 from htsworkflow.util.ethelp import indent, flatten
29
30 from htsworkflow.pipelines import srf
31
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run

    Aggregates the three pipeline stages -- image analysis
    (Firecrest or IPAR), base calling (Bustard) and alignment (GERALD) --
    and serializes them to/from a single combined XML report.
    """
    # version stamp for the combined XML format
    XML_VERSION = 1
    # XML tag names used by get_elements/set_elements
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, xml=None):
        """
        :Parameters:
          - `pathname`: optional path to the runfolder on disk
          - `xml`: optional ElementTree node to initialize components from
        """
        if pathname is not None:
          self.pathname = os.path.normpath(pathname)
        else:
          self.pathname = None
        self._name = None           # cache for the name property
        self._flowcell_id = None    # cache for the flowcell_id property
        self.image_analysis = None  # Firecrest or IPAR component
        self.bustard = None         # base-calling component
        self.gerald = None          # alignment component

        if xml is not None:
          self.set_elements(xml)

    def _get_flowcell_id(self):
        """
        Return the flowcell ID, reading Config/FlowcellId.xml when present,
        otherwise guessing from the runfolder name.  Result is cached.
        """
        # extract flowcell ID
        if self._flowcell_id is None:
          config_dir = os.path.join(self.pathname, 'Config')
          flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
          if os.path.exists(flowcell_id_path):
            flowcell_id_tree = ElementTree.parse(flowcell_id_path)
            self._flowcell_id = flowcell_id_tree.findtext('Text')
          else:
            # no FlowcellId.xml: fall back to the runfolder naming convention
            path_fields = self.pathname.split('_')
            if len(path_fields) > 0:
              # guessing last element of filename
              flowcell_id = path_fields[-1]
            else:
              flowcell_id = 'unknown'

            logging.warning(
              "Flowcell id was not found, guessing %s" % (
                 flowcell_id))
            self._flowcell_id = flowcell_id
        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """
        Initialize this run's components from a previously saved XML tree.

        Raises ValueError if the root tag is not PIPELINE_RUN.
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
          raise ValueError('Pipeline Run Expecting %s got %s' % (
              PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
          tag = element.tag.lower()
          if tag == PipelineRun.FLOWCELL_ID.lower():
            self._flowcell_id = element.text
          #ok the xword.Xword.XWORD pattern for module.class.constant is lame
          # you should only have Firecrest or IPAR, never both of them.
          elif tag == firecrest.Firecrest.FIRECREST.lower():
            self.image_analysis = firecrest.Firecrest(xml=element)
          elif tag == ipar.IPAR.IPAR.lower():
            self.image_analysis = ipar.IPAR(xml=element)
          elif tag == bustard.Bustard.BUSTARD.lower():
            self.bustard = bustard.Bustard(xml=element)
          elif tag == gerald.Gerald.GERALD.lower():
            self.gerald = gerald.Gerald(xml=element)
          else:
            # unknown elements are skipped, not fatal
            logging.warn('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name

        Returns a filename of the form run_<flowcell>_<YYYY-MM-DD>.xml,
        cached after the first call.
        """
        if self._name is None:
          # latest timestamp of the three component runs wins
          tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
          timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
          self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """
        Write the combined run XML to destdir (current directory if None).
        """
        if destdir is None:
            destdir = ''
        logging.info("Saving run report "+ self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """
        Replace this run's contents with the XML tree stored in filename.
        """
        logging.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
142
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters: 
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    root = ElementTree.parse(pathname).getroot()
    return PipelineRun(xml=root)
155
def get_runs(runfolder):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.
    """
    # hide the imports to avoid cyclic imports (this module is imported
    # by the component modules themselves)
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
        # for one image-analysis directory, find every bustard/gerald
        # combination underneath it and append a PipelineRun for each
        logging.info("Looking for bustard directories in %s" % (pathname,))
        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
        # RTA BaseCalls looks enough like Bustard.
        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
        for bustard_pathname in bustard_dirs:
            logging.info("Found bustard directory %s" % (bustard_pathname,))
            b = bustard.bustard(bustard_pathname)
            gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
            logging.info("Looking for gerald directories in %s" % (pathname,))
            for gerald_pathname in glob(gerald_glob):
                logging.info("Found gerald directory %s" % (gerald_pathname,))
                try:
                    g = gerald.gerald(gerald_pathname)
                    p = PipelineRun(runfolder)
                    p.image_analysis = image_analysis
                    p.bustard = b
                    p.gerald = g
                    runs.append(p)
                except IOError, e:
                    # unreadable gerald directory: skip it, keep scanning
                    logging.error("Ignoring " + str(e))

    datadir = os.path.join(runfolder, 'Data')

    logging.info('Searching for runs in ' + datadir)
    runs = []
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
        logging.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            logging.warn(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, image_analysis, firecrest_pathname
            )
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        logging.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            logging.warn(
                "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, image_analysis, ipar_pathname
            )

    return runs
225
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :Parameters:
      - `gerald_dir`: path to a GERALD alignment directory

    :Returns: initialized PipelineRun, or None if any component
              (image analysis, bustard, gerald) could not be loaded.
    """
    # hide the imports to avoid cyclic imports
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    # the pipeline nests directories as:
    #   runfolder/Data/<image analysis>/<bustard>/<gerald>
    # so walk up from the gerald directory to recover the others
    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..','..'))

    logging.info('--- use-run detected options ---')
    logging.info('runfolder: %s' % (runfolder_dir,))
    logging.info('image_dir: %s' % (image_dir,))
    logging.info('bustard_dir: %s' % (bustard_dir,))
    logging.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    logging.info('data_dir: %s' % (data_dir,))
    # typo fix: message previously read 'short_iamge_dir'
    logging.info('short_image_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        # RTA's Intensities directory is close enough to IPAR
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        logging.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        logging.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        logging.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
293
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters

    Simply asks each run to serialize its parameters to disk.
    """
    for pipeline_run in runs:
        pipeline_run.save()
300
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.

    :Parameters:
      - `genome_map`: mapping whose keys are the known genome path prefixes
      - `mapped_reads`: mapping of "[path/]chromosome" -> read count

    :Returns: dict of chromosome (or genome) name -> total reads.
      Reads whose path prefix is NOT in genome_map are pooled under
      that prefix (the last such prefix seen, or 'unknown' if none).
    """
    summarized_reads = {}
    genome_reads = 0
    genome = 'unknown'
    for k, v in mapped_reads.items():
        path, k = os.path.split(k)
        # `in` replaces the Python2-only dict.has_key
        if len(path) > 0 and path not in genome_map:
            genome = path
            genome_reads += v
        else:
            summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
    summarized_reads[genome] = genome_reads
    return summarized_reads
318
def summarize_lane(gerald, lane_id):
    """
    Build a human-readable report for one lane of a GERALD run.

    :Parameters:
      - `gerald`: gerald object providing .summary.lane_results and
        .eland_results.results
      - `lane_id`: which lane to report on

    :Returns: list of report lines (sample, clusters, read totals,
      eland match codes and per-genome mapped reads when available)
    """
    report = []
    summary_results = gerald.summary.lane_results
    # one pass per read end (single or paired end)
    for end in range(len(summary_results)):
      eland_result = gerald.eland_results.results[end][lane_id]
      report.append("Sample name %s" % (eland_result.sample_name))
      report.append("Lane id %s end %s" % (eland_result.lane_id, end))
      # `in` replaces the Python2-only dict.has_key
      if end < len(summary_results) and eland_result.lane_id in summary_results[end]:
          cluster = summary_results[end][eland_result.lane_id].cluster
          report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
      report.append("Total Reads: %d" % (eland_result.reads))

      if hasattr(eland_result, 'match_codes'):
          mc = eland_result.match_codes
          nm = mc['NM']
          nm_percent = float(nm)/eland_result.reads  * 100
          qc = mc['QC']
          qc_percent = float(qc)/eland_result.reads * 100

          report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
          report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
          report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                        (mc['U0'], mc['U1'], mc['U2']))
          report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                        (mc['R0'], mc['R1'], mc['R2']))

      if hasattr(eland_result, 'genome_map'):
          report.append("Mapped Reads")
          mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
          for name, counts in mapped_reads.items():
            report.append("  %s: %d" % (name, counts))

      # blank separator line after each end
      report.append('')
    return report
353
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :Parameters:
      - `runs`: sequence of PipelineRun objects

    :Returns: one multi-line report string covering ALL runs.
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.name,))
        # sort the report
        eland_keys = run.gerald.eland_results.results[0].keys()
        eland_keys.sort(alphanum)

        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('---')
            report.append('')
    # BUG FIX: this return used to be inside the for-loop, which cut the
    # report short after the first run.
    return os.linesep.join(report)
371
def is_compressed(filename):
    """Return True if filename carries a known compression extension."""
    extension = os.path.splitext(filename)[1]
    return extension in ('.gz', '.bz2')
379
def save_summary_file(gerald_object, cycle_dir):
    """
    Copy the GERALD Summary.htm report into cycle_dir when present;
    otherwise just log that it was missing.
    """
    summary_path = os.path.join(gerald_object.pathname, 'Summary.htm')
    if not os.path.exists(summary_path):
        logging.info('Summary file %s was not found' % (summary_path,))
        return
    logging.info('Copying %s to %s' % (summary_path, cycle_dir))
    shutil.copy(summary_path, cycle_dir)
388
389     
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory

    Tars every *_score.txt file found under the bustard directory (or its
    Temp subdirectory) and pipes the archive through bzip2 into
    cycle_dir/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = [f for f in os.listdir(scores_path)
                   if re.match('.*_score.txt', f)]

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    # binary mode: the stream is compressed data
    tar_dest = open(tar_dest_name, 'wb')
    logging.info("Compressing score files from %s" % (scores_path,))
    logging.info("Running tar: " + " ".join(tar_cmd[:10]))
    logging.info("Running bzip2: " + " ".join(bzip_cmd))
    logging.info("Writing to %s" %(tar_dest_name,))

    # copy the parent environment instead of replacing it (the old code
    # passed env={'BZIP': '-9'}, wiping PATH etc. for the child process)
    env = dict(os.environ)
    env['BZIP'] = '-9'
    try:
        tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
                               env=env, cwd=scores_path)
        bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
        tar.wait()
        # also wait for the compressor, so the archive is complete (and the
        # handle closeable) before we return; the old code never did
        bzip.wait()
    finally:
        tar_dest.close()
420
421
def compress_eland_results(gerald_object, cycle_dir):
    """
    Compress eland result files into the archive directory

    Already-compressed results are copied verbatim; uncompressed ones
    are piped through bzip2 into cycle_dir with a .bz2 suffix.
    """
    # copy & bzip eland files
    for lanes_dictionary in gerald_object.eland_results.results:
        for eland_lane in lanes_dictionary.values():
            source_name = eland_lane.pathname
            path, name = os.path.split(eland_lane.pathname)
            dest_name = os.path.join(cycle_dir, name)
            logging.info("Saving eland file %s to %s" % \
                         (source_name, dest_name))

            if is_compressed(name):
              logging.info('Already compressed, Saving to %s' % (dest_name, ))
              shutil.copy(source_name, dest_name)
            else:
              # not compressed
              dest_name += '.bz2'
              args = ['bzip2', '-9', '-c', source_name]
              logging.info('Running: %s' % ( " ".join(args) ))
              # binary mode, and close the handle even if bzip2 fails
              # (the old code leaked it)
              bzip_dest = open(dest_name, 'wb')
              try:
                  bzip = subprocess.Popen(args, stdout=bzip_dest)
                  logging.info('Saving to %s' % (dest_name, ))
                  bzip.wait()
              finally:
                  bzip_dest.close()
447     
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * eland_result files
      * score files
      * Summary.htm
      * srf files (raw sequence & qualities)

    :Parameters:
      - `runs`: sequence of PipelineRun objects to extract
      - `output_base_dir`: destination root (current directory when None)
      - `site`: site name passed to srf command construction; None skips
        the srf step entirely
      - `num_jobs`: NOTE(review): appears unused -- the srf step below
        hard-codes 2 concurrent jobs; confirm intent.
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
      # one result directory per flowcell
      result_dir = os.path.join(output_base_dir, r.flowcell_id)
      logging.info("Using %s as result directory" % (result_dir,))
      if not os.path.exists(result_dir):
        os.mkdir(result_dir)

      # create cycle_dir (e.g. "C1-36") from the image-analysis cycle range
      cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
      logging.info("Filling in %s" % (cycle,))
      cycle_dir = os.path.join(result_dir, cycle)
      if os.path.exists(cycle_dir):
        # never clobber a previous extraction
        logging.error("%s already exists, not overwriting" % (cycle_dir,))
        continue
      else:
        os.mkdir(cycle_dir)

      # copy stuff out of the main run
      g = r.gerald

      # save run file
      r.save(cycle_dir)

      # save summary file
      save_summary_file(g, cycle_dir)

      # tar score files
      compress_score_files(r.bustard, cycle_dir)

      # compress eland result files
      compress_eland_results(g, cycle_dir)

      # build srf commands
      if site is not None:
        lanes = range(1,9)
        run_name = srf.pathname_to_run_name(r.pathname)
        srf_cmds = srf.make_commands(run_name, lanes, site, cycle_dir)
        srf.run_srf_commands(r.bustard.pathname, srf_cmds, 2)
497       
def rm_list(files, dry_run=True):
    """
    Delete each path in files, logging every removal.

    Directories are removed recursively; missing paths only produce a
    warning.  When dry_run is true nothing is actually deleted.
    """
    for target in files:
        if not os.path.exists(target):
            logging.warn("%s doesn't exist."% (target,))
            continue
        logging.info('deleting %s' % (target,))
        if dry_run:
            continue
        if os.path.isdir(target):
            shutil.rmtree(target)
        else:
            os.unlink(target)
509
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    Removes run logs, pipeline status files, *.log files, Calibration
    directories and raw image tiles, and runs ``make clean_intermediate``
    in the image-analysis directory when a Makefile is present.

    :Parameters:
      - `runs`: sequence of PipelineRun objects to clean
      - `dry_run`: when True (the default), rm_list only logs what
        would be removed
    """
    if dry_run:
        logging.info('In dry-run mode')

    for run in runs:
        # typo fix: message previously read 'Cleaninging'
        logging.info('Cleaning %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        logging.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # cd Data/C1-*_Firecrest*
        logging.info("Cleaning intermediate files")
        # make clean_intermediate
        # NOTE: the make step always runs, even in dry_run mode
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                             cwd=run.image_analysis.pathname)
            clean_process.wait()
544
545
546