Progress using rdf model to link fastqs with flowcell/lib metadata.
authorDiane Trout <diane@caltech.edu>
Tue, 18 Sep 2012 18:20:26 +0000 (11:20 -0700)
committerDiane Trout <diane@caltech.edu>
Tue, 18 Sep 2012 18:20:26 +0000 (11:20 -0700)
I changed how I was using rdf:type -- the most raw data is now
a 'sequencer_result' and now there's a separate file_type
attribute to indicate what kind of result file it is.

I renamed find_missing_targets to update_fastq_targets as,
in addition to finding what fastqs we need to generate, it'll
also download missing flowcell information.

I'm still having trouble fishing out the fastq files so this isn't
ready yet.

Finally, minor tweaks to the soft file formatting to try
to get it to render everything without spurious spaces.

htsworkflow/pipelines/sequences.py
htsworkflow/pipelines/test/test_sequences.py
htsworkflow/submission/condorfastq.py
htsworkflow/submission/daf.py
htsworkflow/submission/test/test_condorfastq.py
htsworkflow/templates/geo_fastqs.sparql
htsworkflow/templates/geo_submission.soft

index 5baf1b5022a948b1022925737bc79cd3e425aafd..23e7fe8cfa401a52a52e936e91e3182311ed851f 100644 (file)
@@ -164,7 +164,7 @@ class SequenceFile(object):
         # a bit unreliable... assumes filesystem is encoded in utf-8
         path = os.path.abspath(self.path.encode('utf-8'))
         fileNode = RDF.Node(RDF.Uri('file://' + path))
-        add(model, fileNode, rdfNS['type'], libNS['raw_file'])
+        add(model, fileNode, rdfNS['type'], libNS['illumina_result'])
         add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell)
         add_lit(model, fileNode, libNS['lane_number'], self.lane)
         if self.read is not None:
@@ -177,7 +177,7 @@ class SequenceFile(object):
         add_lit(model, fileNode, libNS['split_id'], self.split)
         add_lit(model, fileNode, libNS['cycle'], self.cycle)
         add_lit(model, fileNode, libNS['passed_filter'], self.pf)
-        add(model, fileNode, rdfNS['type'], libNS[self.filetype])
+        add(model, fileNode, libNS['file_type'], libNS[self.filetype])
 
         if base_url is not None:
             flowcell = RDF.Node(RDF.Uri("{base}/flowcell/{flowcell}/".format(
@@ -215,18 +215,15 @@ class SequenceFile(object):
 
         if not isinstance(seq_id, RDF.Node):
             seq_id = RDF.Node(RDF.Uri(seq_id))
-        seqTypesStmt = RDF.Statement(seq_id, rdfNS['type'], None)
-        seqTypes = model.find_statements(seqTypesStmt)
-        isSequenceFile = False
-        for s in seqTypes:
-            if s.object == libNS['raw_file']:
-                isSequenceFile = True
-            else:
-                seq_type = stripNamespace(libNS, s.object)
-
-        if not isSequenceFile:
+        result_statement = RDF.Statement(seq_id,
+                                         rdfNS['type'],
+                                         libNS['illumina_result'])
+        if not model.contains_statement(result_statement):
             raise KeyError(u"%s not found" % (unicode(seq_id),))
 
+        seq_type_node = model.get_target(seq_id, libNS['file_type'])
+        seq_type = stripNamespace(libNS, seq_type_node)
+
         path = urlparse(str(seq_id.uri)).path
         flowcellNode = get_one(seq_id, libNS['flowcell'])
         flowcell = get_one(seq_id, libNS['flowcell_id'])
@@ -421,7 +418,7 @@ def update_model_sequence_library(model, base_url):
     prefix libNS: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
     select ?filenode ?flowcell_id ?lane_id ?library_id ?flowcell ?library
     where {
-       ?filenode a libNS:raw_file ;
+       ?filenode a libNS:illumina_result ;
                  libNS:flowcell_id ?flowcell_id ;
                  libNS:lane_number ?lane_id .
        OPTIONAL { ?filenode libNS:flowcell ?flowcell . }
@@ -429,6 +426,7 @@ def update_model_sequence_library(model, base_url):
        OPTIONAL { ?filenode libNS:library_id ?library_id .}
     }
     """
+    LOGGER.debug("update_model_sequence_library query %s", file_body)
     file_query = RDF.SPARQLQuery(file_body)
     files = file_query.execute(model)
 
@@ -436,9 +434,13 @@ def update_model_sequence_library(model, base_url):
     flowcellNS = RDF.NS(urljoin(base_url, 'flowcell/'))
     for f in files:
         filenode = f['filenode']
+        LOGGER.debug("Updating file node %s", str(filenode))
         lane_id = fromTypedNode(f['lane_id'])
         if f['flowcell'] is None:
             flowcell = flowcellNS[str(f['flowcell_id'])+'/']
+            LOGGER.debug("Adding file (%s) to flowcell (%s) link",
+                         str(filenode),
+                         str(flowcell))
             model.add_statement(
                 RDF.Statement(filenode, libNS['flowcell'], flowcell))
         else:
@@ -452,6 +454,9 @@ def update_model_sequence_library(model, base_url):
                                                    flowcell,
                                                    lane_id)
                 library_id = toTypedNode(simplify_uri(library))
+                LOGGER.debug("Adding file (%s) to library (%s) link",
+                             str(filenode),
+                             str(library))
                 model.add_statement(
                     RDF.Statement(filenode, libNS['library_id'], library_id))
             if library is not None:
index d051c036ca2eddda6ffbbc1cc3f1c1f31bb5fcea..34ddeab593da5636386a7f0a7254a5e2af1b53e4 100644 (file)
@@ -344,13 +344,19 @@ class SequenceFileTests(unittest.TestCase):
             seq.save_to_model(model)
 
         files = list(model.find_statements(
-            RDF.Statement(None, rdfNS['type'], libraryOntology['raw_file'])))
+            RDF.Statement(None,
+                          rdfNS['type'],
+                          libraryOntology['illumina_result'])))
         self.assertEqual(len(files), 5)
         files = list(model.find_statements(
-            RDF.Statement(None, rdfNS['type'], libraryOntology['qseq'])))
+            RDF.Statement(None,
+                          libraryOntology['file_type'],
+                          libraryOntology['qseq'])))
         self.assertEqual(len(files), 4)
         files = list(model.find_statements(
-            RDF.Statement(None, rdfNS['type'], libraryOntology['split_fastq'])))
+            RDF.Statement(None,
+                          libraryOntology['file_type'],
+                          libraryOntology['split_fastq'])))
         self.assertEqual(len(files), 1)
 
         files = list(model.find_statements(
index e48afd5869b48a9bbc69059aa5ebcad90608312f..9aab790ef6fb6262e998b7ef1b4c2689144d5e88 100644 (file)
@@ -15,9 +15,8 @@ from htsworkflow.pipelines import srf2fastq
 from htsworkflow.pipelines import desplit_fastq
 from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
      fromTypedNode, \
-     libraryOntology, \
-     stripNamespace, \
-     rdfNS
+     stripNamespace
+from htsworkflow.util.rdfns import *
 from htsworkflow.util.conversion import parse_flowcell_id
 
 from django.conf import settings
@@ -61,7 +60,6 @@ class CondorFastqExtract(object):
         template_map = {'srf': 'srf.condor',
                         'qseq': 'qseq.condor',
                         'split_fastq': 'split_fastq.condor',
-                        'by_sample': 'lane_to_fastq.turtle',
                         }
 
         env = None
@@ -91,9 +89,8 @@ class CondorFastqExtract(object):
                             'qseq': self.condor_qseq_to_fastq,
                             'split_fastq': self.condor_desplit_fastq
                             }
-        by_sample = {}
         sequences = self.find_archive_sequence_files(result_map)
-        needed_targets = self.find_missing_targets(result_map, sequences)
+        needed_targets = self.update_fastq_targets(result_map, sequences)
 
         for target_pathname, available_sources in needed_targets.items():
             LOGGER.debug(' target : %s' % (target_pathname,))
@@ -110,13 +107,9 @@ class CondorFastqExtract(object):
                 if sources is not None:
                     condor_entries.setdefault(condor_type, []).append(
                         conversion(sources, target_pathname))
-                    for s in sources:
-                        by_sample.setdefault(s.lane_number,[]).append(
-                            target_pathname)
             else:
                 print " need file", target_pathname
 
-        condor_entries['by_sample'] = by_sample
         return condor_entries
 
     def find_archive_sequence_files(self,  result_map):
@@ -127,8 +120,7 @@ class CondorFastqExtract(object):
         flowcell_ids = self.find_relavant_flowcell_ids()
         self.import_sequences(flowcell_ids)
 
-
-        query = RDF.SPARQLQuery("""
+        query_text = """
         prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
         prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
         prefix xsd: <http://www.w3.org/2001/XMLSchema#>
@@ -145,19 +137,26 @@ class CondorFastqExtract(object):
                       libns:flowcell_id ?flowcell_id ;
                       libns:library ?library ;
                       libns:library_id ?library_id ;
-                      rdf:type ?filetype ;
-                      a libns:raw_file .
+                      libns:file_type ?filetype ;
+                      a libns:illumina_result .
             ?flowcell libns:read_length ?read_length ;
                       libns:flowcell_type ?flowcell_type .
             OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
-            FILTER(?filetype != libns:raw_file)
+            FILTER(?filetype != libns:sequencer_result)
         }
-        """)
+        """
+        LOGGER.debug("find_archive_sequence_files query: %s",
+                     query_text)
+        query = RDF.SPARQLQuery(query_text)
         results = []
         for r in query.execute(self.model):
             library_id = fromTypedNode(r['library_id'])
             if library_id in result_map:
-                results.append(SequenceResult(r))
+                seq = SequenceResult(r)
+                LOGGER.debug("Creating sequence result for library %s: %s",
+                             library_id,
+                             repr(seq))
+                results.append(seq)
         return results
 
     def import_libraries(self, result_map):
@@ -171,8 +170,11 @@ class CondorFastqExtract(object):
         """Import library data into our model if we don't have it already
         """
         q = RDF.Statement(library, rdfNS['type'], libraryOntology['library'])
+        present = False
         if not self.model.contains_statement(q):
+            present = True
             load_into_model(self.model, 'rdfa', library)
+        LOGGER.debug("Did we import %s: %s", library, present)
 
     def find_relavant_flowcell_ids(self):
         """Generate set of flowcell ids that had samples of interest on them
@@ -200,7 +202,6 @@ WHERE {
         return flowcell_ids
 
     def import_sequences(self, flowcell_ids):
-
         seq_dirs = []
         for f in flowcell_ids:
             seq_dirs.append(os.path.join(self.sequences_path, str(f)))
@@ -209,13 +210,11 @@ WHERE {
             seq.save_to_model(self.model, self.host)
         update_model_sequence_library(self.model, self.host)
 
-    def find_missing_targets(self, result_map, raw_files):
-        """
-        Check if the sequence file exists.
-        This requires computing what the sequence name is and checking
-        to see if it can be found in the sequence location.
+    def update_fastq_targets(self, result_map, raw_files):
+        """Return list of fastq files that need to be built.
 
-        Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
+        Also update model with link between illumina result files
+        and our target fastq file.
         """
         fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
         fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
@@ -244,9 +243,18 @@ WHERE {
             if self.force or not os.path.exists(target_pathname):
                 t = needed_targets.setdefault(target_pathname, {})
                 t.setdefault(seq.filetype, []).append(seq)
-
+            self.add_target_source_links(target_pathname, seq)
         return needed_targets
 
+    def add_target_source_links(self, target, seq):
+        """Add link between target pathname and the 'lane' that produced it
+        (note lane objects are now post demultiplexing.)
+        """
+        target_uri = 'file://' + target
+        target_node = RDF.Node(RDF.Uri(target_uri))
+        source_stmt = RDF.Statement(target_node, dcNS['source'], seq.filenode)
+        self.model.add_statement(source_stmt)
+
     def condor_srf_to_fastq(self, sources, target_pathname):
         if len(sources) > 1:
             raise ValueError("srf to fastq can only handle one file")
@@ -350,3 +358,9 @@ class SequenceResult(object):
             errmsg = u"Unsupported scheme {0} for {1}"
             raise ValueError(errmsg.format(url.scheme, unicode(url)))
     path = property(_get_path)
+
+    def __repr__(self):
+        return "SequenceResult({0},{1},{2})".format(
+            str(self.filenode),
+            str(self.library_id),
+            str(self.flowcell_id))
index 09b285b095ee2a15cfabee5bc5de3d866522c4f9..f04ac8fe5328e738a012738a5d256b9307e11504 100644 (file)
@@ -385,8 +385,8 @@ class UCSCSubmission(object):
     def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir):
         # add file specific information
         LOGGER.debug("Updating file md5sum")
-        fileNode = RDF.Node(RDF.Uri(submission_uri + '/' + filename))
         submission_pathname = os.path.join(submission_dir, filename)
+        fileNode = RDF.Node(RDF.Uri("file://" + submission_pathname))
         self.model.add_statement(
             RDF.Statement(submissionView,
                           dafTermOntology['has_file'],
index 24d6e4982018dd23ac8e8746d7ade1856ae3d4e7..94df7b60b2f1e7dbca6d5e2fc2ecbacded55f30a 100644 (file)
@@ -441,7 +441,7 @@ class TestCondorFastq(unittest.TestCase):
     def test_find_needed_targets(self):
         lib_db = self.extract.find_archive_sequence_files(self.result_map)
 
-        needed_targets = self.extract.find_missing_targets(self.result_map,
+        needed_targets = self.extract.update_fastq_targets(self.result_map,
                                                            lib_db)
         self.assertEqual(len(needed_targets), 9)
         srf_30221 = needed_targets[
index e7fcbc12d0f8337e099a8bbe85409ad2e8c18432..de9097ba9962849dde661bbd9ebd38521573f781 100644 (file)
@@ -11,7 +11,7 @@ WHERE {
 
   ?file ucscDaf:filename ?filename ;
         ucscDaf:md5sum ?md5sum ;
-        libraryOntology:has_lane ?lane ;
+        libraryOntology:library ?library ;
         a ?file_type .
   ?file_type a <{{file_class}}> ;
              geoSoft:fileTypeLabel ?file_type_label .
index 969ff53836bd37b3d4f82747cf3d94c806852ca9..8ae3ed9ef78e0d737f1d7cada296a740e2ff553d 100644 (file)
@@ -1,5 +1,6 @@
-{% for name, value in series %}{{name}}={{value}}
-{% endfor %}{% for row in samples %}^SAMPLE={{row.name}}
+{% for name, value in series %}
+{{name}}={{value}}{% endfor %}{% for row in samples %}
+^SAMPLE={{row.name}}
 !Sample_type=SRA
 !Sample_title={{row.name}}
 !Sample_series_id={{ series_id }}
 !Sample_extract_protocol={{ row.extractProtocol|safe }}
 !Sample_data_processing={{ row.dataProtocol|safe }}
 !Sample_molecule_ch1={{ row.extractMolecule }}
-!Sample_characteristics_ch1=labExpId: {{ row.library_id }}
-!Sample_characteristics_ch1=replicate: {{ row.replicate }}
-{% if row.cell %}{% spaceless %}
-!Sample_characteristics_ch1=cell: {{ row.cell }}
-{% endspaceless %}{% endif %}
-{% if row.readType %}{% spaceless %}
-!Sample_characteristics_ch1=readType: {{ row.readType }}
-{% endspaceless %}{% endif %}{% if row.antibody %}{% spaceless %}
-!Sample_characteristics_ch1=cell: {{ row.antibody }}
-{% endspaceless %}{% endif %}{% for run in row.run %}
-!Sample_characteristics_ch1=Illumina image processing pipeline version: {{ run.image_software }}-{{ run.image_version }}
-!Sample_characteristics_ch1=Illumina base-calling pipeline version: {{ run.image_software }}-{{ run.image_version }}{% endfor %}{% for raw in row.raw %}
+!Sample_characteristics_ch1=labExpId: {{ row.library_id }}{% if row.replicate %}
+!Sample_characteristics_ch1=replicate: {{ row.replicate }}{% endif %}{% if row.cell %}
+!Sample_characteristics_ch1=cell: {{ row.cell }}{% endif %}{% if row.readType %}
+!Sample_characteristics_ch1=readType: {{ row.readType }}{% endif %}{% if row.antibody %}
+!Sample_characteristics_ch1=cell: {{ row.antibody }}{% endif %}{% for run in row.run %}{% if run.image_software %}
+!Sample_characteristics_ch1=Illumina image processing pipeline version: {{ run.image_software }}-{{ run.image_version }}{% endif %}{% if run.basecall_software %}
+!Sample_characteristics_ch1=Illumina base-calling pipeline version: {{ run.basecall_software }}-{{ run.basecall_version }}{% endif %}{% endfor %}{% for raw in row.raw %}
 !Sample_raw_file_{{forloop.counter}}={{ raw.filename }}
 !Sample_raw_file_type_{{forloop.counter}}={{raw.file_type_label}}
 !Sample_raw_file_insert_size_{{forloop.counter}}={{ row.insertLength }}