X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=blobdiff_plain;f=htsworkflow%2Fsubmission%2Fcondorfastq.py;h=9aab790ef6fb6262e998b7ef1b4c2689144d5e88;hp=e48afd5869b48a9bbc69059aa5ebcad90608312f;hb=8096174ac7a288f0f2198c90286d5ddab8c59731;hpb=5d3908c830a1cded1b3bfde11c8293c05e997ac1

diff --git a/htsworkflow/submission/condorfastq.py b/htsworkflow/submission/condorfastq.py
index e48afd5..9aab790 100644
--- a/htsworkflow/submission/condorfastq.py
+++ b/htsworkflow/submission/condorfastq.py
@@ -15,9 +15,8 @@ from htsworkflow.pipelines import srf2fastq
 from htsworkflow.pipelines import desplit_fastq
 from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
      fromTypedNode, \
-     libraryOntology, \
-     stripNamespace, \
-     rdfNS
+     stripNamespace
+from htsworkflow.util.rdfns import *
 from htsworkflow.util.conversion import parse_flowcell_id
 
 from django.conf import settings
@@ -61,7 +60,6 @@ class CondorFastqExtract(object):
     template_map = {'srf': 'srf.condor',
                     'qseq': 'qseq.condor',
                     'split_fastq': 'split_fastq.condor',
-                    'by_sample': 'lane_to_fastq.turtle',
                     }
 
     env = None
@@ -91,9 +89,8 @@ class CondorFastqExtract(object):
             'qseq': self.condor_qseq_to_fastq,
             'split_fastq': self.condor_desplit_fastq
             }
-        by_sample = {}
         sequences = self.find_archive_sequence_files(result_map)
-        needed_targets = self.find_missing_targets(result_map, sequences)
+        needed_targets = self.update_fastq_targets(result_map, sequences)
 
         for target_pathname, available_sources in needed_targets.items():
             LOGGER.debug(' target : %s' % (target_pathname,))
@@ -110,13 +107,9 @@ class CondorFastqExtract(object):
                 if sources is not None:
                     condor_entries.setdefault(condor_type, []).append(
                         conversion(sources, target_pathname))
-                    for s in sources:
-                        by_sample.setdefault(s.lane_number,[]).append(
-                            target_pathname)
                 else:
                     print " need file", target_pathname
 
-        condor_entries['by_sample'] = by_sample
         return condor_entries
 
     def find_archive_sequence_files(self, result_map):
@@ -127,8 +120,7 @@ class CondorFastqExtract(object):
 
         flowcell_ids = self.find_relavant_flowcell_ids()
         self.import_sequences(flowcell_ids)
-
-        query = RDF.SPARQLQuery("""
+        query_text = """
        prefix libns: 
        prefix rdf: 
        prefix xsd: 
@@ -145,19 +137,26 @@ class CondorFastqExtract(object):
                  libns:flowcell_id ?flowcell_id ;
                  libns:library ?library ;
                  libns:library_id ?library_id ;
-                 rdf:type ?filetype ;
-                 a libns:raw_file .
+                 libns:file_type ?filetype ;
+                 a libns:illumina_result .
        ?flowcell libns:read_length ?read_length ;
                  libns:flowcell_type ?flowcell_type .
        OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
-       FILTER(?filetype != libns:raw_file)
+       FILTER(?filetype != libns:sequencer_result)
        }
-       """)
+       """
+        LOGGER.debug("find_archive_sequence_files query: %s",
+                     query_text)
+        query = RDF.SPARQLQuery(query_text)
         results = []
         for r in query.execute(self.model):
             library_id = fromTypedNode(r['library_id'])
             if library_id in result_map:
-                results.append(SequenceResult(r))
+                seq = SequenceResult(r)
+                LOGGER.debug("Creating sequence result for library %s: %s",
+                             library_id,
+                             repr(seq))
+                results.append(seq)
         return results
 
     def import_libraries(self, result_map):
@@ -171,8 +170,11 @@ class CondorFastqExtract(object):
         """Import library data into our model if we don't have it already
         """
         q = RDF.Statement(library, rdfNS['type'], libraryOntology['library'])
+        present = False
         if not self.model.contains_statement(q):
+            present = True
             load_into_model(self.model, 'rdfa', library)
+        LOGGER.debug("Did we import %s: %s", library, present)
 
     def find_relavant_flowcell_ids(self):
         """Generate set of flowcell ids that had samples of interest on them
@@ -200,7 +202,6 @@ WHERE {
         return flowcell_ids
 
     def import_sequences(self, flowcell_ids):
-
         seq_dirs = []
         for f in flowcell_ids:
             seq_dirs.append(os.path.join(self.sequences_path, str(f)))
@@ -209,13 +210,11 @@ WHERE {
             seq.save_to_model(self.model, self.host)
         update_model_sequence_library(self.model, self.host)
 
-    def find_missing_targets(self, result_map, raw_files):
-        """
-        Check if the sequence file exists.
-        This requires computing what the sequence name is and checking
-        to see if it can be found in the sequence location.
+    def update_fastq_targets(self, result_map, raw_files):
+        """Return list of fastq files that need to be built.
 
-        Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
+        Also update model with link between illumina result files
+        and our target fastq file.
         """
         fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
         fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
@@ -244,9 +243,18 @@ WHERE {
             if self.force or not os.path.exists(target_pathname):
                 t = needed_targets.setdefault(target_pathname, {})
                 t.setdefault(seq.filetype, []).append(seq)
-
+            self.add_target_source_links(target_pathname, seq)
         return needed_targets
 
+    def add_target_source_links(self, target, seq):
+        """Add link between target pathname and the 'lane' that produced it
+        (note lane objects are now post demultiplexing.)
+        """
+        target_uri = 'file://' + target
+        target_node = RDF.Node(RDF.Uri(target_uri))
+        source_stmt = RDF.Statement(target_node, dcNS['source'], seq.filenode)
+        self.model.add_statement(source_stmt)
+
     def condor_srf_to_fastq(self, sources, target_pathname):
         if len(sources) > 1:
             raise ValueError("srf to fastq can only handle one file")
@@ -350,3 +358,9 @@ class SequenceResult(object):
             errmsg = u"Unsupported scheme {0} for {1}"
             raise ValueError(errmsg.format(url.scheme, unicode(url)))
     path = property(_get_path)
+
+    def __repr__(self):
+        return "SequenceResult({0},{1},{2})".format(
+            str(self.filenode),
+            str(self.library_id),
+            str(self.flowcell_id))
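
The new add_target_source_links() method records provenance for each generated fastq: it links the target file's file:// URI to the RDF node of the sequence file it was derived from with a dc:source statement. The following is a minimal, self-contained sketch of that statement shape using the Redland (librdf) Python bindings the module appears to use as RDF; the Dublin Core namespace URI and the file paths are illustrative assumptions, since dcNS actually comes from the star import of htsworkflow.util.rdfns.

# Sketch only: reproduces the dc:source triple shape added by add_target_source_links().
# Assumption: dcNS in htsworkflow.util.rdfns binds the Dublin Core elements namespace.
import RDF

DC = RDF.NS("http://purl.org/dc/elements/1.1/")   # assumed stand-in for dcNS
model = RDF.Model(RDF.MemoryStorage())

# hypothetical target fastq and source sequence file, for illustration only
target_pathname = '/tmp/11111_FC12150_c37_l3.fastq'
source_node = RDF.Node(RDF.Uri('file:///sequences/FC12150/11111_FC12150_l3.srf'))

# same pattern as the method: target node -> dc:source -> source file node
target_node = RDF.Node(RDF.Uri('file://' + target_pathname))
stmt = RDF.Statement(target_node, DC['source'], source_node)
model.add_statement(stmt)
print stmt

Because update_fastq_targets() now calls this for every raw file it examines, the model ends up holding a source link for each target fastq even when the file already exists on disk and needs no condor job.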