From 8096174ac7a288f0f2198c90286d5ddab8c59731 Mon Sep 17 00:00:00 2001 From: Diane Trout Date: Tue, 18 Sep 2012 11:20:26 -0700 Subject: [PATCH] Progress using rdf model to link fastqs with flowcell/lib metadata. I changed how I was using rdf:type -- the most raw data is now a 'sequencer_result' and now there's a separate file_type attribute to indicate what kind of result file it is. I renamed find_missing_targets to update_fastq_targets as in addition to finding what fastqs we need to generate it'll also download missing flowcell information. I'm still having trouble fishing out the fastq files so this isn't ready yet. Finally minor tweaks to the soft file formatting to try and get it to render everything without spurious spaces. --- htsworkflow/pipelines/sequences.py | 31 +++++---- htsworkflow/pipelines/test/test_sequences.py | 12 +++- htsworkflow/submission/condorfastq.py | 64 +++++++++++-------- htsworkflow/submission/daf.py | 2 +- .../submission/test/test_condorfastq.py | 2 +- htsworkflow/templates/geo_fastqs.sparql | 2 +- htsworkflow/templates/geo_submission.soft | 24 +++---- 7 files changed, 79 insertions(+), 58 deletions(-) diff --git a/htsworkflow/pipelines/sequences.py b/htsworkflow/pipelines/sequences.py index 5baf1b5..23e7fe8 100644 --- a/htsworkflow/pipelines/sequences.py +++ b/htsworkflow/pipelines/sequences.py @@ -164,7 +164,7 @@ class SequenceFile(object): # a bit unreliable... 
assumes filesystem is encoded in utf-8 path = os.path.abspath(self.path.encode('utf-8')) fileNode = RDF.Node(RDF.Uri('file://' + path)) - add(model, fileNode, rdfNS['type'], libNS['raw_file']) + add(model, fileNode, rdfNS['type'], libNS['illumina_result']) add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell) add_lit(model, fileNode, libNS['lane_number'], self.lane) if self.read is not None: @@ -177,7 +177,7 @@ class SequenceFile(object): add_lit(model, fileNode, libNS['split_id'], self.split) add_lit(model, fileNode, libNS['cycle'], self.cycle) add_lit(model, fileNode, libNS['passed_filter'], self.pf) - add(model, fileNode, rdfNS['type'], libNS[self.filetype]) + add(model, fileNode, libNS['file_type'], libNS[self.filetype]) if base_url is not None: flowcell = RDF.Node(RDF.Uri("{base}/flowcell/{flowcell}/".format( @@ -215,18 +215,15 @@ class SequenceFile(object): if not isinstance(seq_id, RDF.Node): seq_id = RDF.Node(RDF.Uri(seq_id)) - seqTypesStmt = RDF.Statement(seq_id, rdfNS['type'], None) - seqTypes = model.find_statements(seqTypesStmt) - isSequenceFile = False - for s in seqTypes: - if s.object == libNS['raw_file']: - isSequenceFile = True - else: - seq_type = stripNamespace(libNS, s.object) - - if not isSequenceFile: + result_statement = RDF.Statement(seq_id, + rdfNS['type'], + libNS['illumina_result']) + if not model.contains_statement(result_statement): raise KeyError(u"%s not found" % (unicode(seq_id),)) + seq_type_node = model.get_target(seq_id, libNS['file_type']) + seq_type = stripNamespace(libNS, seq_type_node) + path = urlparse(str(seq_id.uri)).path flowcellNode = get_one(seq_id, libNS['flowcell']) flowcell = get_one(seq_id, libNS['flowcell_id']) @@ -421,7 +418,7 @@ def update_model_sequence_library(model, base_url): prefix libNS: select ?filenode ?flowcell_id ?lane_id ?library_id ?flowcell ?library where { - ?filenode a libNS:raw_file ; + ?filenode a libNS:illumina_result ; libNS:flowcell_id ?flowcell_id ; libNS:lane_number ?lane_id . 
OPTIONAL { ?filenode libNS:flowcell ?flowcell . } @@ -429,6 +426,7 @@ def update_model_sequence_library(model, base_url): OPTIONAL { ?filenode libNS:library_id ?library_id .} } """ + LOGGER.debug("update_model_sequence_library query %s", file_body) file_query = RDF.SPARQLQuery(file_body) files = file_query.execute(model) @@ -436,9 +434,13 @@ def update_model_sequence_library(model, base_url): flowcellNS = RDF.NS(urljoin(base_url, 'flowcell/')) for f in files: filenode = f['filenode'] + LOGGER.debug("Updating file node %s", str(filenode)) lane_id = fromTypedNode(f['lane_id']) if f['flowcell'] is None: flowcell = flowcellNS[str(f['flowcell_id'])+'/'] + LOGGER.debug("Adding file (%s) to flowcell (%s) link", + str(filenode), + str(flowcell)) model.add_statement( RDF.Statement(filenode, libNS['flowcell'], flowcell)) else: @@ -452,6 +454,9 @@ def update_model_sequence_library(model, base_url): flowcell, lane_id) library_id = toTypedNode(simplify_uri(library)) + LOGGER.debug("Adding file (%s) to library (%s) link", + str(filenode), + str(library)) model.add_statement( RDF.Statement(filenode, libNS['library_id'], library_id)) if library is not None: diff --git a/htsworkflow/pipelines/test/test_sequences.py b/htsworkflow/pipelines/test/test_sequences.py index d051c03..34ddeab 100644 --- a/htsworkflow/pipelines/test/test_sequences.py +++ b/htsworkflow/pipelines/test/test_sequences.py @@ -344,13 +344,19 @@ class SequenceFileTests(unittest.TestCase): seq.save_to_model(model) files = list(model.find_statements( - RDF.Statement(None, rdfNS['type'], libraryOntology['raw_file']))) + RDF.Statement(None, + rdfNS['type'], + libraryOntology['illumina_result']))) self.assertEqual(len(files), 5) files = list(model.find_statements( - RDF.Statement(None, rdfNS['type'], libraryOntology['qseq']))) + RDF.Statement(None, + libraryOntology['file_type'], + libraryOntology['qseq']))) self.assertEqual(len(files), 4) files = list(model.find_statements( - RDF.Statement(None, rdfNS['type'], 
libraryOntology['split_fastq']))) + RDF.Statement(None, + libraryOntology['file_type'], + libraryOntology['split_fastq']))) self.assertEqual(len(files), 1) files = list(model.find_statements( diff --git a/htsworkflow/submission/condorfastq.py b/htsworkflow/submission/condorfastq.py index e48afd5..9aab790 100644 --- a/htsworkflow/submission/condorfastq.py +++ b/htsworkflow/submission/condorfastq.py @@ -15,9 +15,8 @@ from htsworkflow.pipelines import srf2fastq from htsworkflow.pipelines import desplit_fastq from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \ fromTypedNode, \ - libraryOntology, \ - stripNamespace, \ - rdfNS + stripNamespace +from htsworkflow.util.rdfns import * from htsworkflow.util.conversion import parse_flowcell_id from django.conf import settings @@ -61,7 +60,6 @@ class CondorFastqExtract(object): template_map = {'srf': 'srf.condor', 'qseq': 'qseq.condor', 'split_fastq': 'split_fastq.condor', - 'by_sample': 'lane_to_fastq.turtle', } env = None @@ -91,9 +89,8 @@ class CondorFastqExtract(object): 'qseq': self.condor_qseq_to_fastq, 'split_fastq': self.condor_desplit_fastq } - by_sample = {} sequences = self.find_archive_sequence_files(result_map) - needed_targets = self.find_missing_targets(result_map, sequences) + needed_targets = self.update_fastq_targets(result_map, sequences) for target_pathname, available_sources in needed_targets.items(): LOGGER.debug(' target : %s' % (target_pathname,)) @@ -110,13 +107,9 @@ class CondorFastqExtract(object): if sources is not None: condor_entries.setdefault(condor_type, []).append( conversion(sources, target_pathname)) - for s in sources: - by_sample.setdefault(s.lane_number,[]).append( - target_pathname) else: print " need file", target_pathname - condor_entries['by_sample'] = by_sample return condor_entries def find_archive_sequence_files(self, result_map): @@ -127,8 +120,7 @@ class CondorFastqExtract(object): flowcell_ids = self.find_relavant_flowcell_ids() 
self.import_sequences(flowcell_ids) - - query = RDF.SPARQLQuery(""" + query_text = """ prefix libns: prefix rdf: prefix xsd: @@ -145,19 +137,26 @@ class CondorFastqExtract(object): libns:flowcell_id ?flowcell_id ; libns:library ?library ; libns:library_id ?library_id ; - rdf:type ?filetype ; - a libns:raw_file . + libns:file_type ?filetype ; + a libns:illumina_result . ?flowcell libns:read_length ?read_length ; libns:flowcell_type ?flowcell_type . OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status } - FILTER(?filetype != libns:raw_file) + FILTER(?filetype != libns:sequencer_result) } - """) + """ + LOGGER.debug("find_archive_sequence_files query: %s", + query_text) + query = RDF.SPARQLQuery(query_text) results = [] for r in query.execute(self.model): library_id = fromTypedNode(r['library_id']) if library_id in result_map: - results.append(SequenceResult(r)) + seq = SequenceResult(r) + LOGGER.debug("Creating sequence result for library %s: %s", + library_id, + repr(seq)) + results.append(seq) return results def import_libraries(self, result_map): @@ -171,8 +170,11 @@ class CondorFastqExtract(object): """Import library data into our model if we don't have it already """ q = RDF.Statement(library, rdfNS['type'], libraryOntology['library']) + present = False if not self.model.contains_statement(q): + present = True load_into_model(self.model, 'rdfa', library) + LOGGER.debug("Did we import %s: %s", library, present) def find_relavant_flowcell_ids(self): """Generate set of flowcell ids that had samples of interest on them @@ -200,7 +202,6 @@ WHERE { return flowcell_ids def import_sequences(self, flowcell_ids): - seq_dirs = [] for f in flowcell_ids: seq_dirs.append(os.path.join(self.sequences_path, str(f))) @@ -209,13 +210,11 @@ WHERE { seq.save_to_model(self.model, self.host) update_model_sequence_library(self.model, self.host) - def find_missing_targets(self, result_map, raw_files): - """ - Check if the sequence file exists. 
- This requires computing what the sequence name is and checking - to see if it can be found in the sequence location. + def update_fastq_targets(self, result_map, raw_files): + """Return list of fastq files that need to be built. - Adds seq.paired flag to sequences listed in lib_db[*]['lanes'] + Also update model with link between illumina result files + and our target fastq file. """ fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq' fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq' @@ -244,9 +243,18 @@ WHERE { if self.force or not os.path.exists(target_pathname): t = needed_targets.setdefault(target_pathname, {}) t.setdefault(seq.filetype, []).append(seq) - + self.add_target_source_links(target_pathname, seq) return needed_targets + def add_target_source_links(self, target, seq): + """Add link between target pathname and the 'lane' that produced it + (note lane objects are now post demultiplexing.) + """ + target_uri = 'file://' + target + target_node = RDF.Node(RDF.Uri(target_uri)) + source_stmt = RDF.Statement(target_node, dcNS['source'], seq.filenode) + self.model.add_statement(source_stmt) + def condor_srf_to_fastq(self, sources, target_pathname): if len(sources) > 1: raise ValueError("srf to fastq can only handle one file") @@ -350,3 +358,9 @@ class SequenceResult(object): errmsg = u"Unsupported scheme {0} for {1}" raise ValueError(errmsg.format(url.scheme, unicode(url))) path = property(_get_path) + + def __repr__(self): + return "SequenceResult({0},{1},{2})".format( + str(self.filenode), + str(self.library_id), + str(self.flowcell_id)) diff --git a/htsworkflow/submission/daf.py b/htsworkflow/submission/daf.py index 09b285b..f04ac8f 100644 --- a/htsworkflow/submission/daf.py +++ b/htsworkflow/submission/daf.py @@ -385,8 +385,8 @@ class UCSCSubmission(object): def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir): # add file specific information 
LOGGER.debug("Updating file md5sum") - fileNode = RDF.Node(RDF.Uri(submission_uri + '/' + filename)) submission_pathname = os.path.join(submission_dir, filename) + fileNode = RDF.Node(RDF.Uri("file://" + submission_pathname)) self.model.add_statement( RDF.Statement(submissionView, dafTermOntology['has_file'], diff --git a/htsworkflow/submission/test/test_condorfastq.py b/htsworkflow/submission/test/test_condorfastq.py index 24d6e49..94df7b6 100644 --- a/htsworkflow/submission/test/test_condorfastq.py +++ b/htsworkflow/submission/test/test_condorfastq.py @@ -441,7 +441,7 @@ class TestCondorFastq(unittest.TestCase): def test_find_needed_targets(self): lib_db = self.extract.find_archive_sequence_files(self.result_map) - needed_targets = self.extract.find_missing_targets(self.result_map, + needed_targets = self.extract.update_fastq_targets(self.result_map, lib_db) self.assertEqual(len(needed_targets), 9) srf_30221 = needed_targets[ diff --git a/htsworkflow/templates/geo_fastqs.sparql b/htsworkflow/templates/geo_fastqs.sparql index e7fcbc1..de9097b 100644 --- a/htsworkflow/templates/geo_fastqs.sparql +++ b/htsworkflow/templates/geo_fastqs.sparql @@ -11,7 +11,7 @@ WHERE { ?file ucscDaf:filename ?filename ; ucscDaf:md5sum ?md5sum ; - libraryOntology:has_lane ?lane ; + libraryOntology:library ?library ; a ?file_type . ?file_type a <{{file_class}}> ; geoSoft:fileTypeLabel ?file_type_label . 
diff --git a/htsworkflow/templates/geo_submission.soft b/htsworkflow/templates/geo_submission.soft index 969ff53..8ae3ed9 100644 --- a/htsworkflow/templates/geo_submission.soft +++ b/htsworkflow/templates/geo_submission.soft @@ -1,5 +1,6 @@ -{% for name, value in series %}{{name}}={{value}} -{% endfor %}{% for row in samples %}^SAMPLE={{row.name}} +{% for name, value in series %} +{{name}}={{value}}{% endfor %}{% for row in samples %} +^SAMPLE={{row.name}} !Sample_type=SRA !Sample_title={{row.name}} !Sample_series_id={{ series_id }} @@ -14,18 +15,13 @@ !Sample_extract_protocol={{ row.extractProtocol|safe }} !Sample_data_processing={{ row.dataProtocol|safe }} !Sample_molecule_ch1={{ row.extractMolecule }} -!Sample_characteristics_ch1=labExpId: {{ row.library_id }} -!Sample_characteristics_ch1=replicate: {{ row.replicate }} -{% if row.cell %}{% spaceless %} -!Sample_characteristics_ch1=cell: {{ row.cell }} -{% endspaceless %}{% endif %} -{% if row.readType %}{% spaceless %} -!Sample_characteristics_ch1=readType: {{ row.readType }} -{% endspaceless %}{% endif %}{% if row.antibody %}{% spaceless %} -!Sample_characteristics_ch1=cell: {{ row.antibody }} -{% endspaceless %}{% endif %}{% for run in row.run %} -!Sample_characteristics_ch1=Illumina image processing pipeline version: {{ run.image_software }}-{{ run.image_version }} -!Sample_characteristics_ch1=Illumina base-calling pipeline version: {{ run.image_software }}-{{ run.image_version }}{% endfor %}{% for raw in row.raw %} +!Sample_characteristics_ch1=labExpId: {{ row.library_id }}{% if row.replicate %} +!Sample_characteristics_ch1=replicate: {{ row.replicate }}{% endif %}{% if row.cell %} +!Sample_characteristics_ch1=cell: {{ row.cell }}{% endif %}{% if row.readType %} +!Sample_characteristics_ch1=readType: {{ row.readType }}{% endif %}{% if row.antibody %} +!Sample_characteristics_ch1=cell: {{ row.antibody }}{% endif %}{% for run in row.run %}{% if run.image_software %} +!Sample_characteristics_ch1=Illumina image 
processing pipeline version: {{ run.image_software }}-{{ run.image_version }}{% endif %}{% if run.basecall_software %} +!Sample_characteristics_ch1=Illumina base-calling pipeline version: {{ run.basecall_software }}-{{ run.basecall_version }}{% endif %}{% endfor %}{% for raw in row.raw %} !Sample_raw_file_{{forloop.counter}}={{ raw.filename }} !Sample_raw_file_type_{{forloop.counter}}={{raw.file_type_label}} !Sample_raw_file_insert_size_{{forloop.counter}}={{ row.insertLength }} -- 2.30.2