import shlex
from StringIO import StringIO
import stat
-from subprocess import Popen, PIPE
import sys
import time
import types
import urllib2
import urlparse
+import RDF
+
from htsworkflow.util import api
from htsworkflow.util.rdfhelp import \
dafTermOntology, \
get_model, \
get_serializer, \
load_into_model, \
+ sparql_query, \
submissionOntology
from htsworkflow.submission.daf import \
DAFMapper, \
if opts.make_ddf and opts.daf is None:
parser.error("Please specify your daf when making ddf files")
- if len(args) == 0:
- parser.error("I need at least one library submission-dir input file")
-
library_result_map = []
for a in args:
library_result_map.extend(read_library_result_map(a))
if opts.make_ddf:
make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)
+ if opts.sparql:
+ sparql_query(model, opts.sparql)
+
if opts.print_rdf:
writer = get_serializer()
print writer.serialize_model_to_string(model)
help="Load model database")
model.add_option('--load-rdf', default=None,
help="load rdf statements into model")
+ model.add_option('--sparql', default=None, help="execute sparql query")
model.add_option('--print-rdf', action="store_true", default=False,
help="print ending model state")
parser.add_option_group(model)
"""
Make ddf files, and bonus condor file
"""
+ query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
+PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
+PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>
+
+select ?submitView ?filename ?md5sum ?view ?cell ?antibody ?sex ?control ?controlId ?labExpId ?labVersion ?treatment ?protocol
+WHERE {
+ ?file ucscDaf:filename ?filename ;
+ ucscDaf:md5sum ?md5sum .
+ ?submitView ucscDaf:has_file ?file ;
+ ucscDaf:view ?dafView ;
+ ucscDaf:submission %(submission)s .
+ ?dafView ucscDaf:name ?view .
+ %(submission)s submissionOntology:library ?library .
+
+ OPTIONAL { ?submitView ucscDaf:antibody ?antibody }
+ OPTIONAL { ?submitView ucscDaf:cell ?cell }
+ OPTIONAL { ?submitView ucscDaf:control ?control }
+ OPTIONAL { ?library ucscDaf:controlId ?controlId }
+ OPTIONAL { ?submitView ucscDaf:sex ?sex }
+ OPTIONAL { ?submitView ucscDaf:labVersion ?labExpId }
+ OPTIONAL { ?submitView ucscDaf:labVersion ?labVersion }
+ OPTIONAL { ?library ucscDaf:treatment ?treatment }
+ OPTIONAL { ?submitView ucscDaf:protocol ?protocol }
+}
+ORDER BY ?submitView"""
dag_fragments = []
name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
output = open(outfile,'w')
else:
output = sys.stdout
-
- # filename goes first
+
+ formatted_query = query_template % {'submission': str(submissionNode)}
+
+ query = RDF.SPARQLQuery(formatted_query)
+ results = query.execute(view_map.model)
+
variables = ['filename']
+ # filename goes first
variables.extend(view_map.get_daf_variables())
+ variables += ['controlId', 'labExpId', 'md5sum']
output.write('\t'.join(variables))
output.write(os.linesep)
- nameTerm = dafTermOntology['name']
-
- submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])
- file_list = []
- for viewNode in submission_views:
- record = []
+ all_views = {}
+ all_files = []
+ for row in results:
+ viewname = fromTypedNode(row['view'])
+ current = all_views.setdefault(viewname, {})
for variable_name in variables:
- varNode = dafTermOntology[variable_name]
- values = list(view_map.model.get_targets(viewNode, varNode))
-
- if variable_name == 'view':
- nameNode = view_map.model.get_target(values[0], nameTerm)
- values = [fromTypedNode(nameNode)]
+ value = str(fromTypedNode(row[variable_name]))
+ if variable_name in ('filename', 'md5sum'):
+ current.setdefault(variable_name,[]).append(value)
else:
- values = [ fromTypedNode(v) for v in values ]
- if variable_name == 'filename':
- file_list.extend(values)
-
- if len(values) == 0:
- attribute = "#None#"
- elif len(values) == 1:
- attribute = values[0]
+ current[variable_name] = value
+
+ for view in all_views.keys():
+ line = []
+ for variable_name in variables:
+ if variable_name in ('filename', 'md5sum'):
+ line.append(','.join(all_views[view][variable_name]))
else:
- attribute = ",".join(values)
- record.append(attribute)
- output.write('\t'.join(record))
+ line.append(all_views[view][variable_name])
+ output.write("\t".join(line))
output.write(os.linesep)
-
+ all_files.extend(all_views[view]['filename'])
+
logging.info(
"Examined {0}, found files: {1}".format(
- str(submissionNode), ", ".join(file_list)))
+ str(submissionNode), ", ".join(all_files)))
+
+ all_files.append(daf_name)
+ all_files.append(ddf_name)
- file_list.append(daf_name)
- file_list.append(ddf_name)
-
if make_condor:
- print name, file_list
- archive_condor = make_condor_archive_script(name, file_list, outdir)
+ archive_condor = make_condor_archive_script(name, all_files, outdir)
upload_condor = make_condor_upload_script(name, outdir)
dag_fragments.extend(
Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s
-Error = compress.err.$(Process).log
+Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
context = {'archivename': make_submission_name(name),
'filelist': " ".join(files),
- 'initialdir': os.getcwd(),
+ 'initialdir': os.path.abspath(outdir),
'user': os.getlogin()}
condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s
-Error = upload.err.$(Process).log
+Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s
ftpuser = auth.hosts[encodeftp][0]
ftppassword = auth.hosts[encodeftp][2]
context = {'archivename': make_submission_name(name),
- 'initialdir': outdir,
+ 'initialdir': os.path.abspath(outdir),
'user': os.getlogin(),
'ftpuser': ftpuser,
'ftppassword': ftppassword,
if not os.path.exists(f):
raise RuntimeError("%s does not exist" % (f,))
-def make_md5sum(filename):
- """Quickly find the md5sum of a file
- """
- md5_cache = os.path.join(filename+".md5")
- print md5_cache
- if os.path.exists(md5_cache):
- logging.debug("Found md5sum in {0}".format(md5_cache))
- stream = open(md5_cache,'r')
- lines = stream.readlines()
- md5sum = parse_md5sum_line(lines, filename)
- else:
- md5sum = make_md5sum_unix(filename, md5_cache)
- return md5sum
-
-def make_md5sum_unix(filename, md5_cache):
- cmd = ["md5sum", filename]
- logging.debug("Running {0}".format(" ".join(cmd)))
- p = Popen(cmd, stdout=PIPE)
- stdin, stdout = p.communicate()
- retcode = p.wait()
- logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
- if retcode != 0:
- logging.error("Trouble with md5sum for {0}".format(filename))
- return None
- lines = stdin.split(os.linesep)
- md5sum = parse_md5sum_line(lines, filename)
- if md5sum is not None:
- logging.debug("Caching sum in {0}".format(md5_cache))
- stream = open(md5_cache, "w")
- stream.write(stdin)
- stream.close()
- return md5sum
-
-def parse_md5sum_line(lines, filename):
- md5sum, md5sum_filename = lines[0].split()
- if md5sum_filename != filename:
- errmsg = "MD5sum and I disagre about filename. {0} != {1}"
- logging.error(errmsg.format(filename, md5sum_filename))
- return None
- return md5sum
-
if __name__ == "__main__":
main()
submissionOntology, \
toTypedNode, \
fromTypedNode
+from htsworkflow.util.hashfile import make_md5sum
logger = logging.getLogger(__name__)
for f in submission_files:
self.construct_file_attributes(submission_dir, libNode, f)
- #attributes['md5sum'] = "None"
- #
- #ext = attributes["filename_re"]
- #if attributes.get("type", None) == 'fastq':
- # fastqs.setdefault(ext, set()).add(f)
- # fastq_attributes[ext] = attributes
- #else:
- # md5sum = make_md5sum(os.path.join(result_dir,f))
- # if md5sum is not None:
- # attributes['md5sum']=md5sum
- #print attributes
-
def construct_file_attributes(self, submission_dir, libNode, pathname):
"""Looking for the best extension
self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['has_view'], submissionView))
self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['name'], toTypedNode(submission_name)))
self.model.add_statement(RDF.Statement(submissionNode, rdfNS['type'], submissionOntology['submission']))
-
-
- self.model.add_statement(
- RDF.Statement(submissionView, dafTermOntology['filename'], toTypedNode(filename)))
+ self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['library'], libNode))
+
+ # add trac specific information
self.model.add_statement(
RDF.Statement(submissionView, dafTermOntology['view'], view))
self.model.add_statement(
RDF.Statement(submissionView, dafTermOntology['paired'], toTypedNode(self._is_paired(libNode))))
self.model.add_statement(
RDF.Statement(submissionView, dafTermOntology['submission'], submissionNode))
-
+
# extra information
terms = [dafTermOntology['type'],
dafTermOntology['filename_re'],
if value is not None:
self.model.add_statement(RDF.Statement(submissionView, term, value))
+ # add file specific information
+ fileNode = RDF.Node(RDF.Uri(submission_uri + '/' + filename))
+ submission_pathname = os.path.join(submission_dir, filename)
+ md5 = make_md5sum(submission_pathname)
+ self.model.add_statement(
+ RDF.Statement(submissionView, dafTermOntology['has_file'], fileNode))
+ self.model.add_statement(
+ RDF.Statement(fileNode, dafTermOntology['filename'], filename))
+
+ if md5 is None:
+ logging.warning("Unable to produce md5sum for %s" % ( submission_pathname))
+ else:
+ self.model.add_statement(
+ RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
+
def _add_library_details_to_model(self, libNode):
parser = RDF.Parser(name='rdfa')
+from contextlib import contextmanager
+import os
from StringIO import StringIO
+import shutil
+import tempfile
import unittest
from htsworkflow.submission import daf
species = daf_mapper._get_library_attribute(libNode, 'species')
self.failUnlessEqual(species, "Homo sapiens")
-
- daf_mapper.construct_file_attributes('/tmp/analysis1', libNode, 'filename.bam')
+
+ with mktempdir('analysis') as analysis_dir:
+ path, analysis_name = os.path.split(analysis_dir)
+ with mktempfile('.bam', dir=analysis_dir) as filename:
+ print 'dir', os.listdir(analysis_dir)
+ daf_mapper.construct_file_attributes(analysis_dir,
+ libNode,
+ filename)
+
+ sub_root = "http://jumpgate.caltech.edu/wiki/SubmissionsLog/testfind/"
+ submission_name = sub_root + analysis_name
source = daf_mapper.model.get_source(rdfNS['type'], submissionOntology['submission'])
- self.failUnlessEqual(str(source.uri), "http://jumpgate.caltech.edu/wiki/SubmissionsLog/testfind/analysis1")
+
+ self.failUnlessEqual(str(source.uri), submission_name)
+
+ view_name = submission_name + '/Signal'
view = daf_mapper.model.get_target(source, submissionOntology['has_view'])
- self.failUnlessEqual(str(view.uri), "http://jumpgate.caltech.edu/wiki/SubmissionsLog/testfind/analysis1/Signal")
+ self.failUnlessEqual(str(view.uri), view_name)
def test_library_url(self):
daf_mapper = load_daf_mapper('urltest')
'http://jumpgate.caltech.edu/library/')
daf_mapper.library_url = 'http://google.com'
self.failUnlessEqual(daf_mapper.library_url, 'http://google.com' )
-
+
+@contextmanager
+def mktempdir(prefix='tmp'):
+    """Context manager yielding a fresh temporary directory.
+
+    The directory is removed when the with-block exits, even on error.
+    """
+    d = tempfile.mkdtemp(prefix=prefix)
+    try:
+        yield d
+    finally:
+        # clean up even if the with-body raised
+        shutil.rmtree(d)
+
+@contextmanager
+def mktempfile(suffix='', prefix='tmp', dir=None):
+    """Context manager yielding the path of a fresh temporary file.
+
+    The file is closed and deleted when the with-block exits, even on error.
+    """
+    fd, pathname = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
+    try:
+        yield pathname
+    finally:
+        # release the low-level descriptor before unlinking
+        os.close(fd)
+        os.unlink(pathname)
+
def suite():
suite = unittest.makeSuite(TestDAF, 'test')
suite.addTest(unittest.makeSuite(TestDAFMapper, 'test'))
--- /dev/null
+"""Utility to make md5sums of a file caching as a parallel file
+"""
+import logging
+import os
+from subprocess import Popen, PIPE
+
+logger = logging.getLogger(__name__)
+
+def make_md5sum(filename):
+    """Quickly find the md5sum of a file
+
+    Looks for a cached digest in <filename>.md5 first; otherwise runs
+    the unix md5sum command and caches the result there.
+    Returns the hex digest string, or None on failure.
+    """
+    md5_cache = os.path.join(filename+".md5")
+    if os.path.exists(md5_cache):
+        logger.debug("Found md5sum in {0}".format(md5_cache))
+        stream = open(md5_cache,'r')
+        lines = stream.readlines()
+        stream.close()
+        md5sum = parse_md5sum_line(lines, filename)
+    else:
+        md5sum = make_md5sum_unix(filename, md5_cache)
+    return md5sum
+
+def make_md5sum_unix(filename, md5_cache):
+    """Run the unix md5sum command on filename, caching output in md5_cache.
+
+    Returns the hex digest string, or None if md5sum exited non-zero.
+    """
+    cmd = ["md5sum", filename]
+    logger.debug("Running {0}".format(" ".join(cmd)))
+    p = Popen(cmd, stdout=PIPE)
+    # communicate() returns (stdout, stderr); stderr is None (not piped)
+    stdout, stderr = p.communicate()
+    retcode = p.wait()
+    logger.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
+    if retcode != 0:
+        logger.error("Trouble with md5sum for {0}".format(filename))
+        return None
+    lines = stdout.split(os.linesep)
+    md5sum = parse_md5sum_line(lines, filename)
+    if md5sum is not None:
+        logger.debug("Caching sum in {0}".format(md5_cache))
+        stream = open(md5_cache, "w")
+        stream.write(stdout)
+        stream.close()
+    return md5sum
+
+def parse_md5sum_line(lines, filename):
+    """Extract the digest for filename from md5sum-style output lines.
+
+    Returns the digest string, or None when the output is empty or
+    names a different file than the one we asked about.
+    """
+    if len(lines) == 0 or len(lines[0].split()) < 2:
+        logger.error("Unparsable md5sum output for {0}".format(filename))
+        return None
+    md5sum, md5sum_filename = lines[0].split()
+    if md5sum_filename != filename:
+        errmsg = "MD5sum and I disagree about filename. {0} != {1}"
+        logger.error(errmsg.format(filename, md5sum_filename))
+        return None
+    return md5sum
+