Test the updated version of extract_results that builds srf files.
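
extract_results() now takes site and num_jobs arguments, splits its steps into
save_summary_file(), compress_score_files(), and compress_eland_results()
helpers, and builds srf files for each lane via htsworkflow.pipelines.srf.
clean_runs() is implemented with a dry-run mode, and the RTA BaseCalls /
Intensities directories are now recognized alongside Bustard and IPAR.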
diff --git a/htsworkflow/pipelines/runfolder.py b/htsworkflow/pipelines/runfolder.py
index 097a61732884a7d2b90983cf01ff8ed61000e295..d628f9bf1623ce20f6fabec3d0206702e2197341 100644
--- a/htsworkflow/pipelines/runfolder.py
+++ b/htsworkflow/pipelines/runfolder.py
@@ -9,6 +9,7 @@ import shutil
 import stat
 import subprocess
 import sys
+import tarfile
 import time
 
 try:
@@ -21,10 +22,13 @@ EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
 VERSION_RE = "([0-9\.]+)"
 USER_RE = "([a-zA-Z0-9]+)"
 LANES_PER_FLOWCELL = 8
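+# all of the lane numbers on a flowcell, i.e. [1..LANES_PER_FLOWCELL]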
+LANE_LIST = range(1, LANES_PER_FLOWCELL+1)
 
 from htsworkflow.util.alphanum import alphanum
 from htsworkflow.util.ethelp import indent, flatten
 
+from htsworkflow.pipelines import srf
+
 class PipelineRun(object):
     """
     Capture "interesting" information about a pipeline run
@@ -165,8 +169,10 @@ def get_runs(runfolder):
 
     def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
         logging.info("Looking for bustard directories in %s" % (pathname,))
-        bustard_glob = os.path.join(pathname, "Bustard*")
-        for bustard_pathname in glob(bustard_glob):
+        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
+        # An RTA BaseCalls directory looks enough like a Bustard directory to scan it the same way.
+        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
+        for bustard_pathname in bustard_dirs:
             logging.info("Found bustard directory %s" % (bustard_pathname,))
             b = bustard.bustard(bustard_pathname)
             gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
@@ -192,22 +198,25 @@ def get_runs(runfolder):
         logging.info('Found firecrest in ' + datadir)
         image_analysis = firecrest.firecrest(firecrest_pathname)
         if image_analysis is None:
-           logging.warn(
+            logging.warn(
                 "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
             )
-       else:
+        else:
             scan_post_image_analysis(
                 runs, runfolder, image_analysis, firecrest_pathname
             )
     # scan for IPAR directories
-    for ipar_pathname in glob(os.path.join(datadir,"IPAR_*")):
+    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
+    # The Intensities directory from the RTA software looks a lot like an IPAR directory.
+    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
+    for ipar_pathname in ipar_dirs:
         logging.info('Found ipar directories in ' + datadir)
         image_analysis = ipar.ipar(ipar_pathname)
         if image_analysis is None:
-           logging.warn(
+            logging.warn(
                 "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
             )
-       else:
+        else:
             scan_post_image_analysis(
                 runs, runfolder, image_analysis, ipar_pathname
             )
@@ -227,6 +236,7 @@ def get_specific_run(gerald_dir):
     from htsworkflow.pipelines import bustard
     from htsworkflow.pipelines import gerald
 
+    gerald_dir = os.path.expanduser(gerald_dir)
     bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
     image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
 
@@ -252,6 +262,9 @@ def get_specific_run(gerald_dir):
         image_run = firecrest.firecrest(image_dir)
     elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
         image_run = ipar.ipar(image_dir)
+    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
+        image_run = ipar.ipar(image_dir)
+
 # if we didn't find a run, report the error and return
     if image_run is None:
         msg = '%s does not contain an image processing step' % (image_dir,)
@@ -310,25 +323,31 @@ def summarize_lane(gerald, lane_id):
       eland_result = gerald.eland_results.results[end][lane_id]
       report.append("Sample name %s" % (eland_result.sample_name))
       report.append("Lane id %s end %s" % (eland_result.lane_id, end))
-      cluster = summary_results[end][eland_result.lane_id].cluster
-      report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
+      if end < len(summary_results) and eland_result.lane_id in summary_results[end]:
+          cluster = summary_results[end][eland_result.lane_id].cluster
+          report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
       report.append("Total Reads: %d" % (eland_result.reads))
-      mc = eland_result._match_codes
-      nm = mc['NM']
-      nm_percent = float(nm)/eland_result.reads  * 100
-      qc = mc['QC']
-      qc_percent = float(qc)/eland_result.reads * 100
-
-      report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
-      report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
-      report.append('Unique (0,1,2 mismatches) %d %d %d' % \
-                    (mc['U0'], mc['U1'], mc['U2']))
-      report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
-                    (mc['R0'], mc['R1'], mc['R2']))
-      report.append("Mapped Reads")
-      mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
-      for name, counts in mapped_reads.items():
-        report.append("  %s: %d" % (name, counts))
+
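+      # not every eland result flavor carries match codes; only report them when present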
+      if hasattr(eland_result, 'match_codes'):
+          mc = eland_result.match_codes
+          nm = mc['NM']
+          nm_percent = float(nm) / eland_result.reads * 100
+          qc = mc['QC']
+          qc_percent = float(qc) / eland_result.reads * 100
+
+          report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
+          report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
+          report.append('Unique (0,1,2 mismatches) %d %d %d' % \
+                        (mc['U0'], mc['U1'], mc['U2']))
+          report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
+                        (mc['R0'], mc['R1'], mc['R2']))
+
+      if hasattr(eland_result, 'genome_map'):
+          report.append("Mapped Reads")
+          mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
+          for name, counts in mapped_reads.items():
+            report.append("  %s: %d" % (name, counts))
+
       report.append('')
     return report
 
@@ -358,7 +377,83 @@ def is_compressed(filename):
     else:
         return False
 
-def extract_results(runs, output_base_dir=None):
+def save_summary_file(gerald_object, cycle_dir):
+    # Copy Summary.htm into the archive directory
+    summary_path = os.path.join(gerald_object.pathname, 'Summary.htm')
+    if os.path.exists(summary_path):
+        logging.info('Copying %s to %s' % (summary_path, cycle_dir))
+        shutil.copy(summary_path, cycle_dir)
+    else:
+        logging.info('Summary file %s was not found' % (summary_path,))
+
+
+def compress_score_files(bustard_object, cycle_dir):
+    """
+    Compress score files into our result directory
+    """
+    # check for g.pathname/Temp a new feature of 1.1rc1
+    scores_path = bustard_object.pathname
+    scores_path_temp = os.path.join(scores_path, 'Temp')
+    if os.path.isdir(scores_path_temp):
+        scores_path = scores_path_temp
+
+    # hopefully we have a directory that contains s_*_score files
+    score_files = []
+    for f in os.listdir(scores_path):
+        if re.match('.*_score.txt', f):
+            score_files.append(f)
+
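+    # stream tar through bzip2 so an uncompressed tarball never hits the disk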
+    tar_cmd = ['tar', 'c'] + score_files
+    bzip_cmd = [ 'bzip2', '-9', '-c' ]
+    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
+    tar_dest = open(tar_dest_name, 'wb')
+    logging.info("Compressing score files from %s" % (scores_path,))
+    logging.info("Running tar: " + " ".join(tar_cmd[:10]))
+    logging.info("Running bzip2: " + " ".join(bzip_cmd))
+    logging.info("Writing to %s" %(tar_dest_name,))
+
+    env = {'BZIP': '-9'}
+    tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
+                           cwd=scores_path)
+    bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
+    tar.wait()
+    # make sure bzip2 has flushed all of the compressed data before closing
+    bzip.wait()
+    tar_dest.close()
+
+
+def compress_eland_results(gerald_object, cycle_dir):
+    """
+    Compress eland result files into the archive directory
+    """
+    # copy & bzip eland files
+    for lanes_dictionary in gerald_object.eland_results.results:
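+        # one dictionary per end, mapping lane ids to eland result objects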
+        for eland_lane in lanes_dictionary.values():
+            source_name = eland_lane.pathname
+            path, name = os.path.split(eland_lane.pathname)
+            dest_name = os.path.join(cycle_dir, name)
+            logging.info("Saving eland file %s to %s" % \
+                         (source_name, dest_name))
+
+            if is_compressed(name):
+                logging.info('Already compressed, saving to %s' % (dest_name,))
+                shutil.copy(source_name, dest_name)
+            else:
+                # not compressed yet, so run it through bzip2
+                dest_name += '.bz2'
+                args = ['bzip2', '-9', '-c', source_name]
+                logging.info('Running: %s' % (" ".join(args),))
+                bzip_dest = open(dest_name, 'wb')
+                bzip = subprocess.Popen(args, stdout=bzip_dest)
+                logging.info('Saving to %s' % (dest_name,))
+                bzip.wait()
+                bzip_dest.close()
+
+def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1):
+    """
+    Iterate over runfolders in runs extracting the most useful information.
+      * run parameters (in run-*.xml)
+      * eland_result files
+      * score files
+      * Summary.htm
+      * srf files (raw sequence & qualities)
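+
+    A minimal usage sketch (paths and the site name are illustrative):
+        runs = get_runs('/path/to/run_folder')
+        extract_results(runs, '/tmp/results', site='mysite', num_jobs=4)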
+    """
     if output_base_dir is None:
         output_base_dir = os.getcwd()
 
@@ -384,76 +479,67 @@ def extract_results(runs, output_base_dir=None):
       # save run file
       r.save(cycle_dir)
 
-      # Copy Summary.htm
-      summary_path = os.path.join(r.gerald.pathname, 'Summary.htm')
-      if os.path.exists(summary_path):
-          logging.info('Copying %s to %s' % (summary_path, cycle_dir))
-          shutil.copy(summary_path, cycle_dir)
-      else:
-          logging.info('Summary file %s was not found' % (summary_path,))
-
+      # save summary file
+      save_summary_file(g, cycle_dir)
+      
       # tar score files
-      score_files = []
-
-      # check for g.pathname/Temp a new feature of 1.1rc1
-      scores_path = g.pathname
-      scores_path_temp = os.path.join(scores_path, 'Temp')
-      if os.path.isdir(scores_path_temp):
-          scores_path = scores_path_temp
-
-      # hopefully we have a directory that contains s_*_score files
-      for f in os.listdir(scores_path):
-          if re.match('.*_score.txt', f):
-              score_files.append(f)
-
-      tar_cmd = ['/bin/tar', 'c'] + score_files
-      bzip_cmd = [ 'bzip2', '-9', '-c' ]
-      tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
-      tar_dest = open(tar_dest_name, 'w')
-      logging.info("Compressing score files from %s" % (scores_path,))
-      logging.info("Running tar: " + " ".join(tar_cmd[:10]))
-      logging.info("Running bzip2: " + " ".join(bzip_cmd))
-      logging.info("Writing to %s" %(tar_dest_name))
-
-      tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, 
-                             cwd=scores_path)
-      bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
-      tar.wait()
-
-      # copy & bzip eland files
-      for lanes_dictionary in g.eland_results.results:
-          for eland_lane in lanes_dictionary.values():
-              source_name = eland_lane.pathname
-              path, name = os.path.split(eland_lane.pathname)
-              dest_name = os.path.join(cycle_dir, name)
-             logging.info("Saving eland file %s to %s" % \
-                          (source_name, dest_name))
-
-              if is_compressed(name):
-                logging.info('Already compressed, Saving to %s' % (dest_name, ))
-                shutil.copy(source_name, dest_name)
-              else:
-                # not compressed
-                dest_name += '.bz2'
-                args = ['bzip2', '-9', '-c', source_name]
-                logging.info('Running: %s' % ( " ".join(args) ))
-                bzip_dest = open(dest_name, 'w')
-                bzip = subprocess.Popen(args, stdout=bzip_dest)
-                logging.info('Saving to %s' % (dest_name, ))
-                bzip.wait()
-
-def clean_runs(runs):
+      compress_score_files(r.bustard, cycle_dir)
+
+      # compress eland result files
+      compress_eland_results(g, cycle_dir)
+      
+      # build srf commands
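+      # srf files bundle the raw sequence and quality values for each lane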
+      lanes = LANE_LIST
+      run_name = srf.pathname_to_run_name(r.pathname)
+      srf_cmds = srf.make_commands(run_name, lanes, site, cycle_dir)
+      srf.run_srf_commands(r.bustard.pathname, srf_cmds, num_jobs)
+      
+def rm_list(files, dry_run=True):
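+    """Delete all of the files or directories in files.
+
+    With dry_run=True (the default) candidates are only logged, not deleted.
+    """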
+    for f in files:
+        if os.path.exists(f):
+            logging.info('deleting %s' % (f,))
+            if not dry_run:
+                if os.path.isdir(f):
+                    shutil.rmtree(f)
+                else:
+                    os.unlink(f)
+        else:
+            logging.warn("%s doesn't exist."% (f,))
+
+def clean_runs(runs, dry_run=True):
     """
     Clean up run folders to optimize for compression.
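+    Nothing is actually deleted unless dry_run is False.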
     """
-    # TODO: implement this.
-    # rm RunLog*.xml
-    # rm pipeline_*.txt
-    # rm gclog.txt
-    # rm NetCopy.log
-    # rm nfn.log
-    # rm Images/L*
-    # cd Data/C1-*_Firecrest*
-    # make clean_intermediate
-
-    pass
+    if dry_run:
+        logging.info('In dry-run mode')
+
+    for run in runs:
+        logging.info('Cleaning %s' % (run.pathname,))
+        # rm RunLog*.xml
+        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
+        rm_list(runlogs, dry_run)
+        # rm pipeline_*.txt
+        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
+        rm_list(pipeline_logs, dry_run)
+        # rm gclog.txt?
+        # rm NetCopy.log? Isn't this robocopy?
+        logs = glob(os.path.join(run.pathname, '*.log'))
+        rm_list(logs, dry_run)
+        # rm nfn.log?
+        # Calibration
+        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
+        rm_list(calibration_dir, dry_run)
+        # rm Images/L*
+        logging.info("Cleaning images")
+        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
+        rm_list(image_dirs, dry_run)
+        # the equivalent of: cd Data/C1-*_Firecrest* && make clean_intermediate
+        logging.info("Cleaning intermediate files")
+        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
+            clean_process = subprocess.Popen(['make', 'clean_intermediate'], 
+                                             cwd=run.image_analysis.pathname,)
+            clean_process.wait()
+