Further clean up ddf generation.
diff --git a/extra/ucsc_encode_submission/ucsc_gather.py b/extra/ucsc_encode_submission/ucsc_gather.py
index ddd1fad63512c79f0ea9394931f0a381ad099579..d52b11bc122cf523a06253091e461848ce33a6de 100755
--- a/extra/ucsc_encode_submission/ucsc_gather.py
+++ b/extra/ucsc_encode_submission/ucsc_gather.py
@@ -11,7 +11,6 @@ from pprint import pprint, pformat
 import shlex
 from StringIO import StringIO
 import stat
-from subprocess import Popen, PIPE
 import sys
 import time
 import types
@@ -19,6 +18,8 @@ import urllib
 import urllib2
 import urlparse
 
+import RDF
+
 from htsworkflow.util import api
 from htsworkflow.util.rdfhelp import \
      dafTermOntology, \
@@ -26,6 +27,7 @@ from htsworkflow.util.rdfhelp import \
      get_model, \
      get_serializer, \
      load_into_model, \
+     sparql_query, \
      submissionOntology 
 from htsworkflow.submission.daf import \
      DAFMapper, \
@@ -62,9 +64,6 @@ def main(cmdline=None):
     if opts.make_ddf and opts.daf is None:
         parser.error("Please specify your daf when making ddf files")
 
-    if len(args) == 0:
-        parser.error("I need at least one library submission-dir input file")
-        
     library_result_map = []
     for a in args:
         library_result_map.extend(read_library_result_map(a))
@@ -86,6 +85,9 @@ def main(cmdline=None):
     if opts.make_ddf:
         make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)
 
+    if opts.sparql:
+        sparql_query(model, opts.sparql)
+        
     if opts.print_rdf:
         writer = get_serializer()
         print writer.serialize_model_to_string(model)
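
The sparql_query helper added to the imports above comes from htsworkflow.util.rdfhelp and is not shown in this diff. A minimal sketch of what such a helper might look like with the Redland (librdf) Python bindings, offered only as an assumption about its behaviour:

    # Hypothetical sketch of a sparql_query(model, query) helper; the real one
    # lives in htsworkflow.util.rdfhelp and may differ.
    import RDF

    def sparql_query(model, query_string):
        """Run a SPARQL query against a Redland model and print each row."""
        query = RDF.SPARQLQuery(query_string)
        for row in query.execute(model):
            # row maps variable names to RDF nodes (None for unbound OPTIONALs)
            print " ".join("%s=%s" % (name, value) for name, value in row.items())

Note that opts.sparql carries the query text itself (the new --sparql option below takes the query string as its value), so it is handed straight to the helper.
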
@@ -100,6 +102,7 @@ def make_parser():
       help="Load model database")
     model.add_option('--load-rdf', default=None,
       help="load rdf statements into model")
+    model.add_option('--sparql', default=None, help="execute sparql query")
     model.add_option('--print-rdf', action="store_true", default=False,
       help="print ending model state")
     parser.add_option_group(model)
@@ -203,6 +206,31 @@ def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None)
     """
     Make ddf files, and bonus condor file
     """
+    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
+PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
+PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>
+
+select ?submitView  ?filename ?md5sum ?view ?cell ?antibody ?sex ?control ?controlId ?labExpId ?labVersion ?treatment ?protocol
+WHERE {
+  ?file ucscDaf:filename ?filename ;
+        ucscDaf:md5sum ?md5sum .
+  ?submitView ucscDaf:has_file ?file ;
+              ucscDaf:view ?dafView ;
+              ucscDaf:submission %(submission)s .
+  ?dafView ucscDaf:name ?view .
+  %(submission)s submissionOntology:library ?library .
+
+  OPTIONAL { ?submitView ucscDaf:antibody ?antibody }
+  OPTIONAL { ?submitView ucscDaf:cell ?cell }
+  OPTIONAL { ?submitView ucscDaf:control ?control }
+  OPTIONAL { ?library ucscDaf:controlId ?controlId }
+  OPTIONAL { ?submitView ucscDaf:sex ?sex }
+  OPTIONAL { ?submitView ucscDaf:labVersion ?labExpId }
+  OPTIONAL { ?submitView ucscDaf:labVersion ?labVersion }
+  OPTIONAL { ?library ucscDaf:treatment ?treatment }
+  OPTIONAL { ?submitView ucscDaf:protocol ?protocol }
+}
+ORDER BY  ?submitView""" 
     dag_fragments = []
 
     name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
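
The next hunk fills the %(submission)s slot of this template and runs it with RDF.SPARQLQuery. That pattern can also be exercised on its own; a small sketch, assuming a Redland model that has already been populated (the helper name and example URI are illustrative, not part of the source):

    # Illustrative helper (not in the source): fill the %(submission)s slot
    # of the query template above and collect the binding rows.
    import RDF

    def run_ddf_query(model, query_template, submission_uri):
        """Return a list of binding dicts for one submission node."""
        query = RDF.SPARQLQuery(query_template % {'submission': submission_uri})
        return list(query.execute(model))

    # e.g. rows = run_ddf_query(model, query_template,
    #                           '<http://example.org/submission_1>')
    # each row maps a variable name ('filename', 'md5sum', ...) to an RDF node

Interpolating str(submissionNode), as make_ddf does below, appears to rely on Redland rendering a resource node in its N-Triples form (an angle-bracketed URI), so it can be dropped into the query text directly.
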
@@ -216,51 +244,51 @@ def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None)
         output = open(outfile,'w')
     else:
         output = sys.stdout
-    
-    # filename goes first
+
+    formatted_query = query_template % {'submission': str(submissionNode)}
+
+    query = RDF.SPARQLQuery(formatted_query)
+    results = query.execute(view_map.model)
+
     variables = ['filename']
+    # filename goes first
     variables.extend(view_map.get_daf_variables())
+    variables += ['controlId', 'labExpId', 'md5sum']
     output.write('\t'.join(variables))
     output.write(os.linesep)
     
-    nameTerm = dafTermOntology['name']
-
-    submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])
-    file_list = []
-    for viewNode in submission_views:
-        record = []
+    all_views = {}
+    all_files = []
+    for row in results:
+        viewname = fromTypedNode(row['view'])
+        current = all_views.setdefault(viewname, {})
         for variable_name in variables:
-            varNode = dafTermOntology[variable_name]
-            values = list(view_map.model.get_targets(viewNode, varNode))
-            
-            if variable_name == 'view':
-                nameNode = view_map.model.get_target(values[0], nameTerm)
-                values = [fromTypedNode(nameNode)]
+            value = str(fromTypedNode(row[variable_name]))
+            if variable_name in ('filename', 'md5sum'):
+                current.setdefault(variable_name,[]).append(value)
             else:
-                values = [ fromTypedNode(v) for v in values ]
-                if variable_name == 'filename':
-                    file_list.extend(values)
-                              
-            if len(values) == 0:
-                attribute = "#None#"
-            elif len(values) == 1:
-                attribute = values[0]
+                current[variable_name] = value
+
+    for view in all_views.keys():
+        line = []
+        for variable_name in variables:
+            if variable_name in ('filename', 'md5sum'):
+                line.append(','.join(all_views[view][variable_name]))
             else:
-                attribute = ",".join(values)
-            record.append(attribute)
-        output.write('\t'.join(record))
+                line.append(all_views[view][variable_name])
+        output.write("\t".join(line))
         output.write(os.linesep)
-            
+        all_files.extend(all_views[view]['filename'])
+        
     logging.info(
         "Examined {0}, found files: {1}".format(
-            str(submissionNode), ", ".join(file_list)))
+            str(submissionNode), ", ".join(all_files)))
+
+    all_files.append(daf_name)
+    all_files.append(ddf_name)
 
-    file_list.append(daf_name)
-    file_list.append(ddf_name)
-    
     if make_condor:
-        print name, file_list
-        archive_condor = make_condor_archive_script(name, file_list, outdir)
+        archive_condor = make_condor_archive_script(name, all_files, outdir)
         upload_condor = make_condor_upload_script(name, outdir)
         
         dag_fragments.extend( 
@@ -295,7 +323,7 @@ def make_condor_archive_script(name, files, outdir=None):
 Executable = /bin/tar
 arguments = czvhf ../%(archivename)s %(filelist)s
 
-Error = compress.err.$(Process).log
+Error = compress.out.$(Process).log
 Output = compress.out.$(Process).log
 Log = /tmp/submission-compress-%(user)s.log
 initialdir = %(initialdir)s
@@ -313,7 +341,7 @@ queue
 
     context = {'archivename': make_submission_name(name),
                'filelist': " ".join(files),
-               'initialdir': os.getcwd(),
+               'initialdir': os.path.abspath(outdir), 
                'user': os.getlogin()}
 
     condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
@@ -329,7 +357,7 @@ def make_condor_upload_script(name, outdir=None):
 Executable = /usr/bin/lftp
 arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s
 
-Error = upload.err.$(Process).log
+Error = upload.out.$(Process).log
 Output = upload.out.$(Process).log
 Log = /tmp/submission-upload-%(user)s.log
 initialdir = %(initialdir)s
@@ -345,7 +373,7 @@ queue
     ftpuser = auth.hosts[encodeftp][0]
     ftppassword = auth.hosts[encodeftp][2]
     context = {'archivename': make_submission_name(name),
-               'initialdir': outdir,
+               'initialdir': os.path.abspath(outdir),
                'user': os.getlogin(),
                'ftpuser': ftpuser,
                'ftppassword': ftppassword,
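
The auth object indexed here behaves like Python's netrc.netrc: hosts maps a host name to a (login, account, password) tuple, which matches the [0]/[2] indexing above. A minimal sketch of obtaining the same credentials directly from ~/.netrc (the host name is an assumption, not taken from this diff):

    # Sketch only: read ENCODE FTP credentials from ~/.netrc the way the
    # auth.hosts lookup above does.  The host name here is an assumption.
    import netrc

    encodeftp = 'encodeftp.cse.ucsc.edu'
    auth = netrc.netrc()
    login, account, password = auth.hosts[encodeftp]
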
@@ -432,46 +460,5 @@ def validate_filelist(files):
         if not os.path.exists(f):
             raise RuntimeError("%s does not exist" % (f,))
 
-def make_md5sum(filename):
-    """Quickly find the md5sum of a file
-    """
-    md5_cache = os.path.join(filename+".md5")
-    print md5_cache
-    if os.path.exists(md5_cache):
-        logging.debug("Found md5sum in {0}".format(md5_cache))
-        stream = open(md5_cache,'r')
-        lines = stream.readlines()
-        md5sum = parse_md5sum_line(lines, filename)
-    else:
-        md5sum = make_md5sum_unix(filename, md5_cache)
-    return md5sum
-    
-def make_md5sum_unix(filename, md5_cache):
-    cmd = ["md5sum", filename]
-    logging.debug("Running {0}".format(" ".join(cmd)))
-    p = Popen(cmd, stdout=PIPE)
-    stdin, stdout = p.communicate()
-    retcode = p.wait()
-    logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
-    if retcode != 0:
-        logging.error("Trouble with md5sum for {0}".format(filename))
-        return None
-    lines = stdin.split(os.linesep)
-    md5sum = parse_md5sum_line(lines, filename)
-    if md5sum is not None:
-        logging.debug("Caching sum in {0}".format(md5_cache))
-        stream = open(md5_cache, "w")
-        stream.write(stdin)
-        stream.close()
-    return md5sum
-
-def parse_md5sum_line(lines, filename):
-    md5sum, md5sum_filename = lines[0].split()
-    if md5sum_filename != filename:
-        errmsg = "MD5sum and I disagre about filename. {0} != {1}"
-        logging.error(errmsg.format(filename, md5sum_filename))
-        return None
-    return md5sum
-
 if __name__ == "__main__":
     main()
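
The removed make_md5sum helpers shelled out to the md5sum binary through subprocess, hence the dropped Popen/PIPE import at the top of the diff. If an in-process equivalent were ever wanted, the standard-library hashlib module covers it; a minimal sketch, not the project's actual replacement:

    # Minimal hashlib-based stand-in for the removed md5sum helpers.
    # Illustrative only; not the code that replaced them.
    import hashlib

    def md5sum_file(filename, blocksize=1024 * 1024):
        """Return the hex md5 digest of filename, reading it in chunks."""
        digest = hashlib.md5()
        stream = open(filename, 'rb')
        try:
            for block in iter(lambda: stream.read(blocksize), ''):
                digest.update(block)
        finally:
            stream.close()
        return digest.hexdigest()
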