The proper DDF variable name for files to be submitted is 'files', not 'filename'
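
Rename the ddf's first column from 'filename' to 'files' and rebuild
make_ddf() around a SPARQL query over the submission model: result rows
are grouped by view, files and md5sums for a view are joined into
comma-separated lists, and controlId, labExpId, and md5sum columns are
appended after the daf-defined variables. The ddf is now written into
the submission directory instead of stdout, and the condor
archive/upload script generation is re-enabled with the daf and ddf
included in the archive. The now-unused local md5sum helpers are
removed, since md5sums come from the model.

As a rough sketch (the middle columns come from the daf's variable
list, so they vary per submission), a generated ddf header now looks
something like this tab-separated line:

    files  view  cell  antibody  ...  controlId  labExpId  md5sum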
diff --git a/extra/ucsc_encode_submission/ucsc_gather.py b/extra/ucsc_encode_submission/ucsc_gather.py
index 2b9ba85b4edf202fb951c76024711eccd5d76b52..37169c075d7c5dbdfefa908a86247917c92e0aae 100755
--- a/extra/ucsc_encode_submission/ucsc_gather.py
+++ b/extra/ucsc_encode_submission/ucsc_gather.py
@@ -5,13 +5,12 @@ from glob import glob
 import json
 import logging
 import netrc
-from optparse import OptionParser
+from optparse import OptionParser, OptionGroup
 import os
 from pprint import pprint, pformat
 import shlex
 from StringIO import StringIO
 import stat
-from subprocess import Popen, PIPE
 import sys
 import time
 import types
@@ -19,6 +18,8 @@ import urllib
 import urllib2
 import urlparse
 
+import RDF
+
 from htsworkflow.util import api
 from htsworkflow.util.rdfhelp import \
      dafTermOntology, \
@@ -26,8 +27,12 @@ from htsworkflow.util.rdfhelp import \
      get_model, \
      get_serializer, \
      load_into_model, \
+     sparql_query, \
      submissionOntology 
-from htsworkflow.submission.daf import DAFMapper, get_submission_uri
+from htsworkflow.submission.daf import \
+     DAFMapper, \
+     MetadataLookupException, \
+     get_submission_uri
 from htsworkflow.submission.condorfastq import CondorFastqExtract
 
 logger = logging.getLogger('ucsc_gather')
@@ -46,17 +51,19 @@ def main(cmdline=None):
     apidata = api.make_auth_from_opts(opts, parser)
 
     model = get_model(opts.load_model)
-    mapper = DAFMapper(opts.name, opts.daf,  model)
-    submission_uri = get_submission_uri(opts.name)
+    if opts.name:
+        mapper = DAFMapper(opts.name, opts.daf,  model)
+        submission_uri = get_submission_uri(opts.name)
+
+    if opts.library_url is not None:
+        mapper.library_url = opts.library_url
+        
     if opts.load_rdf is not None:
         load_into_model(model, 'turtle', opts.load_rdf, submission_uri)
 
-    if opts.makeddf and opts.daf is None:
+    if opts.make_ddf and opts.daf is None:
         parser.error("Please specify your daf when making ddf files")
 
-    if len(args) == 0:
-        parser.error("I need at least one library submission-dir input file")
-        
     library_result_map = []
     for a in args:
         library_result_map.extend(read_library_result_map(a))
@@ -64,8 +71,8 @@ def main(cmdline=None):
     if opts.make_tree_from is not None:
         make_tree_from(opts.make_tree_from, library_result_map)
             
-    #if opts.daf is not None:
-    #    link_daf(opts.daf, library_result_map)
+    if opts.link_daf:
+        link_daf(opts.daf, library_result_map)
 
     if opts.fastq:
         extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
@@ -75,9 +82,12 @@ def main(cmdline=None):
     if opts.scan_submission:
         scan_submission_dirs(mapper, library_result_map)
 
-    if opts.makeddf:
-        make_all_ddfs(mapper, library_result_map, force=opts.force)
+    if opts.make_ddf:
+        make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)
 
+    if opts.sparql:
+        sparql_query(model, opts.sparql)
+        
     if opts.print_rdf:
         writer = get_serializer()
         print writer.serialize_model_to_string(model)
@@ -86,31 +96,36 @@ def main(cmdline=None):
 def make_parser():
     parser = OptionParser()
 
-    parser.add_option('--name', help="Set submission name")
-    parser.add_option('--load-model', default=None,
+    model = OptionGroup(parser, 'model')
+    model.add_option('--name', help="Set submission name")
+    model.add_option('--load-model', default=None,
       help="Load model database")
-    parser.add_option('--load-rdf', default=None,
+    model.add_option('--load-rdf', default=None,
       help="load rdf statements into model")
-    parser.add_option('--print-rdf', action="store_true", default=False,
+    model.add_option('--sparql', default=None, help="execute sparql query")
+    model.add_option('--print-rdf', action="store_true", default=False,
       help="print ending model state")
-
+    parser.add_option_group(model)
     # commands
-    parser.add_option('--make-tree-from',
+    commands = OptionGroup(parser, 'commands')
+    commands.add_option('--make-tree-from',
                       help="create directories & link data files",
                       default=None)
-    parser.add_option('--fastq', help="generate scripts for making fastq files",
-                      default=False, action="store_true")
-
-    parser.add_option('--scan-submission', default=False, action="store_true",
+    commands.add_option('--fastq', default=False, action="store_true",
+                        help="generate scripts for making fastq files")
+    commands.add_option('--scan-submission', default=False, action="store_true",
                       help="Import metadata for submission into our model")
-    
-    parser.add_option('--makeddf', help='make the ddfs', default=False,
+    commands.add_option('--link-daf', default=False, action="store_true",
+                        help="link daf into submission directories")
+    commands.add_option('--make-ddf', help='make the ddfs', default=False,
                       action="store_true")
+    parser.add_option_group(commands)
     
-    parser.add_option('--daf', default=None, help='specify daf name')
     parser.add_option('--force', default=False, action="store_true",
                       help="Force regenerating fastqs")
-
+    parser.add_option('--daf', default=None, help='specify daf name')
+    parser.add_option('--library-url', default=None,
+                      help="specify an alternate source for library information")
     # debugging
     parser.add_option('--verbose', default=False, action="store_true",
                       help='verbose logging')
@@ -128,7 +143,7 @@ def make_tree_from(source_path, library_result_map):
         if not os.path.exists(lib_path):
             logging.info("Making dir {0}".format(lib_path))
             os.mkdir(lib_path)
-        source_lib_dir = os.path.join(source_path, lib_path)
+        source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
         if os.path.exists(source_lib_dir):
             pass
         for filename in os.listdir(source_lib_dir):
@@ -162,14 +177,18 @@ def scan_submission_dirs(view_map, library_result_map):
     """Look through our submission directories and collect needed information
     """
     for lib_id, result_dir in library_result_map:
-        view_map.import_submission_dir(result_dir, lib_id)
+        logging.info("Importing %s from %s" % (lib_id, result_dir))
+        try:
+            view_map.import_submission_dir(result_dir, lib_id)
+        except MetadataLookupException, e:
+            logging.error("Skipping %s: %s" % (lib_id, str(e)))
         
-def make_all_ddfs(view_map, library_result_map, make_condor=True, force=False):
+def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
     dag_fragment = []
     for lib_id, result_dir in library_result_map:
         submissionNode = view_map.get_submission_node(result_dir)
         dag_fragment.extend(
-            make_ddf(view_map, submissionNode, make_condor, result_dir)
+            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
         )
 
     if make_condor and len(dag_fragment) > 0:
@@ -183,68 +202,99 @@ def make_all_ddfs(view_map, library_result_map, make_condor=True, force=False):
             f.close()
             
 
-def make_ddf(view_map, submissionNode, make_condor=False, outdir=None):
+def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
     """
     Make ddf files, and bonus condor file
     """
+    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
+PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
+PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>
+
+select ?submitView  ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?controlId ?labExpId ?labVersion ?treatment ?protocol
+WHERE {
+  ?file ucscDaf:filename ?files ;
+        ucscDaf:md5sum ?md5sum .
+  ?submitView ucscDaf:has_file ?file ;
+              ucscDaf:view ?dafView ;
+              ucscDaf:submission <%(submission)s> .
+  ?dafView ucscDaf:name ?view .
+  <%(submission)s> submissionOntology:library ?library .
+
+  OPTIONAL { ?submitView ucscDaf:antibody ?antibody }
+  OPTIONAL { ?submitView ucscDaf:cell ?cell }
+  OPTIONAL { ?submitView ucscDaf:control ?control }
+  OPTIONAL { ?library ucscDaf:controlId ?controlId }
+  OPTIONAL { ?submitView ucscDaf:sex ?sex }
+  OPTIONAL { ?submitView ucscDaf:labVersion ?labExpId }
+  OPTIONAL { ?submitView ucscDaf:labVersion ?labVersion }
+  OPTIONAL { ?library ucscDaf:treatment ?treatment }
+  OPTIONAL { ?submitView ucscDaf:protocol ?protocol }
+}
+ORDER BY ?submitView"""
     dag_fragments = []
-    curdir = os.getcwd()
-    if outdir is not None:
-        os.chdir(outdir)
-    output = sys.stdout
 
     name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
     if name is None:
         logging.error("Need name for %s" % (str(submissionNode)))
         return []
-    
+
     ddf_name = name + '.ddf'
-    output = sys.stdout
-    # output = open(ddf_name,'w')
+    if outdir is not None:
+        outfile = os.path.join(outdir, ddf_name)
+        output = open(outfile,'w')
+    else:
+        output = sys.stdout
+
+    formatted_query = query_template % {'submission': str(submissionNode.uri)}
 
+    query = RDF.SPARQLQuery(formatted_query)
+    results = query.execute(view_map.model)
+
-    # filename goes first
-    variables = ['filename']
+    # the 'files' column goes first in a ddf
+    variables = ['files']
     variables.extend(view_map.get_daf_variables())
+    variables += ['controlId', 'labExpId', 'md5sum']
     output.write('\t'.join(variables))
     output.write(os.linesep)
     
-    submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])
-    file_list = []
-    for viewNode in submission_views:
-        record = []
+    all_views = {}
+    all_files = []
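+    # group result rows by view name; 'files' and 'md5sum' values
+    # accumulate as lists, everything else is single-valued per view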
+    for row in results:
+        viewname = fromTypedNode(row['view'])
+        current = all_views.setdefault(viewname, {})
+        for variable_name in variables:
+            value = str(fromTypedNode(row[variable_name]))
+            if variable_name in ('files', 'md5sum'):
+                current.setdefault(variable_name,[]).append(value)
+            else:
+                current[variable_name] = value
+
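+    # write one ddf line per view, joining multi-valued columns with commas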
+    for view in all_views.keys():
+        line = []
         for variable_name in variables:
-            varNode = dafTermOntology[variable_name]
-            values = [fromTypedNode(v) for v in list(view_map.model.get_targets(viewNode, varNode))]
-            if variable_name == 'filename':
-                file_list.extend(values)
-            if len(values) == 0:
-                attribute = "#None#"
-            elif len(values) == 1:
-                attribute = values[0]
+            if variable_name in ('files', 'md5sum'):
+                line.append(','.join(all_views[view][variable_name]))
             else:
-                attribute = ",".join(values)
-            record.append(attribute)
-        output.write('\t'.join(record))
+                line.append(all_views[view][variable_name])
+        output.write("\t".join(line))
         output.write(os.linesep)
-            
+        all_files.extend(all_views[view]['files'])
+        
     logging.info(
         "Examined {0}, found files: {1}".format(
-            str(submissionNode), ", ".join(file_list)))
-
-    #file_list.append(daf_name)
-    #if ddf_name is not None:
-    #    file_list.append(ddf_name)
-    #
-    #if make_condor:
-    #    archive_condor = make_condor_archive_script(ininame, file_list)
-    #    upload_condor = make_condor_upload_script(ininame)
-    #    
-    #    dag_fragments.extend( 
-    #        make_dag_fragment(ininame, archive_condor, upload_condor)
-    #    ) 
+            str(submissionNode), ", ".join(all_files)))
+
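+    # the daf and ddf travel with the data files in the archive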
+    all_files.append(daf_name)
+    all_files.append(ddf_name)
+
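+    # build tar and lftp condor jobs plus the dag fragment tying them together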
+    if make_condor:
+        archive_condor = make_condor_archive_script(name, all_files, outdir)
+        upload_condor = make_condor_upload_script(name, outdir)
+        
+        dag_fragments.extend( 
+            make_dag_fragment(name, archive_condor, upload_condor)
+        ) 
         
-    os.chdir(curdir)
-    
     return dag_fragments
 
 
@@ -267,13 +317,13 @@ def read_library_result_map(filename):
     return results
 
 
-def make_condor_archive_script(ininame, files):
+def make_condor_archive_script(name, files, outdir=None):
     script = """Universe = vanilla
 
 Executable = /bin/tar
 arguments = czvhf ../%(archivename)s %(filelist)s
 
-Error = compress.err.$(Process).log
+Error = compress.out.$(Process).log
 Output = compress.out.$(Process).log
 Log = /tmp/submission-compress-%(user)s.log
 initialdir = %(initialdir)s
@@ -282,48 +332,54 @@ request_memory = 20
 
 queue 
 """
+    if outdir is None:
+        outdir = os.getcwd()
     for f in files:
-        if not os.path.exists(f):
+        pathname = os.path.join(outdir, f)
+        if not os.path.exists(pathname):
             raise RuntimeError("Missing %s" % (f,))
 
-    context = {'archivename': make_submission_name(ininame),
+    context = {'archivename': make_submission_name(name),
                'filelist': " ".join(files),
-               'initialdir': os.getcwd(),
+               'initialdir': os.path.abspath(outdir), 
                'user': os.getlogin()}
 
-    condor_script = make_condor_name(ininame, 'archive')
+    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
     condor_stream = open(condor_script,'w')
     condor_stream.write(script % context)
     condor_stream.close()
     return condor_script
 
 
-def make_condor_upload_script(ininame):
+def make_condor_upload_script(name, outdir=None):
     script = """Universe = vanilla
 
 Executable = /usr/bin/lftp
 arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s
 
-Error = upload.err.$(Process).log
+Error = upload.out.$(Process).log
 Output = upload.out.$(Process).log
 Log = /tmp/submission-upload-%(user)s.log
 initialdir = %(initialdir)s
 
 queue 
 """
+    if outdir is None:
+        outdir = os.getcwd()
+        
     auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))
     
     encodeftp = 'encodeftp.cse.ucsc.edu'
     ftpuser = auth.hosts[encodeftp][0]
     ftppassword = auth.hosts[encodeftp][2]
-    context = {'archivename': make_submission_name(ininame),
-               'initialdir': os.getcwd(),
+    context = {'archivename': make_submission_name(name),
+               'initialdir': os.path.abspath(outdir),
                'user': os.getlogin(),
                'ftpuser': ftpuser,
                'ftppassword': ftppassword,
                'ftphost': encodeftp}
 
-    condor_script = make_condor_name(ininame, 'upload')
+    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
     condor_stream = open(condor_script,'w')
     condor_stream.write(script % context)
     condor_stream.close()
@@ -404,46 +460,5 @@ def validate_filelist(files):
         if not os.path.exists(f):
             raise RuntimeError("%s does not exist" % (f,))
 
-def make_md5sum(filename):
-    """Quickly find the md5sum of a file
-    """
-    md5_cache = os.path.join(filename+".md5")
-    print md5_cache
-    if os.path.exists(md5_cache):
-        logging.debug("Found md5sum in {0}".format(md5_cache))
-        stream = open(md5_cache,'r')
-        lines = stream.readlines()
-        md5sum = parse_md5sum_line(lines, filename)
-    else:
-        md5sum = make_md5sum_unix(filename, md5_cache)
-    return md5sum
-    
-def make_md5sum_unix(filename, md5_cache):
-    cmd = ["md5sum", filename]
-    logging.debug("Running {0}".format(" ".join(cmd)))
-    p = Popen(cmd, stdout=PIPE)
-    stdin, stdout = p.communicate()
-    retcode = p.wait()
-    logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
-    if retcode != 0:
-        logging.error("Trouble with md5sum for {0}".format(filename))
-        return None
-    lines = stdin.split(os.linesep)
-    md5sum = parse_md5sum_line(lines, filename)
-    if md5sum is not None:
-        logging.debug("Caching sum in {0}".format(md5_cache))
-        stream = open(md5_cache, "w")
-        stream.write(stdin)
-        stream.close()
-    return md5sum
-
-def parse_md5sum_line(lines, filename):
-    md5sum, md5sum_filename = lines[0].split()
-    if md5sum_filename != filename:
-        errmsg = "MD5sum and I disagre about filename. {0} != {1}"
-        logging.error(errmsg.format(filename, md5sum_filename))
-        return None
-    return md5sum
-
 if __name__ == "__main__":
     main()