Modify the srf utility to tar.bz2 the qseq files instead of using srf
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
index 5e9bd0b6a544d323fbfc5feb444d1fbf671933e9..038ae5bbd6c57a60e77fbcb73f934a21fd50dc54 100644 (file)
@@ -9,6 +9,7 @@ import shutil
 import stat
 import subprocess
 import sys
+import tarfile
 import time
 
 try:
@@ -25,6 +26,7 @@ LANE_LIST = range(1, LANES_PER_FLOWCELL+1)
 
 from htsworkflow.util.alphanum import alphanum
 from htsworkflow.util.ethelp import indent, flatten
+from htsworkflow.util.queuecommands import QueueCommands
 
 from htsworkflow.pipelines import srf
 
@@ -376,22 +378,63 @@ def is_compressed(filename):
     else:
         return False
 
def save_flowcell_reports(data_dir, cycle_dir):
    """
    Save the flowcell quality reports

    Archives the run's reports/ directory (plus Status.xml and
    Status.xsl when present) from data_dir into
    <cycle_dir>/flowcell-reports.tar.bz2.
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    status_xsl = os.path.join(data_dir, 'Status.xsl')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        # only add the status files that actually exist; tar aborts
        # when asked for a member that is missing, and Status.xsl is
        # not guaranteed to accompany Status.xml
        if os.path.exists(status_file):
            cmd_list.append('Status.xml')
        if os.path.exists(status_xsl):
            cmd_list.append('Status.xsl')
        logging.info("Saving reports from " + reports_dir)
        cwd = os.getcwd()
        os.chdir(data_dir)
        try:
            q = QueueCommands([" ".join(cmd_list)])
            q.run()
        finally:
            # always restore the working directory, even if tar fails
            os.chdir(cwd)
+
+
def save_summary_file(gerald_object, cycle_dir):
    """
    Copy GERALD's Summary.htm into the archive directory.

    Logs (but does not raise) when the summary file is missing.
    """
    summary_path = os.path.join(gerald_object.pathname, 'Summary.htm')
    if not os.path.exists(summary_path):
        logging.info('Summary file %s was not found' % (summary_path,))
        return
    logging.info('Copying %s to %s' % (summary_path, cycle_dir))
    shutil.copy(summary_path, cycle_dir)
 
-    
-def compress_score_files(gerald_object, cycle_dir):
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images

    Copies IVC.htm and the matching Plots/s_?_*.png images from the
    bustard directory into cycle_dir, recreating the Plots/
    subdirectory. Logs a warning and does nothing when IVC.htm is
    absent.
    """
    plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
    plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')

    plot_target_path = os.path.join(cycle_dir, 'Plots')

    if os.path.exists(plot_html):
        logging.debug("Saving %s" % ( plot_html, ))
        logging.debug("Saving %s" % ( plot_images, ))
        shutil.copy(plot_html, cycle_dir)
        if not os.path.exists(plot_target_path):
            os.mkdir(plot_target_path)
        for plot_file in glob(plot_images):
            shutil.copy(plot_file, plot_target_path)
    else:
        # the file we actually looked for is IVC.htm, not IVC.html;
        # report the real name so operators can find it
        logging.warning('Missing IVC.htm file, not archiving')
+
+
+def compress_score_files(bustard_object, cycle_dir):
     """
     Compress score files into our result directory
     """
     # check for g.pathname/Temp a new feature of 1.1rc1
-    scores_path = gerald_object.pathname
+    scores_path = bustard_object.pathname
     scores_path_temp = os.path.join(scores_path, 'Temp')
     if os.path.isdir(scores_path_temp):
         scores_path = scores_path_temp
@@ -402,14 +445,14 @@ def compress_score_files(gerald_object, cycle_dir):
         if re.match('.*_score.txt', f):
             score_files.append(f)
 
-    tar_cmd = ['/bin/tar', 'c'] + score_files
+    tar_cmd = ['tar', 'c'] + score_files
     bzip_cmd = [ 'bzip2', '-9', '-c' ]
     tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
     tar_dest = open(tar_dest_name, 'w')
     logging.info("Compressing score files from %s" % (scores_path,))
     logging.info("Running tar: " + " ".join(tar_cmd[:10]))
     logging.info("Running bzip2: " + " ".join(bzip_cmd))
-    logging.info("Writing to %s" %(tar_dest_name))
+    logging.info("Writing to %s" %(tar_dest_name,))
 
     env = {'BZIP': '-9'}
     tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
@@ -417,11 +460,14 @@ def compress_score_files(gerald_object, cycle_dir):
     bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
     tar.wait()
 
-def compress_eland_results(gerald_object, cycle_dir):
+
+def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
     """
     Compress eland result files into the archive directory
     """
     # copy & bzip eland files
+    bz_commands = []
+    
     for lanes_dictionary in gerald_object.eland_results.results:
         for eland_lane in lanes_dictionary.values():
             source_name = eland_lane.pathname
@@ -436,14 +482,28 @@ def compress_eland_results(gerald_object, cycle_dir):
             else:
               # not compressed
               dest_name += '.bz2'
-              args = ['bzip2', '-9', '-c', source_name]
-              logging.info('Running: %s' % ( " ".join(args) ))
-              bzip_dest = open(dest_name, 'w')
-              bzip = subprocess.Popen(args, stdout=bzip_dest)
-              logging.info('Saving to %s' % (dest_name, ))
-              bzip.wait()
+              args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
+              bz_commands.append(" ".join(args))
+              #logging.info('Running: %s' % ( " ".join(args) ))
+              #bzip_dest = open(dest_name, 'w')
+              #bzip = subprocess.Popen(args, stdout=bzip_dest)
+              #logging.info('Saving to %s' % (dest_name, ))
+              #bzip.wait()
+              
+    if len(bz_commands) > 0:
+      q = QueueCommands(bz_commands, num_jobs)
+      q.run()
+      
     
 def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1):
+    """
+    Iterate over runfolders in runs extracting the most useful information.
+      * run parameters (in run-*.xml)
+      * eland_result files
+      * score files
+      * Summary.htm
+      * srf files (raw sequence & qualities)
+    """
     if output_base_dir is None:
         output_base_dir = os.getcwd()
 
@@ -463,25 +523,36 @@ def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1):
       else:
         os.mkdir(cycle_dir)
 
-      # copy stuff out of the main run
-      g = r.gerald
-
       # save run file
       r.save(cycle_dir)
 
+      # save illumina flowcell status report
+      save_flowcell_reports( os.path.join(r.image_analysis.pathname, '..'), cycle_dir )
+      
+      # save stuff from bustard
+      # grab IVC plot
+      save_ivc_plot(r.bustard, cycle_dir)
+
+      # build base call saving commands
+      if site is not None:
+        lanes = range(1,9)
+        run_name = srf.pathname_to_run_name(r.pathname)
+        srf_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
+        srf.run_commands(r.bustard.pathname, srf_cmds, num_jobs)
+
+      # save stuff from GERALD
+      # copy stuff out of the main run
+      g = r.gerald
+
       # save summary file
       save_summary_file(g, cycle_dir)
       
-      # tar score files
-      compress_score_files(g, cycle_dir)
-
       # compress eland result files
-      compress_eland_results(g, cycle_dir)
+      compress_eland_results(g, cycle_dir, num_jobs)
       
-      # build srf commands
-      lanes = range(1,9)
-      srf_cmds = srf.make_commands(r.pathname, lanes, "woldlab", cycle_dir)
-      srf.run_srf_commands(r.bustard.pathname, srf_cmds, 2)
+      # md5 all the compressed files once we're done
+      md5_commands = srf.make_md5_commands(cycle_dir)
+      srf.run_commands(cycle_dir, md5_commands, num_jobs)
       
 def rm_list(files, dry_run=True):
     for f in files: