Modify the srf utility to tar.bz2 the qseq files instead of the using
authorDiane Trout <diane@caltech.edu>
Fri, 11 Dec 2009 23:56:10 +0000 (23:56 +0000)
committerDiane Trout <diane@caltech.edu>
Fri, 11 Dec 2009 23:56:10 +0000 (23:56 +0000)
the srf utility.
Additionally I updated the runfolder script to capture a few more
pieces of information (in addition to the switch to qseq files).
I'm now capturing the IVC plot and pngs, and the
flow cell reports generated by the 1.4 and later version of the
pipeline.

htsworkflow/pipelines/runfolder.py
htsworkflow/pipelines/srf.py
scripts/srf

index cfa68a4a462d9377e2eefabd1d9c42590a5f66ee..038ae5bbd6c57a60e77fbcb73f934a21fd50dc54 100644 (file)
@@ -26,6 +26,7 @@ LANE_LIST = range(1, LANES_PER_FLOWCELL+1)
 
 from htsworkflow.util.alphanum import alphanum
 from htsworkflow.util.ethelp import indent, flatten
+from htsworkflow.util.queuecommands import QueueCommands
 
 from htsworkflow.pipelines import srf
 
@@ -377,16 +378,57 @@ def is_compressed(filename):
     else:
         return False
 
+def save_flowcell_reports(data_dir, cycle_dir):
+    """
+    Save the flowcell quality reports
+    """
+    data_dir = os.path.abspath(data_dir)
+    status_file = os.path.join(data_dir, 'Status.xml')
+    reports_dir = os.path.join(data_dir, 'reports')
+    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
+    if os.path.exists(reports_dir):
+        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
+        if os.path.exists(status_file):
+            cmd_list.extend(['Status.xml', 'Status.xsl'])
+        logging.info("Saving reports from " + reports_dir)
+        cwd = os.getcwd()
+        os.chdir(data_dir)
+        q = QueueCommands([" ".join(cmd_list)])
+        q.run()
+        os.chdir(cwd)
+
+
 def save_summary_file(gerald_object, cycle_dir):
-      # Copy Summary.htm
-      summary_path = os.path.join(gerald_object.pathname, 'Summary.htm')
-      if os.path.exists(summary_path):
-          logging.info('Copying %s to %s' % (summary_path, cycle_dir))
-          shutil.copy(summary_path, cycle_dir)
-      else:
-          logging.info('Summary file %s was not found' % (summary_path,))
+    # Copy Summary.htm
+    summary_path = os.path.join(gerald_object.pathname, 'Summary.htm')
+    if os.path.exists(summary_path):
+        logging.info('Copying %s to %s' % (summary_path, cycle_dir))
+        shutil.copy(summary_path, cycle_dir)
+    else:
+        logging.info('Summary file %s was not found' % (summary_path,))
+
+def save_ivc_plot(bustard_object, cycle_dir):
+    """
+    Save the IVC page and its supporting images
+    """
+    plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
+    plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
+    plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')
+
+    plot_target_path = os.path.join(cycle_dir, 'Plots')
+
+    if os.path.exists(plot_html):
+        logging.debug("Saving %s" % ( plot_html, ))
+        logging.debug("Saving %s" % ( plot_images, ))
+        shutil.copy(plot_html, cycle_dir)
+        if not os.path.exists(plot_target_path):
+            os.mkdir(plot_target_path)    
+        for plot_file in glob(plot_images):
+            shutil.copy(plot_file, plot_target_path)
+    else:
+        logging.warning('Missing IVC.html file, not archiving')
+
 
-    
 def compress_score_files(bustard_object, cycle_dir):
     """
     Compress score files into our result directory
@@ -419,11 +461,13 @@ def compress_score_files(bustard_object, cycle_dir):
     tar.wait()
 
 
-def compress_eland_results(gerald_object, cycle_dir):
+def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
     """
     Compress eland result files into the archive directory
     """
     # copy & bzip eland files
+    bz_commands = []
+    
     for lanes_dictionary in gerald_object.eland_results.results:
         for eland_lane in lanes_dictionary.values():
             source_name = eland_lane.pathname
@@ -438,12 +482,18 @@ def compress_eland_results(gerald_object, cycle_dir):
             else:
               # not compressed
               dest_name += '.bz2'
-              args = ['bzip2', '-9', '-c', source_name]
-              logging.info('Running: %s' % ( " ".join(args) ))
-              bzip_dest = open(dest_name, 'w')
-              bzip = subprocess.Popen(args, stdout=bzip_dest)
-              logging.info('Saving to %s' % (dest_name, ))
-              bzip.wait()
+              args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
+              bz_commands.append(" ".join(args))
+              #logging.info('Running: %s' % ( " ".join(args) ))
+              #bzip_dest = open(dest_name, 'w')
+              #bzip = subprocess.Popen(args, stdout=bzip_dest)
+              #logging.info('Saving to %s' % (dest_name, ))
+              #bzip.wait()
+              
+    if len(bz_commands) > 0:
+      q = QueueCommands(bz_commands, num_jobs)
+      q.run()
+      
     
 def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1):
     """
@@ -473,27 +523,36 @@ def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1):
       else:
         os.mkdir(cycle_dir)
 
-      # copy stuff out of the main run
-      g = r.gerald
-
       # save run file
       r.save(cycle_dir)
 
-      # save summary file
-      save_summary_file(g, cycle_dir)
+      # save illumina flowcell status report
+      save_flowcell_reports( os.path.join(r.image_analysis.pathname, '..'), cycle_dir )
       
-      # tar score files
-      compress_score_files(r.bustard, cycle_dir)
+      # save stuff from bustard
+      # grab IVC plot
+      save_ivc_plot(r.bustard, cycle_dir)
 
-      # compress eland result files
-      compress_eland_results(g, cycle_dir)
-      
-      # build srf commands
+      # build base call saving commands
       if site is not None:
         lanes = range(1,9)
         run_name = srf.pathname_to_run_name(r.pathname)
-        srf_cmds = srf.make_commands(run_name, lanes, site, cycle_dir)
-        srf.run_srf_commands(r.bustard.pathname, srf_cmds, 2)
+        srf_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
+        srf.run_commands(r.bustard.pathname, srf_cmds, num_jobs)
+
+      # save stuff from GERALD
+      # copy stuff out of the main run
+      g = r.gerald
+
+      # save summary file
+      save_summary_file(g, cycle_dir)
+      
+      # compress eland result files
+      compress_eland_results(g, cycle_dir, num_jobs)
+      
+      # md5 all the compressed files once we're done
+      md5_commands = srf.make_md5_commands(cycle_dir)
+      srf.run_commands(cycle_dir, md5_commands, num_jobs)
       
 def rm_list(files, dry_run=True):
     for f in files:
index 3809bc6441316a8055a61e53b45c1a4007a1ba5e..b27c33e019210fa49f175c95910f30949ce8df21 100644 (file)
@@ -1,3 +1,4 @@
+from glob import glob
 import logging
 import os
 
@@ -11,15 +12,28 @@ def pathname_to_run_name(base):
   """
   Convert a pathname to a base runfolder name
   handle the case with a trailing /
+
+  >>> print pathname_to_run_name("/a/b/c/run")
+  run
+  >>> print pathname_to_run_name("/a/b/c/run/")
+  run
+  >>> print pathname_to_run_name("run")
+  run
+  >>> print pathname_to_run_name("run/")
+  run
+  >>> print pathname_to_run_name("../run")
+  run
+  >>> print pathname_to_run_name("../run/")
+  run
   """
   name = ""
   while len(name) == 0:
     base, name = os.path.split(base)
     if len(base) == 0:
-      return None
+      break
   return name
 
-def make_commands(run_name, lanes, site_name, destdir, cmdlevel=ILLUMINA2SRF11):
+def make_srf_commands(run_name, bustard_dir, lanes, site_name, destdir, cmdlevel=ILLUMINA2SRF11):
   """
   make a subprocess-friendly list of command line arguments to run solexa2srf
   generates files like: 
@@ -65,11 +79,94 @@ def make_commands(run_name, lanes, site_name, destdir, cmdlevel=ILLUMINA2SRF11):
     cmd_list.append(" ".join(cmd))
   return cmd_list
 
-def run_srf_commands(bustard_dir, cmd_list, num_jobs):
-    logging.info("chdir to %s" % (bustard_dir,))
+def create_qseq_patterns(bustard_dir):
+  """
+  Scan a bustard directory for qseq files and determine a glob pattern
+  """
+  # grab one tile for each lane.
+  qseqs = glob(os.path.join(bustard_dir, '*_0001_qseq.txt'))
+  qseqs = [ os.path.split(x)[-1] for x in qseqs ]
+  if len(qseqs[0].split('_')) == 4:
+    # single ended
+    return [(None,"s_%d_[0-9][0-9][0-9][0-9]_qseq.txt")]
+  elif len(qseqs[0].split('_')) == 5:
+    # more than 1 read
+    # build a dictionary of read numbers by lane
+    # ( just in case we didn't run all 8 lanes )
+    lanes = {}
+    for q in qseqs:
+      sample, lane, read, tile, extension = q.split('_')
+      lanes.setdefault(lane, []).append(read)
+    qseq_patterns = []
+    # grab a lane from the dictionary
+    # I don't think it matters which one.
+    k = lanes.keys()[0]
+    # build the list of patterns
+    for read in lanes[k]:
+      read = int(read)
+      qseq_patterns.append((read, 's_%d_' + '%d_[0-9][0-9][0-9][0-9]_qseq.txt' % (read,)))
+    return qseq_patterns
+  else:
+    raise RuntimeError('unrecognized qseq pattern, not a single or multiple read pattern')
+  
+def make_qseq_commands(run_name, bustard_dir, lanes, site_name, destdir, cmdlevel=ILLUMINA2SRF11):
+  """
+  make a subprocess-friendly list of command line arguments to run solexa2srf
+  generates files like: 
+  woldlab:080514_HWI-EAS229_0029_20768AAXX:8.srf
+   site        run name                    lane
+             
+  run_name - most of the file name (run folder name is a good choice)
+  lanes - list of integers corresponding to which lanes to process
+  site_name - name of your "sequencing site" or "Individual"
+  destdir - where to write all the srf files
+  """
+  # clean up pathname
+  logging.info("run_name %s" % ( run_name, ))
+  
+  cmd_list = []
+  for lane in lanes:
+    name_prefix = '%s_%%l_%%t_' % (run_name,)
+    destdir = os.path.normpath(destdir)
+    qseq_patterns = create_qseq_patterns(bustard_dir)
+
+    for read, pattern in qseq_patterns:
+      if read is None:
+        destname = '%s_%s_l%d.tar.bz2' % (site_name, run_name, lane)
+        dest_path = os.path.join(destdir, destname)
+      else:
+        destname = '%s_%s_l%d_r%d.tar.bz2' % (site_name, run_name, lane, read)
+        dest_path = os.path.join(destdir, destname)
+        
+      cmd = " ".join(['tar', 'cjf', dest_path, pattern % (lane,) ])
+      logging.info("Generated command: " + cmd)
+      cmd_list.append(cmd)
+      
+  return cmd_list
+
+def run_commands(new_dir, cmd_list, num_jobs):
+    logging.info("chdir to %s" % (new_dir,))
     curdir = os.getcwd()
-    os.chdir(bustard_dir)
+    os.chdir(new_dir)
     q = queuecommands.QueueCommands(cmd_list, num_jobs)
     q.run()
     os.chdir(curdir)
     
+def make_md5_commands(destdir):
+  """
+  Scan the cycle dir and create md5s for the contents
+  """
+  cmd_list = []
+  destdir = os.path.abspath(destdir)
+  bz2s = glob(os.path.join(destdir,"*.bz2"))
+  gzs = glob(os.path.join(destdir,"*gz"))
+  srfs = glob(os.path.join(destdir,"*.srf"))
+
+  file_list = bz2s + gzs + srfs
+
+  for f in file_list:
+      cmd = " ".join(['md5sum', f, '>', f+'.md5'])
+      logging.info('generated command: '+cmd)
+      cmd_list.append(cmd)
+
+  return cmd_list
index d363a4e63ed4d0e248db6cee433b02504ee7d70c..bcf835dbfa29c1d3707d0e9e602d5c5882093ce4 100644 (file)
@@ -6,7 +6,8 @@ import os
 import sys
 
 from htsworkflow.pipelines import runfolder
-from htsworkflow.pipelines.srf import make_commands, run_srf_commands, pathname_to_run_name
+from htsworkflow.pipelines.srf import make_srf_commands, make_qseq_commands, \
+                                      run_commands, pathname_to_run_name
 from htsworkflow.pipelines.srf import ILLUMINA2SRF10, ILLUMINA2SRF11, SOLEXA2SRF
 
 def make_parser():
@@ -84,7 +85,8 @@ def main(cmdline=None):
         parser.error(
           "Number of lane arguments must match number of runfolders"
         )
-    
+
+    make_commands = make_qseq_commands
     # build list of commands
     cmds = {}
     for runfolder_path, lanes in zip(args, lanes_list):
@@ -99,14 +101,19 @@ def main(cmdline=None):
           return 1
         elif len(runs) == 1:
           bustard_dir = runs[0].bustard.pathname
-          cmds[bustard_dir] = make_commands(run_name, lanes, opts.site, opts.dest_dir, opts.runfolder_version)
+          cmds[bustard_dir] = make_commands(run_name,
+                                            bustard_dir,
+                                            lanes,
+                                            opts.site,
+                                            opts.dest_dir,
+                                            opts.runfolder_version)
         else:
           print "ERROR: Couldn't find a bustard directory in", runfolder_path
           return 1
 
     if not opts.dry_run:
       for cwd, cmd_list in cmds.items():
-        run_srf_commands(cwd, cmd_list, opts.jobs)
+        run_commands(cwd, cmd_list, opts.jobs)
     else:
       for cwd, cmd_list in cmds.items():
         print cwd