import json
import logging
import netrc
-from optparse import OptionParser
+from optparse import OptionParser, OptionGroup
import os
from pprint import pprint, pformat
import shlex
from StringIO import StringIO
import stat
-from subprocess import Popen, PIPE
import sys
import time
import types
import urllib2
import urlparse
+import RDF
+
from htsworkflow.util import api
from htsworkflow.util.rdfhelp import \
dafTermOntology, \
get_model, \
get_serializer, \
load_into_model, \
+ sparql_query, \
submissionOntology
-from htsworkflow.submission.daf import DAFMapper, get_submission_uri
+from htsworkflow.submission.daf import \
+ DAFMapper, \
+ MetadataLookupException, \
+ get_submission_uri
from htsworkflow.submission.condorfastq import CondorFastqExtract
logger = logging.getLogger('ucsc_gather')
apidata = api.make_auth_from_opts(opts, parser)
model = get_model(opts.load_model)
- mapper = DAFMapper(opts.name, opts.daf, model)
- submission_uri = get_submission_uri(opts.name)
+ if opts.name:
+ mapper = DAFMapper(opts.name, opts.daf, model)
+ submission_uri = get_submission_uri(opts.name)
+
+ if opts.library_url is not None:
+ mapper.library_url = opts.library_url
+
if opts.load_rdf is not None:
load_into_model(model, 'turtle', opts.load_rdf, submission_uri)
- if opts.makeddf and opts.daf is None:
+ if opts.make_ddf and opts.daf is None:
parser.error("Please specify your daf when making ddf files")
- if len(args) == 0:
- parser.error("I need at least one library submission-dir input file")
-
library_result_map = []
for a in args:
library_result_map.extend(read_library_result_map(a))
if opts.make_tree_from is not None:
make_tree_from(opts.make_tree_from, library_result_map)
- #if opts.daf is not None:
- # link_daf(opts.daf, library_result_map)
+ if opts.link_daf:
+ link_daf(opts.daf, library_result_map)
if opts.fastq:
extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
if opts.scan_submission:
scan_submission_dirs(mapper, library_result_map)
- if opts.makeddf:
- make_all_ddfs(mapper, library_result_map, force=opts.force)
+ if opts.make_ddf:
+ make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)
+ if opts.sparql:
+ sparql_query(model, opts.sparql)
+
if opts.print_rdf:
writer = get_serializer()
print writer.serialize_model_to_string(model)
def make_parser():
parser = OptionParser()
- parser.add_option('--name', help="Set submission name")
- parser.add_option('--load-model', default=None,
+ model = OptionGroup(parser, 'model')
+ model.add_option('--name', help="Set submission name")
+ model.add_option('--load-model', default=None,
help="Load model database")
- parser.add_option('--load-rdf', default=None,
+ model.add_option('--load-rdf', default=None,
help="load rdf statements into model")
- parser.add_option('--print-rdf', action="store_true", default=False,
+ model.add_option('--sparql', default=None, help="execute sparql query")
+ model.add_option('--print-rdf', action="store_true", default=False,
help="print ending model state")
-
+ parser.add_option_group(model)
# commands
- parser.add_option('--make-tree-from',
+ commands = OptionGroup(parser, 'commands')
+ commands.add_option('--make-tree-from',
help="create directories & link data files",
default=None)
- parser.add_option('--fastq', help="generate scripts for making fastq files",
- default=False, action="store_true")
-
- parser.add_option('--scan-submission', default=False, action="store_true",
+ commands.add_option('--fastq', default=False, action="store_true",
+ help="generate scripts for making fastq files")
+ commands.add_option('--scan-submission', default=False, action="store_true",
help="Import metadata for submission into our model")
-
- parser.add_option('--makeddf', help='make the ddfs', default=False,
+ commands.add_option('--link-daf', default=False, action="store_true",
+ help="link daf into submission directories")
+ commands.add_option('--make-ddf', help='make the ddfs', default=False,
action="store_true")
+ parser.add_option_group(commands)
- parser.add_option('--daf', default=None, help='specify daf name')
parser.add_option('--force', default=False, action="store_true",
help="Force regenerating fastqs")
-
+ parser.add_option('--daf', default=None, help='specify daf name')
+ parser.add_option('--library-url', default=None,
+ help="specify an alternate source for library information")
# debugging
parser.add_option('--verbose', default=False, action="store_true",
help='verbose logging')
if not os.path.exists(lib_path):
logging.info("Making dir {0}".format(lib_path))
os.mkdir(lib_path)
- source_lib_dir = os.path.join(source_path, lib_path)
+ source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
if os.path.exists(source_lib_dir):
pass
for filename in os.listdir(source_lib_dir):
"""Look through our submission directories and collect needed information
"""
for lib_id, result_dir in library_result_map:
- view_map.import_submission_dir(result_dir, lib_id)
+ logging.info("Importing %s from %s" % (lib_id, result_dir))
+ try:
+ view_map.import_submission_dir(result_dir, lib_id)
+ except MetadataLookupException, e:
+ logging.error("Skipping %s: %s" % (lib_id, str(e)))
-def make_all_ddfs(view_map, library_result_map, make_condor=True, force=False):
+def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
dag_fragment = []
for lib_id, result_dir in library_result_map:
submissionNode = view_map.get_submission_node(result_dir)
dag_fragment.extend(
- make_ddf(view_map, submissionNode, make_condor, result_dir)
+ make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
)
if make_condor and len(dag_fragment) > 0:
f.close()
def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """Write one UCSC .ddf manifest for a submission, plus condor helpers.

    Runs a SPARQL query over view_map.model to collect every file attached
    to submissionNode, groups the results by DAF view, and writes one
    tab-separated row per view to <name>.ddf (in outdir when given,
    otherwise stdout).  When make_condor is true, archive/upload condor
    submit scripts are generated and the matching DAG fragment lines are
    returned; otherwise an empty list is returned.
    """
    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>

select ?submitView ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?controlId ?labExpId ?labVersion ?treatment ?protocol
WHERE {
  ?file ucscDaf:filename ?files ;
        ucscDaf:md5sum ?md5sum .
  ?submitView ucscDaf:has_file ?file ;
              ucscDaf:view ?dafView ;
              ucscDaf:submission <%(submission)s> .
  ?dafView ucscDaf:name ?view .
  <%(submission)s> submissionOntology:library ?library .

  OPTIONAL { ?submitView ucscDaf:antibody ?antibody }
  OPTIONAL { ?submitView ucscDaf:cell ?cell }
  OPTIONAL { ?submitView ucscDaf:control ?control }
  OPTIONAL { ?library ucscDaf:controlId ?controlId }
  OPTIONAL { ?submitView ucscDaf:sex ?sex }
  OPTIONAL { ?submitView ucscDaf:labVersion ?labExpId }
  OPTIONAL { ?submitView ucscDaf:labVersion ?labVersion }
  OPTIONAL { ?library ucscDaf:treatment ?treatment }
  OPTIONAL { ?submitView ucscDaf:protocol ?protocol }
}
ORDER BY ?submitView"""
    # NOTE(review): ?labExpId is bound from ucscDaf:labVersion, same as
    # ?labVersion on the next line -- looks like a copy-paste of that
    # OPTIONAL; confirm the predicate should not be ucscDaf:labExpId.
    dag_fragments = []

    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logging.error("Need name for %s" % (str(submissionNode)))
        return []

    # Open the per-submission ddf; fall back to stdout when no outdir given.
    ddf_name = name + '.ddf'
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile, 'w')
    else:
        output = sys.stdout

    formatted_query = query_template % {'submission': str(submissionNode.uri)}
    query = RDF.SPARQLQuery(formatted_query)
    results = query.execute(view_map.model)

    # Column order: the filename column ('files') goes first, then the
    # daf-declared attributes, then bookkeeping columns.
    variables = ['files']
    variables.extend(view_map.get_daf_variables())
    variables += ['controlId', 'labExpId', 'md5sum']
    output.write('\t'.join(variables))
    output.write(os.linesep)

    # Fold the per-file query rows into one record per view; 'files' and
    # 'md5sum' accumulate into lists, every other column keeps its last value.
    all_views = {}
    all_files = []
    for row in results:
        view_name = fromTypedNode(row['view'])
        view_record = all_views.setdefault(view_name, {})
        for variable_name in variables:
            value = str(fromTypedNode(row[variable_name]))
            if variable_name in ('files', 'md5sum'):
                view_record.setdefault(variable_name, []).append(value)
            else:
                view_record[variable_name] = value

    # Emit one tab-separated line per view, comma-joining the list columns.
    for view in all_views.keys():
        fields = []
        for variable_name in variables:
            if variable_name in ('files', 'md5sum'):
                fields.append(','.join(all_views[view][variable_name]))
            else:
                fields.append(all_views[view][variable_name])
        output.write("\t".join(fields))
        output.write(os.linesep)
        all_files.extend(all_views[view]['files'])

    logging.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(all_files)))

    # The daf and the ddf we just wrote ship inside the archive too.
    all_files.append(daf_name)
    all_files.append(ddf_name)

    if make_condor:
        archive_condor = make_condor_archive_script(name, all_files, outdir)
        upload_condor = make_condor_upload_script(name, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor)
        )

    return dag_fragments
return results
def make_condor_archive_script(name, files, outdir=None):
    """Write a condor submit file that tars up a submission's files.

    Arguments:
      name -- submission name, used to derive the archive and script names
      files -- filenames (relative to outdir) to pack into the tar archive
      outdir -- directory holding the files and receiving the submit
                script; defaults to the current working directory

    Returns the path of the generated condor submit script.
    Raises RuntimeError if any listed file does not exist.
    """
    # NOTE(review): Error and Output both point at compress.out.* so
    # stderr and stdout land in the same file -- confirm that is intended.
    script = """Universe = vanilla
Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s
Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
queue
"""
    if outdir is None:
        outdir = os.getcwd()
    # Fail early if anything is missing; report the full path we actually
    # checked (the bare name is misleading when outdir differs from cwd).
    for f in files:
        pathname = os.path.join(outdir, f)
        if not os.path.exists(pathname):
            raise RuntimeError("Missing %s" % (pathname,))
    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin()}
    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    return condor_script
-def make_condor_upload_script(ininame):
+def make_condor_upload_script(name, outdir=None):
script = """Universe = vanilla
Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s
-Error = upload.err.$(Process).log
+Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s
queue
"""
+ if outdir is None:
+ outdir = os.getcwd()
+
auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))
encodeftp = 'encodeftp.cse.ucsc.edu'
ftpuser = auth.hosts[encodeftp][0]
ftppassword = auth.hosts[encodeftp][2]
- context = {'archivename': make_submission_name(ininame),
- 'initialdir': os.getcwd(),
+ context = {'archivename': make_submission_name(name),
+ 'initialdir': os.path.abspath(outdir),
'user': os.getlogin(),
'ftpuser': ftpuser,
'ftppassword': ftppassword,
'ftphost': encodeftp}
- condor_script = make_condor_name(ininame, 'upload')
+ condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
condor_stream = open(condor_script,'w')
condor_stream.write(script % context)
condor_stream.close()
if not os.path.exists(f):
raise RuntimeError("%s does not exist" % (f,))
-def make_md5sum(filename):
- """Quickly find the md5sum of a file
- """
- md5_cache = os.path.join(filename+".md5")
- print md5_cache
- if os.path.exists(md5_cache):
- logging.debug("Found md5sum in {0}".format(md5_cache))
- stream = open(md5_cache,'r')
- lines = stream.readlines()
- md5sum = parse_md5sum_line(lines, filename)
- else:
- md5sum = make_md5sum_unix(filename, md5_cache)
- return md5sum
-
-def make_md5sum_unix(filename, md5_cache):
- cmd = ["md5sum", filename]
- logging.debug("Running {0}".format(" ".join(cmd)))
- p = Popen(cmd, stdout=PIPE)
- stdin, stdout = p.communicate()
- retcode = p.wait()
- logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
- if retcode != 0:
- logging.error("Trouble with md5sum for {0}".format(filename))
- return None
- lines = stdin.split(os.linesep)
- md5sum = parse_md5sum_line(lines, filename)
- if md5sum is not None:
- logging.debug("Caching sum in {0}".format(md5_cache))
- stream = open(md5_cache, "w")
- stream.write(stdin)
- stream.close()
- return md5sum
-
-def parse_md5sum_line(lines, filename):
- md5sum, md5sum_filename = lines[0].split()
- if md5sum_filename != filename:
- errmsg = "MD5sum and I disagre about filename. {0} != {1}"
- logging.error(errmsg.format(filename, md5sum_filename))
- return None
- return md5sum
-
if __name__ == "__main__":
main()