The srf/qseq --raw-format patch.
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
index d628f9bf1623ce20f6fabec3d0206702e2197341..e1b474b6cd70846ee6f702ce01365a5c93db0190 100644 (file)
@@ -22,10 +22,11 @@ EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
 VERSION_RE = "([0-9\.]+)"
 USER_RE = "([a-zA-Z0-9]+)"
 LANES_PER_FLOWCELL = 8
-LANE_LIST = range(1, LANES_PER_FLOWCELL+1)
+LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
 
 from htsworkflow.util.alphanum import alphanum
 from htsworkflow.util.ethelp import indent, flatten
+from htsworkflow.util.queuecommands import QueueCommands
 
 from htsworkflow.pipelines import srf
 
@@ -74,6 +75,13 @@ class PipelineRun(object):
         return self._flowcell_id
     flowcell_id = property(_get_flowcell_id)
 
+    def _get_runfolder_name(self):
+        if self.gerald is None:
+            return None
+        else:
+            return self.gerald.runfolder_name
+    runfolder_name = property(_get_runfolder_name)
+
     def get_elements(self):
         """
         make one master xml file from all of our sub-components.
@@ -122,14 +130,14 @@ class PipelineRun(object):
         if self._name is None:
           tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
           timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
-          self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
+          self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
         return self._name
     name = property(_get_run_name)
 
     def save(self, destdir=None):
         if destdir is None:
             destdir = ''
-        logging.info("Saving run report "+ self.name)
+        logging.info("Saving run report " + self.name)
         xml = self.get_elements()
         indent(xml)
         dest_pathname = os.path.join(destdir, self.name)
@@ -194,7 +202,7 @@ def get_runs(runfolder):
     logging.info('Searching for runs in ' + datadir)
     runs = []
     # scan for firecrest directories
-    for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
+    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
         logging.info('Found firecrest in ' + datadir)
         image_analysis = firecrest.firecrest(firecrest_pathname)
         if image_analysis is None:
@@ -214,7 +222,7 @@ def get_runs(runfolder):
         image_analysis = ipar.ipar(ipar_pathname)
         if image_analysis is None:
             logging.warn(
-                "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
+                "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
             )
         else:
             scan_post_image_analysis(
@@ -240,8 +248,8 @@ def get_specific_run(gerald_dir):
     bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
     image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
 
-    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..','..'))
-   
+    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))
+
     logging.info('--- use-run detected options ---')
     logging.info('runfolder: %s' % (runfolder_dir,))
     logging.info('image_dir: %s' % (image_dir,))
@@ -254,7 +262,7 @@ def get_specific_run(gerald_dir):
     # leaf directory should be an IPAR or firecrest directory
     data_dir, short_image_dir = os.path.split(image_dir)
     logging.info('data_dir: %s' % (data_dir,))
-    logging.info('short_iamge_dir: %s' %(short_image_dir,))
+    logging.info('short_iamge_dir: %s' % (short_image_dir,))
 
     # guess which type of image processing directory we have by looking
     # in the leaf directory name
@@ -287,7 +295,7 @@ def get_specific_run(gerald_dir):
     p.image_analysis = image_run
     p.bustard = base_calling_run
     p.gerald = gerald_run
-    
+
     logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
     return p
 
@@ -319,7 +327,7 @@ def summarize_mapped_reads(genome_map, mapped_reads):
 def summarize_lane(gerald, lane_id):
     report = []
     summary_results = gerald.summary.lane_results
-    for end in range(len(summary_results)):  
+    for end in range(len(summary_results)):
       eland_result = gerald.eland_results.results[end][lane_id]
       report.append("Sample name %s" % (eland_result.sample_name))
       report.append("Lane id %s end %s" % (eland_result.lane_id, end))
@@ -331,9 +339,9 @@ def summarize_lane(gerald, lane_id):
       if hasattr(eland_result, 'match_codes'):
           mc = eland_result.match_codes
           nm = mc['NM']
-          nm_percent = float(nm)/eland_result.reads  * 100
+          nm_percent = float(nm) / eland_result.reads * 100
           qc = mc['QC']
-          qc_percent = float(qc)/eland_result.reads * 100
+          qc_percent = float(qc) / eland_result.reads * 100
 
           report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
           report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
@@ -377,16 +385,57 @@ def is_compressed(filename):
     else:
         return False
 
+def save_flowcell_reports(data_dir, cycle_dir):
+    """
+    Save the flowcell quality reports
+    """
+    data_dir = os.path.abspath(data_dir)
+    status_file = os.path.join(data_dir, 'Status.xml')
+    reports_dir = os.path.join(data_dir, 'reports')
+    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
+    if os.path.exists(reports_dir):
+        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
+        if os.path.exists(status_file):
+            cmd_list.extend(['Status.xml', 'Status.xsl'])
+        logging.info("Saving reports from " + reports_dir)
+        cwd = os.getcwd()
+        os.chdir(data_dir)
+        q = QueueCommands([" ".join(cmd_list)])
+        q.run()
+        os.chdir(cwd)
+
+
 def save_summary_file(gerald_object, cycle_dir):
-      # Copy Summary.htm
-      summary_path = os.path.join(gerald_object.pathname, 'Summary.htm')
-      if os.path.exists(summary_path):
-          logging.info('Copying %s to %s' % (summary_path, cycle_dir))
-          shutil.copy(summary_path, cycle_dir)
-      else:
-          logging.info('Summary file %s was not found' % (summary_path,))
+    # Copy Summary.htm
+    summary_path = os.path.join(gerald_object.pathname, 'Summary.htm')
+    if os.path.exists(summary_path):
+        logging.info('Copying %s to %s' % (summary_path, cycle_dir))
+        shutil.copy(summary_path, cycle_dir)
+    else:
+        logging.info('Summary file %s was not found' % (summary_path,))
+
+def save_ivc_plot(bustard_object, cycle_dir):
+    """
+    Save the IVC page and its supporting images
+    """
+    plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
+    plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
+    plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')
+
+    plot_target_path = os.path.join(cycle_dir, 'Plots')
+
+    if os.path.exists(plot_html):
+        logging.debug("Saving %s" % (plot_html,))
+        logging.debug("Saving %s" % (plot_images,))
+        shutil.copy(plot_html, cycle_dir)
+        if not os.path.exists(plot_target_path):
+            os.mkdir(plot_target_path)
+        for plot_file in glob(plot_images):
+            shutil.copy(plot_file, plot_target_path)
+    else:
+        logging.warning('Missing IVC.html file, not archiving')
+
 
-    
 def compress_score_files(bustard_object, cycle_dir):
     """
     Compress score files into our result directory
@@ -405,12 +454,12 @@ def compress_score_files(bustard_object, cycle_dir):
 
     tar_cmd = ['tar', 'c'] + score_files
     bzip_cmd = [ 'bzip2', '-9', '-c' ]
-    tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
+    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
     tar_dest = open(tar_dest_name, 'w')
     logging.info("Compressing score files from %s" % (scores_path,))
     logging.info("Running tar: " + " ".join(tar_cmd[:10]))
     logging.info("Running bzip2: " + " ".join(bzip_cmd))
-    logging.info("Writing to %s" %(tar_dest_name,))
+    logging.info("Writing to %s" % (tar_dest_name,))
 
     env = {'BZIP': '-9'}
     tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
@@ -419,33 +468,45 @@ def compress_score_files(bustard_object, cycle_dir):
     tar.wait()
 
 
-def compress_eland_results(gerald_object, cycle_dir):
+def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
     """
     Compress eland result files into the archive directory
     """
     # copy & bzip eland files
+    bz_commands = []
+
     for lanes_dictionary in gerald_object.eland_results.results:
         for eland_lane in lanes_dictionary.values():
             source_name = eland_lane.pathname
-            path, name = os.path.split(eland_lane.pathname)
-            dest_name = os.path.join(cycle_dir, name)
-            logging.info("Saving eland file %s to %s" % \
-                         (source_name, dest_name))
-            
-            if is_compressed(name):
-              logging.info('Already compressed, Saving to %s' % (dest_name, ))
-              shutil.copy(source_name, dest_name)
+            if source_name is None:
+              logging.info(
+                "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
             else:
-              # not compressed
-              dest_name += '.bz2'
-              args = ['bzip2', '-9', '-c', source_name]
-              logging.info('Running: %s' % ( " ".join(args) ))
-              bzip_dest = open(dest_name, 'w')
-              bzip = subprocess.Popen(args, stdout=bzip_dest)
-              logging.info('Saving to %s' % (dest_name, ))
-              bzip.wait()
-    
-def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1):
+              path, name = os.path.split(source_name)
+              dest_name = os.path.join(cycle_dir, name)
+              logging.info("Saving eland file %s to %s" % \
+                         (source_name, dest_name))
+
+              if is_compressed(name):
+                logging.info('Already compressed, Saving to %s' % (dest_name,))
+                shutil.copy(source_name, dest_name)
+              else:
+                # not compressed
+                dest_name += '.bz2'
+                args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
+                bz_commands.append(" ".join(args))
+                #logging.info('Running: %s' % ( " ".join(args) ))
+                #bzip_dest = open(dest_name, 'w')
+                #bzip = subprocess.Popen(args, stdout=bzip_dest)
+                #logging.info('Saving to %s' % (dest_name, ))
+                #bzip.wait()
+
+    if len(bz_commands) > 0:
+      q = QueueCommands(bz_commands, num_jobs)
+      q.run()
+
+
+def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format='qseq'):
     """
     Iterate over runfolders in runs extracting the most useful information.
       * run parameters (in run-*.xml)
@@ -467,33 +528,53 @@ def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1):
       cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
       logging.info("Filling in %s" % (cycle,))
       cycle_dir = os.path.join(result_dir, cycle)
+      cycle_dir = os.path.abspath(cycle_dir)
       if os.path.exists(cycle_dir):
         logging.error("%s already exists, not overwriting" % (cycle_dir,))
         continue
       else:
         os.mkdir(cycle_dir)
 
-      # copy stuff out of the main run
-      g = r.gerald
-
       # save run file
       r.save(cycle_dir)
 
+      # save illumina flowcell status report
+      save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'), cycle_dir)
+
+      # save stuff from bustard
+      # grab IVC plot
+      save_ivc_plot(r.bustard, cycle_dir)
+
+      # build base call saving commands
+      if site is not None:
+        lanes = []
+        for lane in range(1, 9):
+          if r.gerald.lanes[lane].analysis != 'none':
+            lanes.append(lane)
+
+        run_name = srf.pathname_to_run_name(r.pathname)
+        if raw_format == 'qseq':
+            seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
+        elif raw_format == 'srf':
+            seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
+        else:
+            raise ValueError('Unknown --raw-format=%s' % (raw_format))
+        srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
+
+      # save stuff from GERALD
+      # copy stuff out of the main run
+      g = r.gerald
+
       # save summary file
       save_summary_file(g, cycle_dir)
-      
-      # tar score files
-      compress_score_files(r.bustard, cycle_dir)
 
       # compress eland result files
-      compress_eland_results(g, cycle_dir)
-      
-      # build srf commands
-      lanes = range(1,9)
-      run_name = srf.pathname_to_run_name(r.pathname)
-      srf_cmds = srf.make_commands(run_name, lanes, site, cycle_dir)
-      srf.run_srf_commands(r.bustard.pathname, srf_cmds, 2)
-      
+      compress_eland_results(g, cycle_dir, num_jobs)
+
+      # md5 all the compressed files once we're done
+      md5_commands = srf.make_md5_commands(cycle_dir)
+      srf.run_commands(cycle_dir, md5_commands, num_jobs)
+
 def rm_list(files, dry_run=True):
     for f in files:
         if os.path.exists(f):
@@ -504,7 +585,7 @@ def rm_list(files, dry_run=True):
                 else:
                     os.unlink(f)
         else:
-            logging.warn("%s doesn't exist."% (f,))
+            logging.warn("%s doesn't exist." % (f,))
 
 def clean_runs(runs, dry_run=True):
     """
@@ -537,7 +618,7 @@ def clean_runs(runs, dry_run=True):
         logging.info("Cleaning intermediate files")
         # make clean_intermediate
         if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
-            clean_process = subprocess.Popen(['make', 'clean_intermediate'], 
+            clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                              cwd=run.image_analysis.pathname,)
             clean_process.wait()