sparql_query, \
submissionOntology
from htsworkflow.submission.daf import \
- DAFMapper, \
+ UCSCSubmission, \
MetadataLookupException, \
get_submission_uri
+from htsworkflow.submission.results import ResultMap
from htsworkflow.submission.condorfastq import CondorFastqExtract
logger = logging.getLogger('ucsc_gather')
apidata = api.make_auth_from_opts(opts, parser)
model = get_model(opts.model, opts.db_path)
+    # default to None so later 'if mapper is None' guards work when --name absent
+    mapper = None
     if opts.name:
-        mapper = DAFMapper(opts.name, opts.daf, model)
+        # fixed typo: was 'UCSCSubmssion', which raises NameError at runtime
+        mapper = UCSCSubmission(opts.name, opts.daf, model)
if opts.library_url is not None:
mapper.library_url = opts.library_url
submission_uri = get_submission_uri(opts.name)
if opts.make_ddf and opts.daf is None:
parser.error("Please specify your daf when making ddf files")
- library_result_map = []
+ results = ResultMap()
for a in args:
- library_result_map.extend(read_library_result_map(a))
+ results.add_results_from_file(a)
if opts.make_tree_from is not None:
- make_tree_from(opts.make_tree_from, library_result_map)
+ results.make_tree_from(opts.make_tree_from)
if opts.link_daf:
- if opts.daf is None:
- parser.error("Please specify daf filename with --daf")
- link_daf(opts.daf, library_result_map)
+ if mapper is None:
+ parser.error("Specify a submission model")
+ if mapper.daf is None:
+ parser.error("Please load a daf first")
+ mapper.link_daf(results)
if opts.fastq:
extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
force=opts.force)
- extractor.create_scripts(library_result_map)
+ extractor.create_scripts(results)
if opts.scan_submission:
- scan_submission_dirs(mapper, library_result_map)
+ mapper.scan_submission_dirs(results)
if opts.make_ddf:
- make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)
+ make_all_ddfs(mapper, results, opts.daf, force=opts.force)
if opts.zip_ddf:
- zip_ddfs(mapper, library_result_map, opts.daf)
+ zip_ddfs(mapper, results, opts.daf)
if opts.sparql:
sparql_query(model, opts.sparql)
return parser
-def make_tree_from(source_path, library_result_map):
- """Create a tree using data files from source path.
- """
- for lib_id, lib_path in library_result_map:
- if not os.path.exists(lib_path):
- logger.info("Making dir {0}".format(lib_path))
- os.mkdir(lib_path)
- source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
- if os.path.exists(source_lib_dir):
- pass
- for filename in os.listdir(source_lib_dir):
- source_pathname = os.path.join(source_lib_dir, filename)
- target_pathname = os.path.join(lib_path, filename)
- if not os.path.exists(source_pathname):
- raise IOError("{0} does not exist".format(source_pathname))
- if not os.path.exists(target_pathname):
- os.symlink(source_pathname, target_pathname)
- logger.info(
- 'LINK {0} to {1}'.format(source_pathname, target_pathname))
-
-
-def link_daf(daf_path, library_result_map):
- if not os.path.exists(daf_path):
- raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))
-
- base_daf = os.path.basename(daf_path)
-
- for lib_id, result_dir in library_result_map:
- if not os.path.exists(result_dir):
- raise RuntimeError("Couldn't find target directory %s" %(result_dir,))
- submission_daf = os.path.join(result_dir, base_daf)
- if not os.path.exists(submission_daf):
- if not os.path.exists(daf_path):
- raise RuntimeError("Couldn't find daf: %s" %(daf_path,))
- os.link(daf_path, submission_daf)
-
-
-def scan_submission_dirs(view_map, library_result_map):
- """Look through our submission directories and collect needed information
- """
- for lib_id, result_dir in library_result_map:
- logger.info("Importing %s from %s" % (lib_id, result_dir))
- try:
- view_map.import_submission_dir(result_dir, lib_id)
- except MetadataLookupException, e:
- logger.error("Skipping %s: %s" % (lib_id, str(e)))
-
def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
dag_fragment = []
os.chdir(rootdir)
-def read_library_result_map(filename):
- """
- Read a file that maps library id to result directory.
- Does not support spaces in filenames.
-
- For example:
- 10000 result/foo/bar
- """
- stream = open(filename,'r')
-
- results = []
- for line in stream:
- line = line.rstrip()
- if not line.startswith('#') and len(line) > 0 :
- library_id, result_dir = line.split()
- results.append((library_id, result_dir))
- return results
-
-
def make_condor_archive_script(name, files, outdir=None):
script = """Universe = vanilla
return fragments
-def get_library_info(host, apidata, library_id):
- url = api.library_url(host, library_id)
- contents = api.retrieve_info(url, apidata)
- return contents
-
-
def make_base_name(pathname):
base = os.path.basename(pathname)
name, ext = os.path.splitext(base)
self.log_path = log_path
self.force = force
- def create_scripts(self, library_result_map ):
+ def create_scripts(self, result_map ):
"""
Generate condor scripts to build any needed fastq files
Args:
- library_result_map (list): [(library_id, destination directory), ...]
+ result_map: htsworkflow.submission.results.ResultMap()
"""
template_map = {'srf': 'srf.condor',
'qseq': 'qseq.condor',
'split_fastq': 'split_fastq.condor'}
- condor_entries = self.build_condor_arguments(library_result_map)
+ condor_entries = self.build_condor_arguments(result_map)
for script_type in template_map.keys():
template = loader.get_template(template_map[script_type])
variables = {'python': sys.executable,
with open(script_type + '.condor','w+') as outstream:
outstream.write(template.render(context))
- def build_condor_arguments(self, library_result_map):
+ def build_condor_arguments(self, result_map):
condor_entries = {'srf': [],
'qseq': [],
'split_fastq': []}
'qseq': self.condor_qseq_to_fastq,
'split_fastq': self.condor_desplit_fastq
}
- lib_db = self.find_archive_sequence_files(library_result_map)
- needed_targets = self.find_missing_targets(library_result_map, lib_db)
+ lib_db = self.find_archive_sequence_files(result_map)
+ needed_targets = self.find_missing_targets(result_map, lib_db)
for target_pathname, available_sources in needed_targets.items():
LOGGER.debug(' target : %s' % (target_pathname,))
return condor_entries
- def find_archive_sequence_files(self, library_result_map):
+ def find_archive_sequence_files(self, result_map):
"""
Find archived sequence files associated with our results.
"""
lib_db = {}
seq_dirs = set()
candidate_lanes = {}
- for lib_id, result_dir in library_result_map:
+ for lib_id in result_map.keys():
lib_info = self.api.get_library(lib_id)
lib_info['lanes'] = {}
lib_db[lib_id] = lib_info
return lib_db
- def find_missing_targets(self, library_result_map, lib_db):
+ def find_missing_targets(self, result_map, lib_db):
"""
Check if the sequence file exists.
This requires computing what the sequence name is and checking
fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
# find what targets we're missing
needed_targets = {}
- for lib_id, result_dir in library_result_map:
+ for lib_id in result_map.keys():
+ result_dir = result_map[lib_id]
lib = lib_db[lib_id]
lane_dict = make_lane_dict(lib_db, lib_id)
raise ValueError("srf to fastq can only handle one file")
return {
- 'sources': [sources[0].path],
+ 'sources': [os.path.abspath(sources[0].path)],
'pyscript': srf2fastq.__file__,
'flowcell': sources[0].flowcell,
'ispaired': sources[0].paired,
return viewNS
-class DAFMapper(object):
- """Convert filenames to views in the UCSC Daf
+class UCSCSubmission(object):
+ """Build a submission by examining the DAF for what we need to submit
"""
def __init__(self, name, daf_file=None, model=None):
"""Construct a RDF backed model of a UCSC DAF
if hasattr(daf_file, 'next'):
# its some kind of stream
- fromstream_into_model(self.model, self.submissionSet, daf_file)
+ self.daf = daf_file.read()
else:
# file
- parse_into_model(self.model, self.submissionSet, daf_file)
+ stream = open(daf_file, 'r')
+ self.daf = stream.read()
+ stream.close()
+
+ fromstring_into_model(self.model, self.submissionSet, self.daf)
self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/')
self.__view_map = None
+ def _get_daf_name(self):
+ # derived filename for this submission's DAF copy: '<name>.daf'
+ return self.name + '.daf'
+ daf_name = property(_get_daf_name,doc="construct name for DAF file")
+
def add_pattern(self, view_name, filename_pattern):
"""Map a filename regular expression to a view name
"""
dafTermOntology['filename_re'],
obj))
+ def scan_submission_dirs(self, result_map):
+ """Examine files in our result directory
+
+ Walks every (library id, result directory) pair in result_map and
+ imports each directory into the model via import_submission_dir.
+ A MetadataLookupException for one library is logged and skipped
+ rather than aborting the whole scan.
+ """
+ # NOTE(review): this method logs via 'logger' while link_daf below
+ # uses 'LOGGER' -- confirm which module-level logger this file defines.
+ for lib_id, result_dir in result_map.items():
+ logger.info("Importing %s from %s" % (lib_id, result_dir))
+ try:
+ self.import_submission_dir(result_dir, lib_id)
+ except MetadataLookupException, e:
+ logger.error("Skipping %s: %s" % (lib_id, str(e)))
+
def import_submission_dir(self, submission_dir, library_id):
"""Import a submission directories and update our model as needed
"""
return False
+
+ def link_daf(self, result_map):
+ """Write our cached DAF text into each result directory.
+
+ Raises RuntimeError if no DAF text has been loaded, or if a
+ result directory in result_map does not exist. The file is
+ named via self.daf_name ('<name>.daf').
+ """
+ if self.daf is None or len(self.daf) == 0:
+ raise RuntimeError(
+ "DAF data does not exist, how can I link to it?")
+
+ base_daf = self.daf_name
+
+ for result_dir in result_map.values():
+ if not os.path.exists(result_dir):
+ raise RuntimeError(
+ "Couldn't find target directory %s" %(result_dir,))
+ submission_daf = os.path.join(result_dir, base_daf)
+ if os.path.exists(submission_daf):
+ previous_daf = open(submission_daf, 'r').read()
+ if self.daf != previous_daf:
+ # NOTE(review): write below appears unconditional (it also
+ # rewrites an identical existing file) -- confirm the intended
+ # indentation; also 'LOGGER' vs 'logger' used elsewhere.
+ LOGGER.info("Old daf is different, overwriting it.")
+ stream = open(submission_daf, 'w')
+ stream.write(self.daf)
+ stream.close()
+
+
if __name__ == "__main__":
example_daf = """# Lab and general info
grant Hardison
name = "test_rep"
mapper = DAFMapper(name, daf_file = example_daf_stream, model=model)
dump_model(model)
+
+
--- /dev/null
+"""Help collect and process results for submission
+"""
+import os
+import logging
+
+from collections import namedtuple
+
+LOGGER = logging.getLogger(__name__)
+
+class ResultMap(object):
+    """Ordered mapping of library id -> result directory.
+
+    Preserves insertion order (keys/values/items iterate in the order
+    results were added), unlike a plain dict on this Python version.
+    """
+    def __init__(self):
+        # results_order keeps insertion order; results holds the mapping
+        self.results_order = []
+        self.results = {}
+
+    def keys(self):
+        """Return library ids in insertion order."""
+        return self.results_order
+
+    def values(self):
+        """Return a generator of result directories in insertion order."""
+        return ( self.results[r] for r in self.results_order )
+
+    def items(self):
+        """Return a generator of (library_id, result_dir) in insertion order."""
+        return ( (r, self.results[r]) for r in self.results_order )
+
+    def __getitem__(self, key):
+        return self.results[key]
+
+    def add_results_from_file(self, filename):
+        """Load 'library_id result_dir' pairs from filename.
+
+        Relative result paths are resolved against the file's directory.
+        """
+        pathname = os.path.abspath(filename)
+        basepath, name = os.path.split(pathname)
+        results = read_result_list(filename)
+        for lib_id, lib_path in results:
+            if not os.path.isabs(lib_path):
+                lib_path = os.path.join(basepath, lib_path)
+            self.add_result(lib_id, lib_path)
+
+    def add_result(self, lib_id, lib_path):
+        """Record one library id -> result directory pair."""
+        self.results_order.append(lib_id)
+        self.results[lib_id] = lib_path
+
+    def make_tree_from(self, source_path, destpath = None):
+        """Create a tree of symlinks to data files under source_path.
+
+        For each result directory, makes destpath/<lib_path> (default:
+        current directory) and symlinks every file found in
+        source_path/<lib_path> into it. Raises IOError if a source
+        file vanishes between listdir and linking.
+        """
+        # removed leftover debug 'print' statements; LOGGER carries
+        # the useful progress information
+        if destpath is None:
+            destpath = os.getcwd()
+
+        for lib_id in self.results_order:
+            lib_path = self.results[lib_id]
+            lib_destination = os.path.join(destpath, lib_path)
+            if not os.path.exists(lib_destination):
+                LOGGER.info("Making dir {0}".format(lib_destination))
+                os.mkdir(lib_destination)
+
+            source_rel_dir = os.path.join(source_path, lib_path)
+            source_lib_dir = os.path.abspath(source_rel_dir)
+
+            for filename in os.listdir(source_lib_dir):
+                source_pathname = os.path.join(source_lib_dir, filename)
+                target_pathname = os.path.join(lib_destination, filename)
+                if not os.path.exists(source_pathname):
+                    raise IOError(
+                        "{0} does not exist".format(source_pathname))
+                if not os.path.exists(target_pathname):
+                    os.symlink(source_pathname, target_pathname)
+                    LOGGER.info(
+                        'LINK {0} to {1}'.format(source_pathname,
+                                                 target_pathname))
+
+
+def read_result_list(filename):
+    """
+    Read a file that maps library id to result directory.
+    Does not support spaces in filenames.
+
+    For example:
+      10000 result/foo/bar
+
+    Returns a list of (library_id, result_dir) tuples.
+    """
+    stream = open(filename, 'r')
+    try:
+        # close the stream even if parsing raises (e.g. a malformed line)
+        return parse_result_list(stream)
+    finally:
+        stream.close()
+
+
+def parse_result_list(stream):
+    """Parse (library_id, result_dir) pairs from an open stream.
+
+    Blank lines and lines starting with '#' are ignored; every other
+    line must contain exactly two whitespace-separated fields.
+    """
+    pairs = []
+    for raw_line in stream:
+        entry = raw_line.rstrip()
+        if entry and not entry.startswith('#'):
+            library_id, result_dir = entry.split()
+            pairs.append((library_id, result_dir))
+    return pairs
import unittest
from htsworkflow.submission import condorfastq
+from htsworkflow.submission.results import ResultMap
FCDIRS = [
'C02F9ACXX',
self.subdir = os.path.join(self.tempdir, self.subname)
os.mkdir(self.subdir)
+ self.result_map = ResultMap()
+ self.result_map.add_result('11154', self.subname)
+
def tearDown(self):
shutil.rmtree(self.tempdir)
os.chdir(self.cwd)
self.tempdir,
self.logdir)
extract.api = FakeApi()
- result_map = [('11154', self.subname)]
- lib_db = extract.find_archive_sequence_files(result_map)
+
+ lib_db = extract.find_archive_sequence_files(self.result_map)
self.failUnlessEqual(len(lib_db['11154']['lanes']), 5)
lanes = [
self.tempdir,
self.logdir)
extract.api = FakeApi()
- result_map = [('11154', self.subname)]
- lib_db = extract.find_archive_sequence_files(result_map)
+ lib_db = extract.find_archive_sequence_files(self.result_map)
- needed_targets = extract.find_missing_targets(result_map,
+ needed_targets = extract.find_missing_targets(self.result_map,
lib_db)
self.failUnlessEqual(len(needed_targets), 7)
srf_30221 = needed_targets[
self.tempdir,
self.logdir)
extract.api = FakeApi()
- result_map = [('11154', self.subdir)]
- commands = extract.build_condor_arguments(result_map)
+ commands = extract.build_condor_arguments(self.result_map)
srf = commands['srf']
qseq = commands['qseq']
self.failUnlessEqual(len(split), 2)
srf_data = {
- os.path.join(self.subdir, '11154_30221AAXX_c33_l4.fastq'): {
+ os.path.join(self.subname, '11154_30221AAXX_c33_l4.fastq'): {
'mid': None,
'ispaired': False,
'sources': [u'woldlab_090425_HWI-EAS229_0110_30221AAXX_4.srf'],
'flowcell': u'30221AAXX',
- 'target': os.path.join(self.subdir,
+ 'target': os.path.join(self.subname,
u'11154_30221AAXX_c33_l4.fastq'),
},
- os.path.join(self.subdir, '11154_30DY0AAXX_c151_l8_r1.fastq'): {
+ os.path.join(self.subname, '11154_30DY0AAXX_c151_l8_r1.fastq'): {
'mid': None,
'ispaired': True,
'flowcell': u'30DY0AAXX',
'sources': [u'woldlab_090725_HWI-EAS229_0110_30DY0AAXX_8.srf'],
'mid': 76,
'target':
- os.path.join(self.subdir,
+ os.path.join(self.subname,
u'11154_30DY0AAXX_c151_l8_r1.fastq'),
'target_right':
- os.path.join(self.subdir,
+ os.path.join(self.subname,
u'11154_30DY0AAXX_c151_l8_r2.fastq'),
}
}
self.failUnlessEqual(args['mid'], expected['mid'])
qseq_data = {
- os.path.join(self.subdir, '11154_42JUYAAXX_c76_l5_r1.fastq'): {
+ os.path.join(self.subname, '11154_42JUYAAXX_c76_l5_r1.fastq'): {
'istar': True,
'ispaired': True,
'sources': [
u'woldlab_100826_HSI-123_0001_42JUYAAXX_l5_r1.tar.bz2']
},
- os.path.join(self.subdir, '11154_42JUYAAXX_c76_l5_r2.fastq'): {
+ os.path.join(self.subname, '11154_42JUYAAXX_c76_l5_r2.fastq'): {
'istar': True,
'ispaired': True,
'sources': [
u'woldlab_100826_HSI-123_0001_42JUYAAXX_l5_r2.tar.bz2']
},
- os.path.join(self.subdir, '11154_61MJTAAXX_c76_l6.fastq'): {
+ os.path.join(self.subname, '11154_61MJTAAXX_c76_l6.fastq'): {
'istar': True,
'ispaired': False,
'sources': [
self.tempdir,
self.logdir)
extract.api = FakeApi()
- result_map = [('11154', self.subname)]
- extract.create_scripts(result_map)
+ extract.create_scripts(self.result_map)
self.failUnless(os.path.exists('srf.condor'))
with open('srf.condor', 'r') as srf:
import tempfile
import unittest
-from htsworkflow.submission import daf
+from htsworkflow.submission import daf, results
from htsworkflow.util.rdfhelp import \
dafTermOntology, \
fromTypedNode, \
get_model, \
get_serializer
+from htsworkflow.submission.test import test_results
import RDF
test_daf = """# Lab and general info
ns)
test_daf_stream = StringIO(test_daf)
- mapper = daf.DAFMapper(name, daf_file = test_daf_stream, model=model)
+ mapper = daf.UCSCSubmission(name, daf_file = test_daf_stream, model=model)
return mapper
def dump_model(model):
print turtle
-class TestDAFMapper(unittest.TestCase):
+class TestUCSCSubmission(unittest.TestCase):
+ def setUp(self):
+ test_results.generate_sample_results_tree(self)
+
+ def tearDown(self):
+ # see things created by temp_results.generate_sample_results_tree
+ shutil.rmtree(self.tempdir)
+
def test_create_mapper_add_pattern(self):
name = 'testsub'
mapper = load_daf_mapper(name)
self.failUnless('controlId' in variables)
+ def test_link_daf(self):
+ """link_daf should copy the DAF text into each result directory."""
+ name = 'testsub'
+ submission = load_daf_mapper(name, test_daf=test_daf)
+ result_map = results.ResultMap()
+ result_dir = os.path.join(self.sourcedir,
+ test_results.S1_NAME)
+ result_map.add_result('1000', result_dir)
+
+ submission.link_daf(result_map)
+
+ # make sure daf gets linked
+ created_daf = os.path.join(result_dir, name+'.daf')
+ self.failUnless(os.path.exists(created_daf))
+ stream = open(created_daf,'r')
+ daf_body = stream.read()
+ stream.close()
+
+ # written file must be byte-identical to the source DAF text
+ self.failUnlessEqual(test_daf, daf_body)
+
+
+
+
@contextmanager
def mktempdir(prefix='tmp'):
d = tempfile.mkdtemp(prefix=prefix)
def suite():
suite = unittest.makeSuite(TestDAF, 'test')
- suite.addTest(unittest.makeSuite(TestDAFMapper, 'test'))
+ suite.addTest(unittest.makeSuite(TestUCSCSubmission, 'test'))
return suite
if __name__ == "__main__":
--- /dev/null
+#!/usr/bin/env python
+
+import copy
+import os
+from pprint import pprint
+import shutil
+import tempfile
+import unittest
+
+from htsworkflow.submission.results import ResultMap
+
+S1_NAME = '1000-sample'
+S2_NAME = '2000-sample'
+
+S1_FILES = [
+ os.path.join(S1_NAME, 'file1_l8_r1.fastq'),
+ os.path.join(S1_NAME, 'file1_l8_r2.fastq'),
+]
+
+S2_FILES = [
+ os.path.join(S2_NAME, 'file1.bam'),
+ os.path.join(S2_NAME, 'file1_l5.fastq'),
+]
+
+def generate_sample_results_tree(obj):
+    """Build a temporary sample source/results tree for tests.
+
+    Attaches tempdir, sourcedir, and resultdir attributes to obj and
+    populates sourcedir with the S1_FILES and S2_FILES fixtures (each
+    file's contents are its own relative path).
+    """
+    obj.tempdir = tempfile.mkdtemp(prefix="results_test")
+    obj.sourcedir = os.path.join(obj.tempdir, 'source')
+    obj.resultdir = os.path.join(obj.tempdir, 'results')
+
+    # each directory below is already rooted at obj.tempdir, so create it
+    # directly; the original re-joined with tempdir, which was a no-op only
+    # because the paths are absolute
+    for d in [obj.sourcedir,
+              os.path.join(obj.sourcedir, S1_NAME),
+              os.path.join(obj.sourcedir, S2_NAME),
+              obj.resultdir]:
+        os.mkdir(d)
+
+    for f in S1_FILES + S2_FILES:
+        stream = open(os.path.join(obj.sourcedir, f), 'w')
+        stream.write(f)
+        stream.close()
+
+
+class TestResultMap(unittest.TestCase):
+ """Unit tests for ResultMap ordering and symlink-tree building."""
+ def setUp(self):
+ generate_sample_results_tree(self)
+
+ def tearDown(self):
+ shutil.rmtree(self.tempdir)
+
+
+ def test_dict_like(self):
+ """Make sure the result map works like an ordered dictionary
+ """
+ results = ResultMap()
+ results.add_result('1000', 'dir1000')
+ results.add_result('2000', 'dir2000')
+ results.add_result('1500', 'dir1500')
+
+ # keys/values/items must preserve insertion order, not sort order
+ self.failUnlessEqual(results.keys(), ['1000', '2000', '1500'])
+ self.failUnlessEqual(list(results.values()),
+ ['dir1000', 'dir2000', 'dir1500'])
+ self.failUnlessEqual(list(results.items()),
+ [('1000', 'dir1000'),
+ ('2000', 'dir2000'),
+ ('1500', 'dir1500')])
+
+ self.failUnlessEqual(results['1000'], 'dir1000')
+ self.failUnlessEqual(results['1500'], 'dir1500')
+ self.failUnlessEqual(results['2000'], 'dir2000')
+
+ def test_make_from(self):
+ """make_tree_from should mirror the source tree as symlinks."""
+ results = ResultMap()
+ results.add_result('1000', S1_NAME)
+ results.add_result('2000', S2_NAME)
+
+ results.make_tree_from(self.sourcedir, self.resultdir)
+
+ sample1_dir = os.path.join(self.resultdir, S1_NAME)
+ sample2_dir = os.path.join(self.resultdir, S2_NAME)
+ self.failUnless(os.path.isdir(sample1_dir))
+ self.failUnless(os.path.isdir(sample2_dir))
+
+ for f in S1_FILES + S2_FILES:
+ self.failUnless(
+ os.path.islink(
+ os.path.join(self.resultdir, f)))
+
+
+
+
+def suite():
+ suite = unittest.makeSuite(TestResultMap, 'test')
+ return suite
+
+if __name__ == "__main__":
+ unittest.main(defaultTest='suite')
+