Add building srf files to runfolder as part of --extract-results
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
1 """
2 Core information needed to inspect a runfolder.
3 """
4 from glob import glob
5 import logging
6 import os
7 import re
8 import shutil
9 import stat
10 import subprocess
11 import sys
12 import time
13
14 try:
15   from xml.etree import ElementTree
16 except ImportError, e:
17   from elementtree import ElementTree
18
# Date format used in European-style (DD-MM-YYYY) runfolder names.
EUROPEAN_STRPTIME = "%d-%m-%Y"
# Regex fragment matching a DD-MM-YYYY date embedded in a directory name.
EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
# Raw string so the regex backslash is not treated as an (invalid)
# string escape sequence.
VERSION_RE = r"([0-9\.]+)"
USER_RE = "([a-zA-Z0-9]+)"
# Illumina flowcells have 8 lanes, numbered 1 through 8.
LANES_PER_FLOWCELL = 8
LANE_LIST = range(1, LANES_PER_FLOWCELL+1)
25
26 from htsworkflow.util.alphanum import alphanum
27 from htsworkflow.util.ethelp import indent, flatten
28
29 from htsworkflow.pipelines import srf
30
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run

    A run is the combination of one image analysis step (Firecrest or
    IPAR), one base-calling step (Bustard), and one alignment step
    (GERALD).  The combination can be serialized to / restored from a
    single xml report file.
    """
    # version stamp for the xml report format
    XML_VERSION = 1
    # xml tag names used by get_elements / set_elements
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, xml=None):
        """
        :Parameters:
          - `pathname`: root of the runfolder on disk (optional)
          - `xml`: previously saved ElementTree element to restore
            this object's state from (optional)
        """
        if pathname is not None:
          self.pathname = os.path.normpath(pathname)
        else:
          self.pathname = None
        # computed lazily by _get_run_name / _get_flowcell_id
        self._name = None
        self._flowcell_id = None
        # component runs; filled in by set_elements or by the
        # runfolder-scanning code (get_runs / get_specific_run)
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
          self.set_elements(xml)

    def _get_flowcell_id(self):
        """
        Return the flowcell id, reading Config/FlowcellId.xml when it
        exists, otherwise guessing from the runfolder name.
        """
        # extract flowcell ID
        if self._flowcell_id is None:
          config_dir = os.path.join(self.pathname, 'Config')
          flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
          if os.path.exists(flowcell_id_path):
            flowcell_id_tree = ElementTree.parse(flowcell_id_path)
            self._flowcell_id = flowcell_id_tree.findtext('Text')
          else:
            # runfolder names conventionally end with _<flowcellid>
            path_fields = self.pathname.split('_')
            if len(path_fields) > 0:
              # guessing last element of filename
              flowcell_id = path_fields[-1]
            else:
              flowcell_id = 'unknown'

            logging.warning(
              "Flowcell id was not found, guessing %s" % (
                 flowcell_id))
            self._flowcell_id = flowcell_id
        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        # each component contributes its own subtree
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """
        Restore this run's state from an xml tree written by get_elements.

        Raises ValueError when the root tag is not a PipelineRun.
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
          raise ValueError('Pipeline Run Expecting %s got %s' % (
              PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
          tag = element.tag.lower()
          if tag == PipelineRun.FLOWCELL_ID.lower():
            self._flowcell_id = element.text
          #ok the xword.Xword.XWORD pattern for module.class.constant is lame
          # you should only have Firecrest or IPAR, never both of them.
          elif tag == firecrest.Firecrest.FIRECREST.lower():
            self.image_analysis = firecrest.Firecrest(xml=element)
          elif tag == ipar.IPAR.IPAR.lower():
            self.image_analysis = ipar.IPAR(xml=element)
          elif tag == bustard.Bustard.BUSTARD.lower():
            self.bustard = bustard.Bustard(xml=element)
          elif tag == gerald.Gerald.GERALD.lower():
            self.gerald = gerald.Gerald(xml=element)
          else:
            logging.warn('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name
        """
        if self._name is None:
          # newest timestamp of the three component runs
          tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
          timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
          self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """
        Write the combined xml report into destdir (default: cwd).
        """
        if destdir is None:
            destdir = ''
        logging.info("Saving run report "+ self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """
        Restore this run's state from a previously saved xml report.
        """
        logging.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
141
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters: 
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    root = ElementTree.parse(pathname).getroot()
    return PipelineRun(xml=root)
154
def get_runs(runfolder):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in there gerald component.
    """
    # imports are hidden here to avoid cyclic imports: the component
    # modules all import this file
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
        # each Bustard (or RTA BaseCalls) directory below the image
        # analysis directory can hold several GERALD alignment runs;
        # every (bustard, gerald) pair becomes its own PipelineRun
        logging.info("Looking for bustard directories in %s" % (pathname,))
        basecall_dirs = glob(os.path.join(pathname, "Bustard*"))
        # RTA BaseCalls looks enough like Bustard.
        basecall_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
        for basecall_dir in basecall_dirs:
            logging.info("Found bustard directory %s" % (basecall_dir,))
            base_caller = bustard.bustard(basecall_dir)
            logging.info("Looking for gerald directories in %s" % (pathname,))
            for aligner_dir in glob(os.path.join(basecall_dir, 'GERALD*')):
                logging.info("Found gerald directory %s" % (aligner_dir,))
                try:
                    aligner = gerald.gerald(aligner_dir)
                except IOError as e:
                    # skip gerald directories that can't be parsed
                    logging.error("Ignoring " + str(e))
                    continue
                run = PipelineRun(runfolder)
                run.image_analysis = image_analysis
                run.bustard = base_caller
                run.gerald = aligner
                runs.append(run)

    datadir = os.path.join(runfolder, 'Data')

    logging.info('Searching for runs in ' + datadir)
    runs = []
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        logging.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            logging.warn(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            )
            continue
        scan_post_image_analysis(
            runs, runfolder, image_analysis, firecrest_pathname
        )
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        logging.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            logging.warn(
                "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
            )
            continue
        scan_post_image_analysis(
            runs, runfolder, image_analysis, ipar_pathname
        )

    return runs
224
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :Parameters:
      - `gerald_dir`: path to a GERALD alignment directory

    :Returns: a PipelineRun, or None when any component is missing
    """
    # hide imports to avoid cyclic imports with the component modules
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    gerald_dir = os.path.expanduser(gerald_dir)
    # layout convention: runfolder/Data/<image>/<bustard>/<gerald>
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..','..'))

    logging.info('--- use-run detected options ---')
    logging.info('runfolder: %s' % (runfolder_dir,))
    logging.info('image_dir: %s' % (image_dir,))
    logging.info('bustard_dir: %s' % (bustard_dir,))
    logging.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    logging.info('data_dir: %s' % (data_dir,))
    # bug fix: label used to be misspelled 'short_iamge_dir'
    logging.info('short_image_dir: %s' %(short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        # RTA Intensities directories are close enough to IPAR
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        logging.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        logging.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        logging.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
292
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters
    """
    for pipeline_run in runs:
        pipeline_run.save()
299
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.

    :Parameters:
      - `genome_map`: mapping whose keys are configured genome directories
      - `mapped_reads`: mapping of (possibly path-qualified) chromosome
        name to read count

    :Returns: dict of chromosome/genome name -> summed read count
    """
    summarized_reads = {}
    genome_reads = 0
    genome = 'unknown'
    for k, v in mapped_reads.items():
        path, k = os.path.split(k)
        # a directory component that is not one of our configured genomes
        # is counted as a whole-genome total instead of per-chromosome
        # (bug fix: dict.has_key was removed in python 3, use `in`)
        if len(path) > 0 and path not in genome_map:
            genome = path
            genome_reads += v
        else:
            summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
    summarized_reads[genome] = genome_reads
    return summarized_reads
317
def summarize_lane(gerald, lane_id):
    """
    Build a human-readable report for one lane of a gerald run.

    :Parameters:
      - `gerald`: gerald run object providing summary and eland_results
      - `lane_id`: which lane to summarize

    :Returns: list of report lines (strings)
    """
    report = []
    summary_results = gerald.summary.lane_results
    # one iteration per read end (single or paired end runs)
    for end in range(len(summary_results)):
        eland_result = gerald.eland_results.results[end][lane_id]
        report.append("Sample name %s" % (eland_result.sample_name))
        report.append("Lane id %s end %s" % (eland_result.lane_id, end))
        # bug fix: dict.has_key was removed in python 3, use `in`
        if end < len(summary_results) and eland_result.lane_id in summary_results[end]:
            cluster = summary_results[end][eland_result.lane_id].cluster
            report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
        report.append("Total Reads: %d" % (eland_result.reads))

        # match-code statistics are only present for some eland formats
        if hasattr(eland_result, 'match_codes'):
            mc = eland_result.match_codes
            nm = mc['NM']
            nm_percent = float(nm)/eland_result.reads  * 100
            qc = mc['QC']
            qc_percent = float(qc)/eland_result.reads * 100

            report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
            report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
            report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                          (mc['U0'], mc['U1'], mc['U2']))
            report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                          (mc['R0'], mc['R1'], mc['R2']))

        if hasattr(eland_result, 'genome_map'):
            report.append("Mapped Reads")
            mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
            for name, counts in mapped_reads.items():
                report.append("  %s: %d" % (name, counts))

        report.append('')
    return report
352
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :Parameters:
      - `runs`: list of PipelineRun objects

    :Returns: the report as a single newline-joined string
    """
    report = []
    for run in runs:
        # print a run name?
        report.append('Summary for %s' % (run.name,))
        # sort the report
        eland_keys = run.gerald.eland_results.results[0].keys()
        eland_keys.sort(alphanum)

        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('---')
            report.append('')
    # bug fix: the return used to be inside the for loop, so only the
    # first run was ever summarized
    return os.linesep.join(report)
370
def is_compressed(filename):
    """
    Return True when filename carries a known compression extension.
    """
    extension = os.path.splitext(filename)[1]
    return extension in ('.gz', '.bz2')
378
def save_summary_file(gerald_object, cycle_dir):
    """
    Copy the gerald Summary.htm report into cycle_dir, when it exists.
    """
    # Copy Summary.htm
    summary_path = os.path.join(gerald_object.pathname, 'Summary.htm')
    if not os.path.exists(summary_path):
        logging.info('Summary file %s was not found' % (summary_path,))
        return
    logging.info('Copying %s to %s' % (summary_path, cycle_dir))
    shutil.copy(summary_path, cycle_dir)
388     
def compress_score_files(gerald_object, cycle_dir):
    """
    Compress score files into our result directory

    Streams tar output through bzip2 into cycle_dir/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = gerald_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = []
    for f in os.listdir(scores_path):
        if re.match('.*_score.txt', f):
            score_files.append(f)

    tar_cmd = ['/bin/tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    tar_dest = open(tar_dest_name, 'w')
    logging.info("Compressing score files from %s" % (scores_path,))
    logging.info("Running tar: " + " ".join(tar_cmd[:10]))
    logging.info("Running bzip2: " + " ".join(bzip_cmd))
    logging.info("Writing to %s" %(tar_dest_name))

    # NOTE(review): env replaces tar's whole environment; works here
    # because tar_cmd uses an absolute path
    env = {'BZIP': '-9'}
    tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
                           cwd=scores_path)
    bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
    tar.wait()
    # bug fix: previously only tar was waited on; the bzip2 child and the
    # open file handles were leaked, so the archive could still be
    # incomplete when this function returned
    tar.stdout.close()
    bzip.wait()
    tar_dest.close()
419
def compress_eland_results(gerald_object, cycle_dir):
    """
    Compress eland result files into the archive directory

    Already-compressed files are copied verbatim; everything else is
    piped through bzip2 into cycle_dir.
    """
    # copy & bzip eland files
    for lanes_dictionary in gerald_object.eland_results.results:
        for eland_lane in lanes_dictionary.values():
            source_name = eland_lane.pathname
            path, name = os.path.split(eland_lane.pathname)
            dest_name = os.path.join(cycle_dir, name)
            logging.info("Saving eland file %s to %s" % \
                         (source_name, dest_name))

            if is_compressed(name):
                logging.info('Already compressed, Saving to %s' % (dest_name, ))
                shutil.copy(source_name, dest_name)
            else:
                # not compressed
                dest_name += '.bz2'
                args = ['bzip2', '-9', '-c', source_name]
                logging.info('Running: %s' % ( " ".join(args) ))
                bzip_dest = open(dest_name, 'w')
                bzip = subprocess.Popen(args, stdout=bzip_dest)
                logging.info('Saving to %s' % (dest_name, ))
                bzip.wait()
                # bug fix: close the destination file instead of leaking
                # the handle for every lane processed
                bzip_dest.close()
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1):
    """
    Copy the interesting result files for each run into an archive tree.

    :Parameters:
      - `runs`: list of PipelineRun objects
      - `output_base_dir`: root for the per-flowcell result directories
        (defaults to the current working directory)
      - `site`: site label (currently unused in this function)
      - `num_jobs`: number of srf conversions to run concurrently
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
        result_dir = os.path.join(output_base_dir, r.flowcell_id)
        logging.info("Using %s as result directory" % (result_dir,))
        if not os.path.exists(result_dir):
            os.mkdir(result_dir)

        # create cycle_dir
        cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
        logging.info("Filling in %s" % (cycle,))
        cycle_dir = os.path.join(result_dir, cycle)
        if os.path.exists(cycle_dir):
            logging.error("%s already exists, not overwriting" % (cycle_dir,))
            continue
        else:
            os.mkdir(cycle_dir)

        # copy stuff out of the main run
        g = r.gerald

        # save run file
        r.save(cycle_dir)

        # save summary file
        save_summary_file(g, cycle_dir)

        # tar score files
        compress_score_files(g, cycle_dir)

        # compress eland result files
        compress_eland_results(g, cycle_dir)

        # build srf commands
        lanes = range(1,9)
        srf_cmds = srf.make_commands(r.pathname, lanes, "woldlab", cycle_dir)
        # bug fix: the job count was hard-coded to 2, silently ignoring
        # the num_jobs parameter (NOTE(review): default parallelism drops
        # from 2 to 1 unless callers pass num_jobs explicitly)
        srf.run_srf_commands(r.bustard.pathname, srf_cmds, num_jobs)
def rm_list(files, dry_run=True):
    """
    Delete each path in files; directories are removed recursively.

    With dry_run=True only log what would have been deleted.
    """
    for target in files:
        if not os.path.exists(target):
            logging.warn("%s doesn't exist."% (target,))
            continue
        logging.info('deleting %s' % (target,))
        if dry_run:
            continue
        if os.path.isdir(target):
            shutil.rmtree(target)
        else:
            os.unlink(target)
497
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    Removes run logs, pipeline logs, calibration data, raw images and
    (via `make clean_intermediate`) intermediate analysis files.  With
    dry_run=True only reports what would be deleted.
    """
    if dry_run:
        logging.info('In dry-run mode')

    for run in runs:
        # bug fix: message used to read 'Cleaninging'
        logging.info('Cleaning %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)
        # rm gclog.txt?
        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm nfn.log?
        # Calibration
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)
        # rm Images/L*
        logging.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # cd Data/C1-*_Firecrest*
        logging.info("Cleaning intermediate files")
        # make clean_intermediate
        # NOTE: this actually runs make even in dry-run mode, matching
        # the original behavior
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                             cwd=run.image_analysis.pathname,)
            clean_process.wait()
532
533
534