Modify the srf utility to tar.bz2 the qseq files instead of using
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import tarfile
13 import time
14
15 try:
16   from xml.etree import ElementTree
17 except ImportError, e:
18   from elementtree import ElementTree
19
20 EUROPEAN_STRPTIME = "%d-%m-%Y"
21 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
22 VERSION_RE = "([0-9\.]+)"
23 USER_RE = "([a-zA-Z0-9]+)"
24 LANES_PER_FLOWCELL = 8
25 LANE_LIST = range(1, LANES_PER_FLOWCELL+1)
26
27 from htsworkflow.util.alphanum import alphanum
28 from htsworkflow.util.ethelp import indent, flatten
29 from htsworkflow.util.queuecommands import QueueCommands
30
31 from htsworkflow.pipelines import srf
32
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run

    Aggregates one image-analysis run (Firecrest or IPAR), one Bustard
    base-calling run and one GERALD alignment run, and can round-trip
    that combination through a single XML document (get_elements /
    set_elements / save / load).
    """
    # version stamp for the serialized run XML
    XML_VERSION = 1
    # element tag names used in the saved run XML
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, xml=None):
        """
        :Parameters:
          - `pathname`: path to the runfolder (normalized), or None
          - `xml`: ElementTree element previously produced by
            get_elements(); when given, the component objects are
            repopulated from it via set_elements()
        """
        if pathname is not None:
          self.pathname = os.path.normpath(pathname)
        else:
          self.pathname = None
        self._name = None           # cache for the name property
        self._flowcell_id = None    # cache for the flowcell_id property
        self.image_analysis = None  # firecrest.Firecrest or ipar.IPAR
        self.bustard = None         # bustard.Bustard
        self.gerald = None          # gerald.Gerald

        if xml is not None:
          self.set_elements(xml)

    def _get_flowcell_id(self):
        """
        Return the flowcell ID, preferring Config/FlowcellId.xml and
        falling back to guessing from the runfolder name.

        The result is cached in self._flowcell_id.
        """
        # extract flowcell ID
        if self._flowcell_id is None:
          config_dir = os.path.join(self.pathname, 'Config')
          flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
          if os.path.exists(flowcell_id_path):
            flowcell_id_tree = ElementTree.parse(flowcell_id_path)
            self._flowcell_id = flowcell_id_tree.findtext('Text')
          else:
            # runfolder names conventionally end _<flowcellid>;
            # str.split always yields at least one field, so the
            # 'unknown' branch below is effectively unreachable
            path_fields = self.pathname.split('_')
            if len(path_fields) > 0:
              # guessing last element of filename
              flowcell_id = path_fields[-1]
            else:
              flowcell_id = 'unknown'

            logging.warning(
              "Flowcell id was not found, guessing %s" % (
                 flowcell_id))
            self._flowcell_id = flowcell_id
        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.
        """
        # NOTE: assumes image_analysis, bustard and gerald are all set;
        # raises AttributeError if any are still None
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """
        Repopulate the component objects from a PipelineRun XML tree.

        :Raises: ValueError if the root tag is not PipelineRun
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
          raise ValueError('Pipeline Run Expecting %s got %s' % (
              PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
          tag = element.tag.lower()
          if tag == PipelineRun.FLOWCELL_ID.lower():
            self._flowcell_id = element.text
          #ok the xword.Xword.XWORD pattern for module.class.constant is lame
          # you should only have Firecrest or IPAR, never both of them.
          elif tag == firecrest.Firecrest.FIRECREST.lower():
            self.image_analysis = firecrest.Firecrest(xml=element)
          elif tag == ipar.IPAR.IPAR.lower():
            self.image_analysis = ipar.IPAR(xml=element)
          elif tag == bustard.Bustard.BUSTARD.lower():
            self.bustard = bustard.Bustard(xml=element)
          elif tag == gerald.Gerald.GERALD.lower():
            self.gerald = gerald.Gerald(xml=element)
          else:
            logging.warn('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name

        Produces 'run_<flowcell>_<YYYY-MM-DD>.xml'; cached in self._name.
        """
        if self._name is None:
          tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
          timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
          self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """
        Write this run's combined XML to destdir/<self.name>.

        :Parameters:
          - `destdir`: target directory; defaults to the current directory
        """
        if destdir is None:
            destdir = ''
        logging.info("Saving run report "+ self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """
        Load a previously saved run XML file into this object.

        :Parameters:
          - `filename`: path to a run xml file written by save()
        """
        logging.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
143
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters: 
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    root = ElementTree.parse(pathname).getroot()
    return PipelineRun(xml=root)
156
def get_runs(runfolder):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in there gerald component.

    :Parameters:
      - `runfolder`: path to an Illumina runfolder (one containing a
        Data subdirectory)

    :Returns: list of PipelineRun objects, one per
      image-analysis/bustard/gerald combination found
    """
    # imports are hidden here to avoid a cyclic import: this module is
    # imported by the pipeline component modules themselves
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
        # for one image-analysis directory, find every Bustard/GERALD
        # combination beneath it and append a PipelineRun for each
        logging.info("Looking for bustard directories in %s" % (pathname,))
        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
        # RTA BaseCalls looks enough like Bustard.
        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
        for bustard_pathname in bustard_dirs:
            logging.info("Found bustard directory %s" % (bustard_pathname,))
            b = bustard.bustard(bustard_pathname)
            gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
            logging.info("Looking for gerald directories in %s" % (pathname,))
            for gerald_pathname in glob(gerald_glob):
                logging.info("Found gerald directory %s" % (gerald_pathname,))
                try:
                    g = gerald.gerald(gerald_pathname)
                    p = PipelineRun(runfolder)
                    p.image_analysis = image_analysis
                    p.bustard = b
                    p.gerald = g
                    runs.append(p)
                except IOError, e:
                    # an unreadable gerald directory just skips that combo
                    logging.error("Ignoring " + str(e))

    datadir = os.path.join(runfolder, 'Data')

    logging.info('Searching for runs in ' + datadir)
    runs = []
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
        logging.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            logging.warn(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, image_analysis, firecrest_pathname
            )
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        logging.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            logging.warn(
                "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, image_analysis, ipar_pathname
            )

    return runs
226
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :Parameters:
      - `gerald_dir`: path to a GERALD directory, assumed to be nested as
        runfolder/Data/<image analysis>/<bustard>/<gerald>

    :Returns: an initialized PipelineRun, or None if any component
      (image analysis, bustard, gerald) could not be loaded
    """
    # imports hidden here to avoid a cyclic import: this module is
    # imported by the pipeline component modules themselves
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    # derive the parent directories from the gerald location
    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..','..'))

    logging.info('--- use-run detected options ---')
    logging.info('runfolder: %s' % (runfolder_dir,))
    logging.info('image_dir: %s' % (image_dir,))
    logging.info('bustard_dir: %s' % (bustard_dir,))
    logging.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    logging.info('data_dir: %s' % (data_dir,))
    # BUGFIX: this log label was previously misspelled 'short_iamge_dir'
    logging.info('short_image_dir: %s' %(short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        # RTA Intensities directories are close enough to IPAR
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        logging.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        logging.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        logging.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
294
def extract_run_parameters(runs):
    """
    Write each run's parameter report (run-*.xml) into the
    current working directory.
    """
    for pipeline_run in runs:
        pipeline_run.save()
301
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks seperately.

    :Parameters:
      - `genome_map`: mapping whose keys are the recognized
        spike-in/contamination directory prefixes
      - `mapped_reads`: mapping of "path/chromosome" (or bare chromosome)
        keys to read counts

    :Returns: dict of chromosome (or genome path) -> summed read count
    """
    summarized_reads = {}
    genome_reads = 0
    genome = 'unknown'
    for k, v in mapped_reads.items():
        path, k = os.path.split(k)
        # keys with a directory prefix that is NOT a known spike-in are
        # counted as reads against the primary genome
        # (dict.has_key was removed in python 3; `in` works in both)
        if len(path) > 0 and path not in genome_map:
            genome = path
            genome_reads += v
        else:
            summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
    summarized_reads[genome] = genome_reads
    return summarized_reads
319
def summarize_lane(gerald, lane_id):
    """
    Build a list of human-readable summary lines for one lane of a
    GERALD run, covering every read end present in the summary.

    :Parameters:
      - `gerald`: gerald run object providing summary.lane_results and
        eland_results.results
      - `lane_id`: lane to summarize

    :Returns: list of report line strings
    """
    report = []
    summary_results = gerald.summary.lane_results
    for end in range(len(summary_results)):
        eland_result = gerald.eland_results.results[end][lane_id]
        report.append("Sample name %s" % (eland_result.sample_name))
        report.append("Lane id %s end %s" % (eland_result.lane_id, end))
        # dict.has_key() was removed in python 3; the `in` operator is
        # the equivalent membership test and works in python 2 as well
        if end < len(summary_results) and eland_result.lane_id in summary_results[end]:
            cluster = summary_results[end][eland_result.lane_id].cluster
            report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
        report.append("Total Reads: %d" % (eland_result.reads))

        if hasattr(eland_result, 'match_codes'):
            mc = eland_result.match_codes
            nm = mc['NM']
            nm_percent = float(nm)/eland_result.reads  * 100
            qc = mc['QC']
            qc_percent = float(qc)/eland_result.reads * 100

            report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
            report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
            report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                          (mc['U0'], mc['U1'], mc['U2']))
            report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                          (mc['R0'], mc['R1'], mc['R2']))

        if hasattr(eland_result, 'genome_map'):
            report.append("Mapped Reads")
            mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
            for name, counts in mapped_reads.items():
                report.append("  %s: %d" % (name, counts))

        # blank separator line between ends
        report.append('')
    return report
354
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :Parameters:
      - `runs`: list of PipelineRun objects

    :Returns: a single newline-joined report string covering every run
      (empty string for an empty run list)
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.name,))
        # sort the report
        eland_keys = run.gerald.eland_results.results[0].keys()
        eland_keys.sort(alphanum)

        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('---')
            report.append('')
    # BUGFIX: this return used to sit inside the for loop, so only the
    # first run ever made it into the report (and an empty run list
    # returned None instead of a string)
    return os.linesep.join(report)
def is_compressed(filename):
    """
    Return True when filename carries a recognized compression
    extension (.gz or .bz2), False otherwise.
    """
    extension = os.path.splitext(filename)[1]
    return extension in ('.gz', '.bz2')
380
def save_flowcell_reports(data_dir, cycle_dir):
    """
    Archive the flowcell quality reports into cycle_dir.

    Tars data_dir/reports (plus Status.xml/Status.xsl when present)
    into cycle_dir/flowcell-reports.tar.bz2. Does nothing when the
    reports directory is missing.
    """
    data_dir = os.path.abspath(data_dir)
    reports_dir = os.path.join(data_dir, 'reports')
    if not os.path.exists(reports_dir):
        return

    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
    tar_args = ['tar', 'cjvf', reports_dest, 'reports/']
    if os.path.exists(os.path.join(data_dir, 'Status.xml')):
        tar_args.extend(['Status.xml', 'Status.xsl'])
    logging.info("Saving reports from " + reports_dir)

    # tar uses relative paths, so run it from inside data_dir
    original_cwd = os.getcwd()
    os.chdir(data_dir)
    QueueCommands([" ".join(tar_args)]).run()
    os.chdir(original_cwd)
399
400
def save_summary_file(gerald_object, cycle_dir):
    """
    Copy the GERALD Summary.htm report into cycle_dir, logging when
    the file does not exist instead of failing.
    """
    summary_path = os.path.join(gerald_object.pathname, 'Summary.htm')
    if not os.path.exists(summary_path):
        logging.info('Summary file %s was not found' % (summary_path,))
        return
    logging.info('Copying %s to %s' % (summary_path, cycle_dir))
    shutil.copy(summary_path, cycle_dir)
409
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images

    Copies IVC.htm and the matching Plots/s_?_*.png images from the
    bustard directory into cycle_dir; warns and skips when IVC.htm is
    absent.
    """
    source_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    if not os.path.exists(source_html):
        logging.warning('Missing IVC.html file, not archiving')
        return

    image_glob = os.path.join(bustard_object.pathname, 'Plots',
                              's_?_[a-z]*.png')
    logging.debug("Saving %s" % (source_html,))
    logging.debug("Saving %s" % (image_glob,))
    shutil.copy(source_html, cycle_dir)

    destination_plots = os.path.join(cycle_dir, 'Plots')
    if not os.path.exists(destination_plots):
        os.mkdir(destination_plots)
    for image in glob(image_glob):
        shutil.copy(image, destination_plots)
430
431
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory

    Collects the *_score.txt files from the bustard directory (or its
    Temp subdirectory, a feature of pipeline 1.1rc1+) and streams them
    through ``tar | bzip2`` into cycle_dir/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = [f for f in os.listdir(scores_path)
                   if re.match('.*_score.txt', f)]

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    # binary mode: we are writing bzip2-compressed bytes
    # (previously opened in text mode and never closed)
    tar_dest = open(tar_dest_name, 'wb')
    logging.info("Compressing score files from %s" % (scores_path,))
    logging.info("Running tar: " + " ".join(tar_cmd[:10]))
    logging.info("Running bzip2: " + " ".join(bzip_cmd))
    logging.info("Writing to %s" %(tar_dest_name,))

    # NOTE: the previous env={'BZIP': '-9'} replaced tar's entire
    # environment (and was redundant with the explicit -9 above)
    tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
                           cwd=scores_path)
    bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
    # close our copy of the pipe so bzip2 sees EOF when tar exits
    tar.stdout.close()
    tar.wait()
    # BUGFIX: wait for bzip2 too, otherwise the archive could be
    # incomplete when this function returns; also close the output file
    bzip.wait()
    tar_dest.close()
462
463
def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
    """
    Compress eland result files into the archive directory

    Already-compressed results are copied as-is; uncompressed ones are
    queued as shell bzip2 commands and run concurrently.

    :Parameters:
      - `gerald_object`: gerald run whose eland_results are archived
      - `cycle_dir`: destination directory for the (compressed) files
      - `num_jobs`: how many bzip2 commands to run at once
    """
    # copy & bzip eland files
    bz_commands = []

    for lanes_dictionary in gerald_object.eland_results.results:
        for eland_lane in lanes_dictionary.values():
            source_name = eland_lane.pathname
            path, name = os.path.split(eland_lane.pathname)
            dest_name = os.path.join(cycle_dir, name)
            logging.info("Saving eland file %s to %s" % \
                         (source_name, dest_name))

            if is_compressed(name):
              logging.info('Already compressed, Saving to %s' % (dest_name, ))
              shutil.copy(source_name, dest_name)
            else:
              # not compressed: queue a shell command
              # (the '>' redirect relies on QueueCommands running the
              # command string through a shell)
              dest_name += '.bz2'
              args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
              bz_commands.append(" ".join(args))

    if len(bz_commands) > 0:
      q = QueueCommands(bz_commands, num_jobs)
      q.run()
497     
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * eland_result files
      * score files
      * Summary.htm
      * srf files (raw sequence & qualities)

    :Parameters:
      - `runs`: list of PipelineRun objects to extract from
      - `output_base_dir`: where per-flowcell result directories are
        created (defaults to the current working directory)
      - `site`: site name passed to the qseq archiving commands; None
        skips the base-call archiving step entirely
      - `num_jobs`: concurrency passed to the command queue runners
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
      result_dir = os.path.join(output_base_dir, r.flowcell_id)
      logging.info("Using %s as result directory" % (result_dir,))
      if not os.path.exists(result_dir):
        os.mkdir(result_dir)

      # create cycle_dir named after the image-analysis cycle range
      cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
      logging.info("Filling in %s" % (cycle,))
      cycle_dir = os.path.join(result_dir, cycle)
      if os.path.exists(cycle_dir):
        # never overwrite a previous extraction; skip this run entirely
        logging.error("%s already exists, not overwriting" % (cycle_dir,))
        continue
      else:
        os.mkdir(cycle_dir)

      # save run file
      r.save(cycle_dir)

      # save illumina flowcell status report
      save_flowcell_reports( os.path.join(r.image_analysis.pathname, '..'), cycle_dir )
      
      # save stuff from bustard
      # grab IVC plot
      save_ivc_plot(r.bustard, cycle_dir)

      # build base call saving commands
      if site is not None:
        lanes = range(1,9)
        run_name = srf.pathname_to_run_name(r.pathname)
        srf_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
        srf.run_commands(r.bustard.pathname, srf_cmds, num_jobs)

      # save stuff from GERALD
      # copy stuff out of the main run
      g = r.gerald

      # save summary file
      save_summary_file(g, cycle_dir)
      
      # compress eland result files
      compress_eland_results(g, cycle_dir, num_jobs)
      
      # md5 all the compressed files once we're done
      md5_commands = srf.make_md5_commands(cycle_dir)
      srf.run_commands(cycle_dir, md5_commands, num_jobs)
556       
def rm_list(files, dry_run=True):
    """
    Delete every path in files, using rmtree for directories and
    unlink for plain files.

    With dry_run=True (the default) only log what would be deleted;
    paths that do not exist are reported as warnings either way.
    """
    for target in files:
        if not os.path.exists(target):
            logging.warn("%s doesn't exist."% (target,))
            continue
        logging.info('deleting %s' % (target,))
        if dry_run:
            continue
        if os.path.isdir(target):
            shutil.rmtree(target)
        else:
            os.unlink(target)
568
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    Removes run logs, pipeline logs, calibration directories and raw
    image tiles, then invokes `make clean_intermediate` in the image
    analysis directory when a Makefile is present.

    :Parameters:
      - `runs`: list of PipelineRun objects to clean
      - `dry_run`: when True (the default) the rm_list deletions are
        only logged, not performed
    """
    if dry_run:
        logging.info('In dry-run mode')

    for run in runs:
        logging.info('Cleaninging %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        logging.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # cd Data/C1-*_Firecrest*
        logging.info("Cleaning intermediate files")
        # make clean_intermediate
        # NOTE(review): this subprocess runs even when dry_run is True,
        # unlike the rm_list calls above — possibly unintended, confirm
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            clean_process = subprocess.Popen(['make', 'clean_intermediate'], 
                                             cwd=run.image_analysis.pathname,)
            clean_process.wait()
603
604
605