import shlex
from StringIO import StringIO
import stat
-from subprocess import Popen, PIPE
import sys
import time
import types
import urllib2
import urlparse
+import RDF
+
from htsworkflow.util import api
from htsworkflow.util.rdfhelp import \
dafTermOntology, \
get_model, \
get_serializer, \
load_into_model, \
+ sparql_query, \
submissionOntology
from htsworkflow.submission.daf import \
DAFMapper, \
if opts.make_ddf and opts.daf is None:
parser.error("Please specify your daf when making ddf files")
- if len(args) == 0:
- parser.error("I need at least one library submission-dir input file")
-
library_result_map = []
for a in args:
library_result_map.extend(read_library_result_map(a))
if opts.make_ddf:
make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)
+ if opts.sparql:
+ sparql_query(model, opts.sparql)
+
if opts.print_rdf:
writer = get_serializer()
print writer.serialize_model_to_string(model)
help="Load model database")
model.add_option('--load-rdf', default=None,
help="load rdf statements into model")
+ model.add_option('--sparql', default=None, help="execute sparql query")
model.add_option('--print-rdf', action="store_true", default=False,
help="print ending model state")
parser.add_option_group(model)
"""
Make ddf files, and bonus condor file
"""
+ query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
+PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
+PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>
+
+select ?submitView ?filename ?md5sum ?view ?cell ?antibody ?sex ?control ?controlId ?labExpId ?labVersion ?treatment ?protocol
+WHERE {
+ ?file ucscDaf:filename ?filename ;
+ ucscDaf:md5sum ?md5sum .
+ ?submitView ucscDaf:has_file ?file ;
+ ucscDaf:view ?dafView ;
+ ucscDaf:submission %(submission)s .
+ ?dafView ucscDaf:name ?view .
+ %(submission)s submissionOntology:library ?library .
+
+ OPTIONAL { ?submitView ucscDaf:antibody ?antibody }
+ OPTIONAL { ?submitView ucscDaf:cell ?cell }
+ OPTIONAL { ?submitView ucscDaf:control ?control }
+ OPTIONAL { ?library ucscDaf:controlId ?controlId }
+ OPTIONAL { ?submitView ucscDaf:sex ?sex }
+ OPTIONAL { ?submitView ucscDaf:labVersion ?labExpId }
+ OPTIONAL { ?submitView ucscDaf:labVersion ?labVersion }
+ OPTIONAL { ?library ucscDaf:treatment ?treatment }
+ OPTIONAL { ?submitView ucscDaf:protocol ?protocol }
+}
+ORDER BY ?submitView"""
dag_fragments = []
name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
output = open(outfile,'w')
else:
output = sys.stdout
-
- # filename goes first
+
+ formatted_query = query_template % {'submission': str(submissionNode)}
+
+ query = RDF.SPARQLQuery(formatted_query)
+ results = query.execute(view_map.model)
+
variables = ['filename']
+ # filename goes first
variables.extend(view_map.get_daf_variables())
+ variables += ['controlId', 'labExpId', 'md5sum']
output.write('\t'.join(variables))
output.write(os.linesep)
- nameTerm = dafTermOntology['name']
-
- submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])
- file_list = []
- for viewNode in submission_views:
- record = []
+ all_views = {}
+ all_files = []
+ for row in results:
+ viewname = fromTypedNode(row['view'])
+ current = all_views.setdefault(viewname, {})
for variable_name in variables:
- varNode = dafTermOntology[variable_name]
- values = list(view_map.model.get_targets(viewNode, varNode))
-
- if variable_name == 'view':
- nameNode = view_map.model.get_target(values[0], nameTerm)
- values = [fromTypedNode(nameNode)]
+ value = str(fromTypedNode(row[variable_name]))
+ if variable_name in ('filename', 'md5sum'):
+ current.setdefault(variable_name,[]).append(value)
else:
- values = [ fromTypedNode(v) for v in values ]
- if variable_name == 'filename':
- file_list.extend(values)
-
- if len(values) == 0:
- attribute = "#None#"
- elif len(values) == 1:
- attribute = values[0]
+ current[variable_name] = value
+
+ for view in all_views.keys():
+ line = []
+ for variable_name in variables:
+ if variable_name in ('filename', 'md5sum'):
+ line.append(','.join(all_views[view][variable_name]))
else:
- attribute = ",".join(values)
- record.append(attribute)
- output.write('\t'.join(record))
+ line.append(all_views[view][variable_name])
+ output.write("\t".join(line))
output.write(os.linesep)
-
+ all_files.extend(all_views[view]['filename'])
+
logging.info(
"Examined {0}, found files: {1}".format(
- str(submissionNode), ", ".join(file_list)))
+ str(submissionNode), ", ".join(all_files)))
+
+ all_files.append(daf_name)
+ all_files.append(ddf_name)
- file_list.append(daf_name)
- file_list.append(ddf_name)
-
if make_condor:
- print name, file_list
- archive_condor = make_condor_archive_script(name, file_list, outdir)
+ archive_condor = make_condor_archive_script(name, all_files, outdir)
upload_condor = make_condor_upload_script(name, outdir)
dag_fragments.extend(
Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s
-Error = compress.err.$(Process).log
+Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
context = {'archivename': make_submission_name(name),
'filelist': " ".join(files),
- 'initialdir': os.getcwd(),
+ 'initialdir': os.path.abspath(outdir),
'user': os.getlogin()}
condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s
-Error = upload.err.$(Process).log
+Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s
ftpuser = auth.hosts[encodeftp][0]
ftppassword = auth.hosts[encodeftp][2]
context = {'archivename': make_submission_name(name),
- 'initialdir': outdir,
+ 'initialdir': os.path.abspath(outdir),
'user': os.getlogin(),
'ftpuser': ftpuser,
'ftppassword': ftppassword,
if not os.path.exists(f):
raise RuntimeError("%s does not exist" % (f,))
-def make_md5sum(filename):
- """Quickly find the md5sum of a file
- """
- md5_cache = os.path.join(filename+".md5")
- print md5_cache
- if os.path.exists(md5_cache):
- logging.debug("Found md5sum in {0}".format(md5_cache))
- stream = open(md5_cache,'r')
- lines = stream.readlines()
- md5sum = parse_md5sum_line(lines, filename)
- else:
- md5sum = make_md5sum_unix(filename, md5_cache)
- return md5sum
-
-def make_md5sum_unix(filename, md5_cache):
- cmd = ["md5sum", filename]
- logging.debug("Running {0}".format(" ".join(cmd)))
- p = Popen(cmd, stdout=PIPE)
- stdin, stdout = p.communicate()
- retcode = p.wait()
- logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
- if retcode != 0:
- logging.error("Trouble with md5sum for {0}".format(filename))
- return None
- lines = stdin.split(os.linesep)
- md5sum = parse_md5sum_line(lines, filename)
- if md5sum is not None:
- logging.debug("Caching sum in {0}".format(md5_cache))
- stream = open(md5_cache, "w")
- stream.write(stdin)
- stream.close()
- return md5sum
-
-def parse_md5sum_line(lines, filename):
- md5sum, md5sum_filename = lines[0].split()
- if md5sum_filename != filename:
- errmsg = "MD5sum and I disagre about filename. {0} != {1}"
- logging.error(errmsg.format(filename, md5sum_filename))
- return None
- return md5sum
-
if __name__ == "__main__":
main()