Merge branch 'master' of mus.cacr.caltech.edu:htsworkflow
[htsworkflow.git] / htsworkflow / submission / condorfastq.py
index 20afdfd0ca16f5353f3ee6ffeacd90158931b7e9..8513f7b2da2b266803f12ec32acf650242fb165e 100644 (file)
@@ -2,15 +2,20 @@
 """
 import logging
 import os
+from pprint import pformat
 import sys
 import types
 
 from htsworkflow.pipelines.sequences import scan_for_sequences
 from htsworkflow.pipelines import qseq2fastq
 from htsworkflow.pipelines import srf2fastq
+from htsworkflow.pipelines import desplit_fastq
 from htsworkflow.util.api import HtswApi
 from htsworkflow.util.conversion import parse_flowcell_id
 
+from django.conf import settings
+from django.template import Context, loader
+
 LOGGER = logging.getLogger(__name__)
 
 class CondorFastqExtract(object):
@@ -31,79 +36,64 @@ class CondorFastqExtract(object):
         self.log_path = log_path
         self.force = force
 
-    def build_fastqs(self, library_result_map ):
+    def create_scripts(self, result_map ):
         """
         Generate condor scripts to build any needed fastq files
 
         Args:
-          library_result_map (list):  [(library_id, destination directory), ...]
+          result_map: htsworkflow.submission.results.ResultMap()
         """
-        qseq_condor_header = self.get_qseq_condor_header()
-        qseq_condor_entries = []
-        srf_condor_header = self.get_srf_condor_header()
-        srf_condor_entries = []
-        lib_db = self.find_archive_sequence_files(library_result_map)
-
-        needed_targets = self.find_missing_targets(library_result_map, lib_db)
+        template_map = {'srf': 'srf.condor',
+                        'qseq': 'qseq.condor',
+                        'split_fastq': 'split_fastq.condor'}
+
+        condor_entries = self.build_condor_arguments(result_map)
+        for script_type in template_map.keys():
+            template = loader.get_template(template_map[script_type])
+            variables = {'python': sys.executable,
+                         'logdir': self.log_path,
+                         'env': os.environ.get('PYTHONPATH', None),
+                         'args': condor_entries[script_type],
+                         }
+
+            context = Context(variables)
+
+            with open(script_type + '.condor','w+') as outstream:
+                outstream.write(template.render(context))
+
+    def build_condor_arguments(self, result_map):
+        condor_entries = {'srf': [],
+                          'qseq': [],
+                          'split_fastq': []}
+        conversion_funcs = {'srf': self.condor_srf_to_fastq,
+                            'qseq': self.condor_qseq_to_fastq,
+                            'split_fastq': self.condor_desplit_fastq
+                            }
+
+        lib_db = self.find_archive_sequence_files(result_map)
+        needed_targets = self.find_missing_targets(result_map, lib_db)
 
         for target_pathname, available_sources in needed_targets.items():
             LOGGER.debug(' target : %s' % (target_pathname,))
             LOGGER.debug(' candidate sources: %s' % (available_sources,))
-            if available_sources.has_key('qseq'):
-                source = available_sources['qseq']
-                qseq_condor_entries.append(
-                    self.condor_qseq_to_fastq(source.path,
-                                              target_pathname,
-                                              source.flowcell)
-                )
-            elif available_sources.has_key('srf'):
-                source = available_sources['srf']
-                mid = getattr(source, 'mid_point', None)
-                srf_condor_entries.append(
-                    self.condor_srf_to_fastq(source.path,
-                                             target_pathname,
-                                             source.paired,
-                                             source.flowcell,
-                                             mid)
-                )
+            for condor_type in available_sources.keys():
+                conversion = conversion_funcs.get(condor_type, None)
+                if conversion is None:
+                    errmsg = "Unrecognized type: {0} for {1}"
+                    print errmsg.format(condor_type,
+                                        pformat(available_sources))
+                    continue
+                sources = available_sources.get(condor_type, None)
+
+                if sources is not None:
+                    condor_entries.setdefault(condor_type, []).append(
+                        conversion(sources, target_pathname))
             else:
                 print " need file", target_pathname
 
-        if len(srf_condor_entries) > 0:
-            make_submit_script('srf.fastq.condor',
-                               srf_condor_header,
-                               srf_condor_entries)
-
-        if len(qseq_condor_entries) > 0:
-            make_submit_script('qseq.fastq.condor',
-                               qseq_condor_header,
-                               qseq_condor_entries)
-
-
-    def get_qseq_condor_header(self):
-        return """Universe=vanilla
-executable=%(exe)s
-error=%(log)s/qseq2fastq.$(process).out
-output=%(log)s/qseq2fastq.$(process).out
-log=%(log)s/qseq2fastq.log
-
-""" % {'exe': sys.executable,
-       'log': self.log_path }
-
-    def get_srf_condor_header(self):
-        return """Universe=vanilla
-executable=%(exe)s
-output=%(log)s/srf_pair_fastq.$(process).out
-error=%(log)s/srf_pair_fastq.$(process).out
-log=%(log)s/srf_pair_fastq.log
-environment="PYTHONPATH=%(env)s"
-
-""" % {'exe': sys.executable,
-           'log': self.log_path,
-           'env': os.environ.get('PYTHONPATH', '')
-      }
-
-    def find_archive_sequence_files(self,  library_result_map):
+        return condor_entries
+
+    def find_archive_sequence_files(self,  result_map):
         """
         Find archived sequence files associated with our results.
         """
@@ -112,7 +102,7 @@ environment="PYTHONPATH=%(env)s"
         lib_db = {}
         seq_dirs = set()
         candidate_lanes = {}
-        for lib_id, result_dir in library_result_map:
+        for lib_id in result_map.keys():
             lib_info = self.api.get_library(lib_id)
             lib_info['lanes'] = {}
             lib_db[lib_id] = lib_info
@@ -139,7 +129,7 @@ environment="PYTHONPATH=%(env)s"
 
         return lib_db
 
-    def find_missing_targets(self, library_result_map, lib_db):
+    def find_missing_targets(self, result_map, lib_db):
         """
         Check if the sequence file exists.
         This requires computing what the sequence name is and checking
@@ -151,7 +141,8 @@ environment="PYTHONPATH=%(env)s"
         fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
         # find what targets we're missing
         needed_targets = {}
-        for lib_id, result_dir in library_result_map:
+        for lib_id in result_map.keys():
+            result_dir = result_map[lib_id]
             lib = lib_db[lib_id]
             lane_dict = make_lane_dict(lib_db, lib_id)
 
@@ -179,92 +170,60 @@ environment="PYTHONPATH=%(env)s"
 
                     # end filters
                     if seq.paired:
-                        target_name = fastq_paired_template % filename_attributes
+                        target_name = fastq_paired_template % \
+                                      filename_attributes
                     else:
-                        target_name = fastq_single_template % filename_attributes
+                        target_name = fastq_single_template % \
+                                      filename_attributes
 
                     target_pathname = os.path.join(result_dir, target_name)
                     if self.force or not os.path.exists(target_pathname):
                         t = needed_targets.setdefault(target_pathname, {})
-                        t[seq.filetype] = seq
+                        t.setdefault(seq.filetype, []).append(seq)
 
         return needed_targets
 
 
-    def condor_srf_to_fastq(self,
-                            srf_file,
-                            target_pathname,
-                            paired,
-                            flowcell=None,
-                            mid=None):
-        py = srf2fastq.__file__
-        args = [ py, srf_file, '--verbose']
-        if paired:
-            args.extend(['--left', target_pathname])
-            # this is ugly. I did it because I was pregenerating the target
-            # names before I tried to figure out what sources could generate
-            # those targets, and everything up to this point had been
-            # one-to-one. So I couldn't figure out how to pair the
-            # target names.
-            # With this at least the command will run correctly.
-            # however if we rename the default targets, this'll break
-            # also I think it'll generate it twice.
-            args.extend(['--right',
-                         target_pathname.replace('_r1.fastq', '_r2.fastq')])
-        else:
-            args.extend(['--single', target_pathname ])
-        if flowcell is not None:
-            args.extend(['--flowcell', flowcell])
-
-        if mid is not None:
-            args.extend(['-m', str(mid)])
-
-        if self.force:
-            args.extend(['--force'])
-
-        script = """arguments="%s"
-queue
-""" % (" ".join(args),)
-
-        return  script
-
-
-    def condor_qseq_to_fastq(self, qseq_file, target_pathname, flowcell=None):
-        py = qseq2fastq.__file__
-        args = [py, '-i', qseq_file, '-o', target_pathname ]
-        if flowcell is not None:
-            args.extend(['-f', flowcell])
-        script = """arguments="%s"
-queue
-""" % (" ".join(args))
-
-        return script
-
-def make_submit_script(target, header, body_list):
-    """
-    write out a text file
-
-    this was intended for condor submit scripts
-
-    Args:
-      target (str or stream):
-        if target is a string, we will open and close the file
-        if target is a stream, the caller is responsible.
-
-      header (str);
-        header to write at the beginning of the file
-      body_list (list of strs):
-        a list of blocks to add to the file.
-    """
-    if type(target) in types.StringTypes:
-        f = open(target,"w")
-    else:
-        f = target
-    f.write(header)
-    for entry in body_list:
-        f.write(entry)
-    if type(target) in types.StringTypes:
-        f.close()
+    def condor_srf_to_fastq(self, sources, target_pathname):
+        if len(sources) > 1:
+            raise ValueError("srf to fastq can only handle one file")
+
+        return {
+            'sources': [os.path.abspath(sources[0].path)],
+            'pyscript': srf2fastq.__file__,
+            'flowcell': sources[0].flowcell,
+            'ispaired': sources[0].paired,
+            'target': target_pathname,
+            'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
+            'mid': getattr(sources[0], 'mid_point', None),
+            'force': self.force,
+        }
+
+    def condor_qseq_to_fastq(self, sources, target_pathname):
+        paths = []
+        for source in sources:
+            paths.append(source.path)
+        paths.sort()
+        return {
+            'pyscript': qseq2fastq.__file__,
+            'flowcell': sources[0].flowcell,
+            'target': target_pathname,
+            'sources': paths,
+            'ispaired': sources[0].paired,
+            'istar': len(sources) == 1,
+        }
+
+    def condor_desplit_fastq(self, sources, target_pathname):
+        paths = []
+        for source in sources:
+            paths.append(source.path)
+        paths.sort()
+        return {
+            'pyscript': desplit_fastq.__file__,
+            'target': target_pathname,
+            'sources': paths,
+            'ispaired': sources[0].paired,
+        }
 
 def make_lane_dict(lib_db, lib_id):
     """