Directly generate compressed fastq files from HiSeq split fastqs.
[htsworkflow.git] / htsworkflow / submission / condorfastq.py
index d79502d2af7052a06e871cc0520af2e056b7ec7f..37d60edf9572ff8c51e6328b88ecc53467496ae8 100644 (file)
@@ -13,9 +13,10 @@ from htsworkflow.pipelines.samplekey import SampleKey
 from htsworkflow.pipelines import qseq2fastq
 from htsworkflow.pipelines import srf2fastq
 from htsworkflow.pipelines import desplit_fastq
+from htsworkflow.submission.fastqname import FastqName
 from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
      fromTypedNode, \
-     stripNamespace
+     strip_namespace
 from htsworkflow.util.rdfns import *
 from htsworkflow.util.conversion import parse_flowcell_id
 
@@ -26,11 +27,16 @@ import RDF
 
 LOGGER = logging.getLogger(__name__)
 
+COMPRESSION_EXTENSIONS = {
+    None: '',
+    'gzip': '.gz'
+}
 
 class CondorFastqExtract(object):
     def __init__(self, host, sequences_path,
                  log_path='log',
                  model=None,
+                 compression=None,
                  force=False):
         """Extract fastqs from results archive
 
@@ -39,16 +45,19 @@ class CondorFastqExtract(object):
           apidata (dict): id & key to post to the server
           sequences_path (str): root of the directory tree to scan for files
           log_path (str): where to put condor log files
+          compression (str): one of 'gzip', 'bzip2'
           force (bool): do we force overwriting current files?
         """
         self.host = host
         self.model = get_model(model)
         self.sequences_path = sequences_path
         self.log_path = log_path
+        self.compression=compression
         self.force = force
         LOGGER.info("CondorFastq host={0}".format(self.host))
         LOGGER.info("CondorFastq sequences_path={0}".format(self.sequences_path))
         LOGGER.info("CondorFastq log_path={0}".format(self.log_path))
+        LOGGER.info("Compression {0}".format(self.compression))
 
     def create_scripts(self, result_map ):
         """
@@ -219,8 +228,6 @@ WHERE {
         Also update model with link between illumina result files
         and our target fastq file.
         """
-        fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
-        fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
         # find what targets we're missing
         needed_targets = {}
         for seq in raw_files:
@@ -231,18 +238,15 @@ WHERE {
                 'lib_id': seq.library_id,
                 'lane': seq.lane_number,
                 'read': seq.read,
-                'cycle': seq.cycle
+                'cycle': seq.cycle,
+                'compression_extension': COMPRESSION_EXTENSIONS[self.compression],
+                'is_paired': seq.ispaired
             }
 
-            if seq.ispaired:
-                target_name = fastq_paired_template % \
-                              filename_attributes
-            else:
-                target_name = fastq_single_template % \
-                              filename_attributes
+            fqName = FastqName(**filename_attributes)
 
             result_dir = result_map[seq.library_id]
-            target_pathname = os.path.join(result_dir, target_name)
+            target_pathname = os.path.join(result_dir, fqName.filename)
             if self.force or not os.path.exists(target_pathname):
                 t = needed_targets.setdefault(target_pathname, {})
                 t.setdefault(seq.filetype, []).append(seq)
@@ -296,9 +300,14 @@ WHERE {
         for source in sources:
             paths.append(source.path)
         paths.sort()
+        compression_argument = ''
+        if self.compression:
+            compression_argument = '--'+self.compression
+
         return {
             'pyscript': desplit_fastq.__file__,
             'target': target_pathname,
+            'compression': compression_argument,
             'sources': paths,
             'ispaired': sources[0].ispaired,
         }
@@ -350,7 +359,7 @@ class SequenceResult(object):
     ispaired = property(_get_ispaired)
 
     def _get_filetype(self):
-        return stripNamespace(libraryOntology, self._filetype)
+        return strip_namespace(libraryOntology, self._filetype)
     filetype = property(_get_filetype)
 
     def _get_path(self):