IPAR detection is more reliable than firecrest so do it first, and then
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
index dee3231068e95150e9e699d96dc3cc80b90614f5..5d663b3d9924890097cd4219b254fdf8339f0f9c 100644 (file)
@@ -25,7 +25,6 @@ LANES_PER_FLOWCELL = 8
 from htsworkflow.util.alphanum import alphanum
 from htsworkflow.util.ethelp import indent, flatten
 
-
 class PipelineRun(object):
     """
     Capture "interesting" information about a pipeline run
@@ -34,20 +33,20 @@ class PipelineRun(object):
     PIPELINE_RUN = 'PipelineRun'
     FLOWCELL_ID = 'FlowcellID'
 
-    def __init__(self, pathname=None, firecrest=None, bustard=None, gerald=None, xml=None):
+    def __init__(self, pathname=None, xml=None):
         if pathname is not None:
           self.pathname = os.path.normpath(pathname)
         else:
           self.pathname = None
         self._name = None
         self._flowcell_id = None
-        self.firecrest = firecrest
-        self.bustard = bustard
-        self.gerald = gerald
+        self.image_analysis = None
+        self.bustard = None
+        self.gerald = None
 
         if xml is not None:
           self.set_elements(xml)
-    
+
     def _get_flowcell_id(self):
         # extract flowcell ID
         if self._flowcell_id is None:
@@ -63,7 +62,7 @@ class PipelineRun(object):
               flowcell_id = path_fields[-1]
             else:
               flowcell_id = 'unknown'
-              
+
            logging.warning(
              "Flowcell id was not found, guessing %s" % (
                 flowcell_id))
@@ -78,7 +77,7 @@ class PipelineRun(object):
         root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
         flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
         flowcell.text = self.flowcell_id
-        root.append(self.firecrest.get_elements())
+        root.append(self.image_analysis.get_elements())
         root.append(self.bustard.get_elements())
         root.append(self.gerald.get_elements())
         return root
@@ -87,6 +86,7 @@ class PipelineRun(object):
         # this file gets imported by all the others,
         # so we need to hide the imports to avoid a cyclic imports
         from htsworkflow.pipelines import firecrest
+        from htsworkflow.pipelines import ipar
         from htsworkflow.pipelines import bustard
         from htsworkflow.pipelines import gerald
 
@@ -99,8 +99,11 @@ class PipelineRun(object):
           if tag == PipelineRun.FLOWCELL_ID.lower():
             self._flowcell_id = element.text
           #ok the xword.Xword.XWORD pattern for module.class.constant is lame
+          # you should only have Firecrest or IPAR, never both of them.
           elif tag == firecrest.Firecrest.FIRECREST.lower():
-            self.firecrest = firecrest.Firecrest(xml=element)
+            self.image_analysis = firecrest.Firecrest(xml=element)
+          elif tag == ipar.IPAR.IPAR.lower():
+            self.image_analysis = ipar.IPAR(xml=element)
           elif tag == bustard.Bustard.BUSTARD.lower():
             self.bustard = bustard.Bustard(xml=element)
           elif tag == gerald.Gerald.GERALD.lower():
@@ -113,7 +116,7 @@ class PipelineRun(object):
         Given a run tuple, find the latest date and use that as our name
         """
         if self._name is None:
-          tmax = max(self.firecrest.time, self.bustard.time, self.gerald.time)
+          tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
           timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
           self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
         return self._name
@@ -133,6 +136,19 @@ class PipelineRun(object):
         tree = ElementTree.parse(filename).getroot()
         self.set_elements(tree)
 
+def load_pipeline_run_xml(pathname):
+    """
+    Load and instantiate a Pipeline run from a run xml file
+
+    :Parameters: 
+      - `pathname` : location of an run xml file
+
+    :Returns: initialized PipelineRun object
+    """
+    tree = ElementTree.parse(pathname).getroot()
+    run = PipelineRun(xml=tree)
+    return run
+
 def get_runs(runfolder):
     """
     Search through a run folder for all the various sub component runs
@@ -143,28 +159,114 @@ def get_runs(runfolder):
     in there gerald component.
     """
     from htsworkflow.pipelines import firecrest
+    from htsworkflow.pipelines import ipar
     from htsworkflow.pipelines import bustard
     from htsworkflow.pipelines import gerald
 
-    datadir = os.path.join(runfolder, 'Data')
-
-    logging.info('Searching for runs in ' + datadir)
-    runs = []
-    for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
-        f = firecrest.firecrest(firecrest_pathname)
-        bustard_glob = os.path.join(firecrest_pathname, "Bustard*")
+    def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
+        logging.info("Looking for bustard directories in %s" % (pathname,))
+        bustard_glob = os.path.join(pathname, "Bustard*")
         for bustard_pathname in glob(bustard_glob):
+            logging.info("Found bustard directory %s" % (bustard_pathname,))
             b = bustard.bustard(bustard_pathname)
             gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
+            logging.info("Looking for gerald directories in %s" % (pathname,))
             for gerald_pathname in glob(gerald_glob):
+                logging.info("Found gerald directory %s" % (gerald_pathname,))
                 try:
                     g = gerald.gerald(gerald_pathname)
-                    runs.append(PipelineRun(runfolder, f, b, g))
+                    p = PipelineRun(runfolder)
+                    p.image_analysis = image_analysis
+                    p.bustard = b
+                    p.gerald = g
+                    runs.append(p)
                 except IOError, e:
-                    print "Ignoring", str(e)
+                    logging.error("Ignoring " + str(e))
+
+    datadir = os.path.join(runfolder, 'Data')
+
+    logging.info('Searching for runs in ' + datadir)
+    runs = []
+    # scan for firecrest directories
+    for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
+        logging.info('Found firecrest in ' + datadir)
+        image_analysis = firecrest.firecrest(firecrest_pathname)
+        if image_analysis is None:
+            logging.warning(
+                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
+            )
+        else:
+            scan_post_image_analysis(
+                runs, runfolder, image_analysis, firecrest_pathname
+            )
+    # scan for IPAR directories
+    for ipar_pathname in glob(os.path.join(datadir,"IPAR_*")):
+        logging.info('Found ipar directories in ' + datadir)
+        image_analysis = ipar.ipar(ipar_pathname)
+        if image_analysis is None:
+            logging.warning(
+                "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
+            )
+        else:
+            scan_post_image_analysis(
+                runs, runfolder, image_analysis, ipar_pathname
+            )
+
     return runs
-                
+
+def get_specific_run(gerald_dir):
+    """
+    Given a gerald directory, construct a PipelineRun out of its parents
+
+    Basically this allows specifying a particular run instead of the previous
+    get_runs which scans a runfolder for various combinations of
+    firecrest/ipar/bustard/gerald runs.
+    """
+    from htsworkflow.pipelines import firecrest
+    from htsworkflow.pipelines import ipar
+    from htsworkflow.pipelines import bustard
+    from htsworkflow.pipelines import gerald
+
+    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
+    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
+
+    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..','..'))
+   
+    logging.debug('--- use-run detected options ---')
+    logging.debug('runfolder: %s' % (runfolder_dir,))
+    logging.debug('image_dir: %s' % (image_dir,))
+    logging.debug('bustard_dir: %s' % (bustard_dir,))
+    logging.debug('gerald_dir: %s' % (gerald_dir,))
+
+    # find our processed image dir
+    image_run = ipar.ipar(image_dir)
+    if image_run is None:
+        image_run = firecrest.firecrest(image_dir)
+    if image_run is None:
+        msg = '%s does not contain an image processing step' % (image_dir,)
+        logging.error(msg)
+        return None
+
+    # find our base calling
+    base_calling_run = bustard.bustard(bustard_dir)
+    if base_calling_run is None:
+        logging.error('%s does not contain a bustard run' % (bustard_dir,))
+        return None
+
+    # find alignments
+    gerald_run = gerald.gerald(gerald_dir)
+    if gerald_run is None:
+        logging.error('%s does not contain a gerald run' % (gerald_dir,))
+        return None
+
+    p = PipelineRun(runfolder_dir)
+    p.image_analysis = image_run
+    p.bustard = base_calling_run
+    p.gerald = gerald_run
     
+    logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
+    return p
+
 def extract_run_parameters(runs):
     """
     Search through runfolder_path for various runs and grab their parameters
@@ -172,7 +274,7 @@ def extract_run_parameters(runs):
     for run in runs:
       run.save()
 
-def summarize_mapped_reads(mapped_reads):
+def summarize_mapped_reads(genome_map, mapped_reads):
     """
     Summarize per chromosome reads into a genome count
     But handle spike-in/contamination symlinks seperately.
@@ -182,7 +284,7 @@ def summarize_mapped_reads(mapped_reads):
     genome = 'unknown'
     for k, v in mapped_reads.items():
         path, k = os.path.split(k)
-        if len(path) > 0:
+        if len(path) > 0 and not genome_map.has_key(path):
             genome = path
             genome_reads += v
         else:
@@ -190,6 +292,35 @@ def summarize_mapped_reads(mapped_reads):
     summarized_reads[genome] = genome_reads
     return summarized_reads
 
+def summarize_lane(gerald, lane_id):
+    report = []
+    summary_results = gerald.summary.lane_results
+    for end in range(len(summary_results)):  
+      eland_result = gerald.eland_results.results[end][lane_id]
+      report.append("Sample name %s" % (eland_result.sample_name))
+      report.append("Lane id %s end %s" % (eland_result.lane_id, end))
+      cluster = summary_results[end][eland_result.lane_id].cluster
+      report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
+      report.append("Total Reads: %d" % (eland_result.reads))
+      mc = eland_result._match_codes
+      nm = mc['NM']
+      nm_percent = float(nm)/eland_result.reads  * 100
+      qc = mc['QC']
+      qc_percent = float(qc)/eland_result.reads * 100
+
+      report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
+      report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
+      report.append('Unique (0,1,2 mismatches) %d %d %d' % \
+                    (mc['U0'], mc['U1'], mc['U2']))
+      report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
+                    (mc['R0'], mc['R1'], mc['R2']))
+      report.append("Mapped Reads")
+      mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
+      for name, counts in mapped_reads.items():
+        report.append("  %s: %d" % (name, counts))
+      report.append('')
+    return report
+
 def summary_report(runs):
     """
     Summarize cluster numbers and mapped read counts for a runfolder
@@ -199,37 +330,23 @@ def summary_report(runs):
         # print a run name?
         report.append('Summary for %s' % (run.name,))
        # sort the report
-       eland_keys = run.gerald.eland_results.results.keys()
+       eland_keys = run.gerald.eland_results.results[0].keys()
        eland_keys.sort(alphanum)
 
-        lane_results = run.gerald.summary.lane_results
        for lane_id in eland_keys:
-           result = run.gerald.eland_results.results[lane_id]
-            report.append("Sample name %s" % (result.sample_name))
-            report.append("Lane id %s" % (result.lane_id,))
-            cluster = lane_results[result.lane_id].cluster
-            report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
-            report.append("Total Reads: %d" % (result.reads))
-            mc = result._match_codes
-            nm = mc['NM']
-            nm_percent = float(nm)/result.reads  * 100
-            qc = mc['QC']
-            qc_percent = float(qc)/result.reads * 100
-
-           report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
-           report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
-            report.append('Unique (0,1,2 mismatches) %d %d %d' % \
-                          (mc['U0'], mc['U1'], mc['U2']))
-            report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
-                          (mc['R0'], mc['R1'], mc['R2']))
-            report.append("Mapped Reads")
-            mapped_reads = summarize_mapped_reads(result.mapped_reads)
-            for name, counts in mapped_reads.items():
-              report.append("  %s: %d" % (name, counts))
+            report.extend(summarize_lane(run.gerald, lane_id))
             report.append('---')
             report.append('')
         return os.linesep.join(report)
 
+def is_compressed(filename):
+    if os.path.splitext(filename)[1] == ".gz":
+        return True
+    elif os.path.splitext(filename)[1] == '.bz2':
+        return True
+    else:
+        return False
+
 def extract_results(runs, output_base_dir=None):
     if output_base_dir is None:
         output_base_dir = os.getcwd()
@@ -239,9 +356,9 @@ def extract_results(runs, output_base_dir=None):
       logging.info("Using %s as result directory" % (result_dir,))
       if not os.path.exists(result_dir):
         os.mkdir(result_dir)
-      
+
       # create cycle_dir
-      cycle = "C%d-%d" % (r.firecrest.start, r.firecrest.stop)
+      cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
       logging.info("Filling in %s" % (cycle,))
       cycle_dir = os.path.join(result_dir, cycle)
       if os.path.exists(cycle_dir):
@@ -256,6 +373,8 @@ def extract_results(runs, output_base_dir=None):
       # save run file
       r.save(cycle_dir)
 
+      return
+
       # Copy Summary.htm
       summary_path = os.path.join(r.gerald.pathname, 'Summary.htm')
       if os.path.exists(summary_path):
@@ -266,7 +385,15 @@ def extract_results(runs, output_base_dir=None):
 
       # tar score files
       score_files = []
-      for f in os.listdir(g.pathname):
+
+      # check for g.pathname/Temp a new feature of 1.1rc1
+      scores_path = g.pathname
+      scores_path_temp = os.path.join(scores_path, 'Temp')
+      if os.path.isdir(scores_path_temp):
+          scores_path = scores_path_temp
+
+      # hopefully we have a directory that contains s_*_score files
+      for f in os.listdir(scores_path):
           if re.match('.*_score.txt', f):
               score_files.append(f)
 
@@ -274,27 +401,37 @@ def extract_results(runs, output_base_dir=None):
       bzip_cmd = [ 'bzip2', '-9', '-c' ]
       tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
       tar_dest = open(tar_dest_name, 'w')
-      logging.info("Compressing score files in %s" % (g.pathname,))
+      logging.info("Compressing score files from %s" % (scores_path,))
       logging.info("Running tar: " + " ".join(tar_cmd[:10]))
       logging.info("Running bzip2: " + " ".join(bzip_cmd))
       logging.info("Writing to %s" %(tar_dest_name))
-      
-      tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, cwd=g.pathname)
+
+      tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, 
+                             cwd=scores_path)
       bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
       tar.wait()
 
       # copy & bzip eland files
-      for eland_lane in g.eland_results.values():
-          source_name = eland_lane.pathname
-          path, name = os.path.split(eland_lane.pathname)
-          dest_name = os.path.join(cycle_dir, name+'.bz2')
-
-          args = ['bzip2', '-9', '-c', source_name]
-          logging.info('Running: %s' % ( " ".join(args) ))
-          bzip_dest = open(dest_name, 'w')
-          bzip = subprocess.Popen(args, stdout=bzip_dest)
-          logging.info('Saving to %s' % (dest_name, ))
-          bzip.wait()
+      for lanes_dictionary in g.eland_results.results:
+          for eland_lane in lanes_dictionary.values():
+              source_name = eland_lane.pathname
+              path, name = os.path.split(eland_lane.pathname)
+              dest_name = os.path.join(cycle_dir, name)
+              logging.info("Saving eland file %s to %s" % \
+                           (source_name, dest_name))
+
+              if is_compressed(name):
+                logging.info('Already compressed, Saving to %s' % (dest_name, ))
+                shutil.copy(source_name, dest_name)
+              else:
+                # not compressed
+                dest_name += '.bz2'
+                args = ['bzip2', '-9', '-c', source_name]
+                logging.info('Running: %s' % ( " ".join(args) ))
+                bzip_dest = open(dest_name, 'w')
+                bzip = subprocess.Popen(args, stdout=bzip_dest)
+                logging.info('Saving to %s' % (dest_name, ))
+                bzip.wait()
 
 def clean_runs(runs):
     """