Try to make Aligned result directories optional in hiseq runs.
[htsworkflow.git] / htsworkflow / pipelines / runfolder.py
index 3eab735873cc9e0087b4e3bf9b0fa77ac60e13c3..a3b745717ca6e958b2ee0b579563c3b6e2024db5 100644 (file)
@@ -201,50 +201,6 @@ def get_runs(runfolder, flowcell_id=None):
     from htsworkflow.pipelines import bustard
     from htsworkflow.pipelines import gerald
 
-    def scan_post_image_analysis(runs, runfolder, datadir, image_analysis, pathname):
-        LOGGER.info("Looking for bustard directories in %s" % (pathname,))
-        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
-        # RTA BaseCalls looks enough like Bustard.
-        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
-        for bustard_pathname in bustard_dirs:
-            LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
-            b = bustard.bustard(bustard_pathname)
-            build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder)
-
-            build_aligned_runs(image_analysis, runs, b, datadir, runfolder)
-
-    def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder):
-        gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
-        LOGGER.info("Looking for gerald directories in %s" % (pathname,))
-        for gerald_pathname in glob(gerald_glob):
-            LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
-            try:
-                g = gerald.gerald(gerald_pathname)
-                p = PipelineRun(runfolder, flowcell_id)
-                p.datadir = datadir
-                p.image_analysis = image_analysis
-                p.bustard = b
-                p.gerald = g
-                runs.append(p)
-            except IOError, e:
-                LOGGER.error("Ignoring " + str(e))
-
-
-    def build_aligned_runs(image_analysis, runs, b, datadir, runfolder):
-        aligned_glob = os.path.join(runfolder, 'Aligned*')
-        for aligned in glob(aligned_glob):
-            LOGGER.info("Found aligned directory %s" % (aligned,))
-            try:
-                g = gerald.gerald(aligned)
-                p = PipelineRun(runfolder, flowcell_id)
-                p.datadir = datadir
-                p.image_analysis = image_analysis
-                p.bustard = b
-                p.gerald = g
-                runs.append(p)
-            except IOError, e:
-                LOGGER.error("Ignoring " + str(e))
-
     datadir = os.path.join(runfolder, 'Data')
 
     LOGGER.info('Searching for runs in ' + datadir)
@@ -259,7 +215,7 @@ def get_runs(runfolder, flowcell_id=None):
             )
         else:
             scan_post_image_analysis(
-                runs, runfolder, datadir, image_analysis, firecrest_pathname
+                runs, runfolder, datadir, image_analysis, firecrest_pathname, flowcell_id
             )
     # scan for IPAR directories
     ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
@@ -274,11 +230,111 @@ def get_runs(runfolder, flowcell_id=None):
             )
         else:
             scan_post_image_analysis(
-                runs, runfolder, datadir, image_analysis, ipar_pathname
+                runs, runfolder, datadir, image_analysis, ipar_pathname, flowcell_id
             )
 
     return runs
 
+def scan_post_image_analysis(runs, runfolder, datadir, image_analysis,
+                             pathname, flowcell_id):
+    added = build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id)
+    # If we're a multiplexed run, don't look for older run type.
+    if added > 0:
+        return
+
+    LOGGER.info("Looking for bustard directories in %s" % (pathname,))
+    bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
+    # RTA BaseCalls looks enough like Bustard.
+    bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
+    for bustard_pathname in bustard_dirs:
+        LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
+        b = bustard.bustard(bustard_pathname)
+        build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname,
+                          runfolder, flowcell_id)
+
+
+def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder,
+                      flowcell_id):
+    start = len(runs)
+    gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
+    LOGGER.info("Looking for gerald directories in %s" % (pathname,))
+    for gerald_pathname in glob(gerald_glob):
+        LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
+        try:
+            g = gerald.gerald(gerald_pathname)
+            p = PipelineRun(runfolder, flowcell_id)
+            p.datadir = datadir
+            p.image_analysis = image_analysis
+            p.bustard = b
+            p.gerald = g
+            runs.append(p)
+        except IOError, e:
+            LOGGER.error("Ignoring " + str(e))
+    return len(runs) - start
+
+
+def build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id):
+    start = len(runs)
+    aligned_glob = os.path.join(runfolder, 'Aligned*')
+    unaligned_glob = os.path.join(runfolder, 'Unaligned*')
+
+    aligned_paths = glob(aligned_glob)
+    unaligned_paths = glob(unaligned_glob)
+
+    matched_paths = hiseq_match_aligned_unaligned(aligned_paths, unaligned_paths)
+    LOGGER.debug("Matched HiSeq analysis: %s", str(matched_paths))
+
+    for aligned, unaligned in matched_paths:
+        if unaligned is None:
+            LOGGER.warn("Aligned directory %s without matching unalinged, skipping", aligned)
+            continue
+
+        g = gerald.gerald(aligned)
+        print "scan for aligned then remove them from unaligned list"
+        try:
+            p = PipelineRun(runfolder, flowcell_id)
+            p.datadir = datadir
+            p.image_analysis = image_analysis
+            p.bustard = bustard.bustard(unaligned)
+            if aligned:
+                p.gerald = gerald.gerald(aligned)
+            runs.append(p)
+        except IOError, e:
+            LOGGER.error("Ignoring " + str(e))
+    return len(runs) - start
+
+def hiseq_match_aligned_unaligned(aligned, unaligned):
+    """Match aligned and unaligned folders from seperate lists
+    """
+    unaligned_suffix_re = re.compile('Unaligned(?P<suffix>[\w]*)')
+
+    aligned_by_suffix = build_dir_dict_by_suffix('Aligned', aligned)
+    unaligned_by_suffix = build_dir_dict_by_suffix('Unaligned', unaligned)
+
+    keys = set(aligned_by_suffix.keys()).union(set(unaligned_by_suffix.keys()))
+
+    matches = []
+    for key in keys:
+        a = aligned_by_suffix.get(key)
+        u = unaligned_by_suffix.get(key)
+        matches.append((a, u))
+    return matches
+
+def build_dir_dict_by_suffix(prefix, dirnames):
+    """Build a dictionary indexed by suffix of last directory name.
+
+    It assumes a constant prefix
+    """
+    regex = re.compile('%s(?P<suffix>[\w]*)' % (prefix,))
+
+    by_suffix = {}
+    for absname in dirnames:
+        basename = os.path.basename(absname)
+        match = regex.match(basename)
+        if match:
+            by_suffix[match.group('suffix')] = absname
+    return by_suffix
+
 def get_specific_run(gerald_dir):
     """
     Given a gerald directory, construct a PipelineRun out of its parents
@@ -560,7 +616,7 @@ def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
       q.run()
 
 
-def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format='qseq'):
+def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format=None):
     """
     Iterate over runfolders in runs extracting the most useful information.
       * run parameters (in run-*.xml)
@@ -573,68 +629,75 @@ def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, r
         output_base_dir = os.getcwd()
 
     for r in runs:
-      result_dir = os.path.join(output_base_dir, r.flowcell_id)
-      LOGGER.info("Using %s as result directory" % (result_dir,))
-      if not os.path.exists(result_dir):
-        os.mkdir(result_dir)
-
-      # create cycle_dir
-      cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
-      LOGGER.info("Filling in %s" % (cycle,))
-      cycle_dir = os.path.join(result_dir, cycle)
-      cycle_dir = os.path.abspath(cycle_dir)
-      if os.path.exists(cycle_dir):
-        LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
-        continue
-      else:
-        os.mkdir(cycle_dir)
-
-      # save run file
-      r.save(cycle_dir)
-
-      # save illumina flowcell status report
-      save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'), cycle_dir)
-
-      # save stuff from bustard
-      # grab IVC plot
-      save_ivc_plot(r.bustard, cycle_dir)
-
-      # build base call saving commands
-      if site is not None:
-        lanes = []
-        for lane in range(1, 9):
-          lane_parameters = r.gerald.lanes.get(lane, None)
-          if lane_parameters is not None and lane_parameters.analysis != 'none':
-            lanes.append(lane)
-
-        run_name = srf.pathname_to_run_name(r.pathname)
-        seq_cmds = []
-        LOGGER.info("Raw Format is: %s" % (raw_format, ))
-        if raw_format == 'fastq':
-            rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
-            LOGGER.info("raw data = %s" % (rawpath,))
-            srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
-        elif raw_format == 'qseq':
-            seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
-        elif raw_format == 'srf':
-            seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
+        result_dir = os.path.join(output_base_dir, r.flowcell_id)
+        LOGGER.info("Using %s as result directory" % (result_dir,))
+        if not os.path.exists(result_dir):
+            os.mkdir(result_dir)
+
+        # create cycle_dir
+        cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
+        LOGGER.info("Filling in %s" % (cycle,))
+        cycle_dir = os.path.join(result_dir, cycle)
+        cycle_dir = os.path.abspath(cycle_dir)
+        if os.path.exists(cycle_dir):
+            LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
+            continue
         else:
-            raise ValueError('Unknown --raw-format=%s' % (raw_format))
-        srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
+            os.mkdir(cycle_dir)
+
+        # save run file
+        r.save(cycle_dir)
+
+        # save illumina flowcell status report
+        save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'),
+                              cycle_dir)
+
+        # save stuff from bustard
+        # grab IVC plot
+        save_ivc_plot(r.bustard, cycle_dir)
 
-      # save stuff from GERALD
-      # copy stuff out of the main run
-      g = r.gerald
+        # build base call saving commands
+        if site is not None:
+            save_raw_data(num_jobs, r, site, raw_format, cycle_dir)
 
-      # save summary file
-      save_summary_file(r, cycle_dir)
+        # save stuff from GERALD
+        # copy stuff out of the main run
+        g = r.gerald
 
-      # compress eland result files
-      compress_eland_results(g, cycle_dir, num_jobs)
+        # save summary file
+        save_summary_file(r, cycle_dir)
 
-      # md5 all the compressed files once we're done
-      md5_commands = srf.make_md5_commands(cycle_dir)
-      srf.run_commands(cycle_dir, md5_commands, num_jobs)
+        # compress eland result files
+        compress_eland_results(g, cycle_dir, num_jobs)
+
+        # md5 all the compressed files once we're done
+        md5_commands = srf.make_md5_commands(cycle_dir)
+        srf.run_commands(cycle_dir, md5_commands, num_jobs)
+
+def save_raw_data(num_jobs, r, site, raw_format, cycle_dir):
+    lanes = []
+    for lane in r.gerald.lanes:
+        lane_parameters = r.gerald.lanes.get(lane, None)
+        if lane_parameters is not None:
+            lanes.append(lane)
+
+    run_name = srf.pathname_to_run_name(r.pathname)
+    seq_cmds = []
+    if raw_format is None:
+        raw_format = r.bustard.sequence_format
+
+    LOGGER.info("Raw Format is: %s" % (raw_format, ))
+    if raw_format == 'fastq':
+        rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
+        LOGGER.info("raw data = %s" % (rawpath,))
+        srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
+    elif raw_format == 'qseq':
+        seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
+    elif raw_format == 'srf':
+        seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
+    else:
+        raise ValueError('Unknown --raw-format=%s' % (raw_format))
+    srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
 
 def rm_list(files, dry_run=True):
     for f in files: