Switch encode3 submitter to use AWS.
author Diane Trout <diane@caltech.edu>
Fri, 27 Mar 2015 18:11:11 +0000 (11:11 -0700)
committer Diane Trout <diane@caltech.edu>
Fri, 27 Mar 2015 18:13:12 +0000 (11:13 -0700)
I needed to write a submission class to support submitting to
AWS, and of course there's a slightly different set of metadata
they want for a submission.
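
For anyone picking this up later, a minimal sketch of how the new class is
driven (it mirrors main() in encode3.py below; the submission name, the
hostnames, and the empty model/ResultMap are placeholders, and
ENCODED.load_netrc() expects DCC credentials in ~/.netrc):

    # Sketch only: in main() the librdf model is loaded from the LIMS and
    # the ResultMap is populated from the result directories first.
    import RDF
    from htsworkflow.submission.results import ResultMap
    from htsworkflow.submission.aws_submission import AWSSubmission

    model = RDF.Model(RDF.MemoryStorage())   # placeholder, normally pre-loaded
    results = ResultMap()                     # placeholder, normally populated

    mapper = AWSSubmission('example-submission', model,
                           encode_host='www.encodeproject.org',
                           lims_host='jumpgate.caltech.edu')
    mapper.scan_submission_dirs(results)      # gather file metadata / md5 sums
    mapper.upload(results, dry_run=True)      # validate and log, skip the POST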

encode_submission/encode3.py
htsworkflow/submission/aws_submission.py [new file with mode: 0644]
htsworkflow/templates/aws_metadata.sparql [new file with mode: 0644]

diff --git a/encode_submission/encode3.py b/encode_submission/encode3.py
index 17f4f657d4bb2a7134ad62affb369c0b48bbd836..a3f6d29642bd2e391d8f6802bd138c72a76a86f1 100644
--- a/encode_submission/encode3.py
+++ b/encode_submission/encode3.py
@@ -36,9 +36,8 @@ from htsworkflow.util.rdfhelp import \
 from htsworkflow.submission.daf import get_submission_uri
 from htsworkflow.submission.submission import list_submissions
 from htsworkflow.submission.results import ResultMap
-from htsworkflow.submission.trackhub_submission import TrackHubSubmission
 from htsworkflow.submission.condorfastq import CondorFastqExtract
-
+from htsworkflow.submission.aws_submission import AWSSubmission
 logger = logging.getLogger(__name__)
 
 INDENTED = "  " + os.linesep
@@ -80,17 +79,9 @@ def main(cmdline=None):
 
     if name:
         submission_uri = get_submission_uri(name)
-        logger.info('Submission URI: %s', name)
-    else:
-        logger.debug('No name, unable to create submission ur')
-
-    mapper = None
-    if opts.make_track_hub:
-        mapper = TrackHubSubmission(name,
-                                    model,
-                                    baseurl=opts.make_track_hub,
-                                    baseupload=opts.track_hub_upload,
-                                    host=opts.host)
+        logger.info('Submission URI: %s', submission_uri)
+
+    mapper = AWSSubmission(name, model, encode_host=opts.encoded, lims_host=opts.host)
 
     if opts.load_rdf is not None:
         if submission_uri is None:
@@ -122,15 +113,10 @@ def main(cmdline=None):
     if opts.scan_submission:
         if name is None:
             parser.error("Please define a submission name")
-        if mapper is None:
-            parser.error("Scan submission needs --make-track-hub=public-url")
         mapper.scan_submission_dirs(results)
 
-    if opts.make_track_hub:
-        trackdb = mapper.make_hub(results)
-
-    if opts.make_manifest:
-        make_manifest(mapper, results, opts.make_manifest)
+    if opts.upload:
+        mapper.upload(results, opts.dry_run)
 
     if opts.sparql:
         sparql_query(model, opts.sparql)
@@ -140,16 +126,6 @@ def main(cmdline=None):
         print writer.serialize_model_to_string(model)
 
 
-def make_manifest(mapper, results, filename=None):
-    manifest = mapper.make_manifest(results)
-
-    if filename is None or filename == '-':
-        sys.stdout.write(manifest)
-    else:
-        with open(filename, 'w') as mainifeststream:
-            mainifeststream.write(manifest)
-
-
 def make_parser():
     parser = OptionParser()
 
@@ -176,14 +152,9 @@ def make_parser():
     commands.add_option('--fastq', default=False, action="store_true",
                         help="generate scripts for making fastq files")
     commands.add_option('--scan-submission', default=False, action="store_true",
-                        help="Import metadata for submission into our model")
-    commands.add_option('--make-track-hub', default=None,
-                        help='web root that will host the trackhub.')
-    commands.add_option('--track-hub-upload', default=None,
-                        help='where to upload track hub <host>:<path>')
-    commands.add_option('--make-manifest',
-                        help='name the manifest file name or - for stdout to create it',
-                        default=None)
+                        help="cache md5 sums")
+    commands.add_option('--upload', default=False, action="store_true",
+                        help="Upload files")
 
     parser.add_option_group(commands)
 
@@ -192,9 +163,12 @@ def make_parser():
     parser.add_option('--compression', default=None, type='choice',
                       choices=['gzip'],
                       help='select compression type for fastq files')
-    parser.add_option('--daf', default=None, help='specify daf name')
     parser.add_option('--library-url', default=None,
                       help="specify an alternate source for library information")
+    parser.add_option('--encoded', default='www.encodeproject.org',
+                      help='base url for talking to encode server')
+    parser.add_option('--dry-run', default=False, action='store_true',
+                      help='avoid making changes to encoded')
     # debugging
     parser.add_option('--verbose', default=False, action="store_true",
                       help='verbose logging')
diff --git a/htsworkflow/submission/aws_submission.py b/htsworkflow/submission/aws_submission.py
new file mode 100644
index 0000000..f4cd5c0
--- /dev/null
+++ b/htsworkflow/submission/aws_submission.py
@@ -0,0 +1,118 @@
+"""Partially through ENCODE3 the DCC switched to needing to upload via AWS
+"""
+
+import logging
+import json
+import os
+from pprint import pformat, pprint
+import string
+import subprocess
+import time
+import re
+
+import jsonschema
+import RDF
+
+from htsworkflow.submission.submission import Submission
+from .encoded import ENCODED
+from htsworkflow.util.rdfhelp import \
+     fromTypedNode, \
+     geoSoftNS, \
+     submissionOntology
+from htsworkflow.util.ucsc import bigWigInfo
+
+from django.conf import settings
+from django.template import Context, loader
+
+LOGGER = logging.getLogger(__name__)
+
+class AWSSubmission(Submission):
+    def __init__(self, name, model, encode_host, lims_host):
+        """Create a AWS based submission
+
+        :Parameters:
+          - `name`: Name of submission
+          - `model`: librdf model reference
+          - `encode_host`, `lims_host`: ENCODE DCC and LIMS (library page) hostnames.
+        """
+        super(AWSSubmission, self).__init__(name, model, lims_host)
+        self.encode = ENCODED(encode_host)
+        self.encode.load_netrc()
+
+    def upload(self, results_map, dry_run=False):
+        for an_analysis in self.analysis_nodes(results_map):
+            for metadata in self.get_metadata(an_analysis):
+                metadata['@type'] = ['file']
+                self.encode.validate(metadata)
+                del metadata['@type']
+
+                if dry_run:
+                    LOGGER.info(json.dumps(metadata, indent=4, sort_keys=True))
+                    continue
+
+                upload = metadata['submitted_file_name'] + '.upload'
+                if not os.path.exists(upload):
+                    with open(upload, 'w') as outstream:
+                        outstream.write(json.dumps(metadata, indent=4, sort_keys=True))
+                    LOGGER.debug(json.dumps(metadata, indent=4, sort_keys=True))
+
+                    response = self.encode.post_json('/file', metadata)
+                    LOGGER.info(json.dumps(response, indent=4, sort_keys=True))
+
+                    item = response['@graph'][0]
+                    creds = item['upload_credentials']
+                    self.run_aws_cp(metadata['submitted_file_name'], creds)
+                else:
+                    LOGGER.info('%s already uploaded',
+                                metadata['submitted_file_name'])
+
+
+    def run_aws_cp(self, pathname, creds):
+        env = os.environ.copy()
+        env.update({
+            'AWS_ACCESS_KEY_ID': creds['access_key'],
+            'AWS_SECRET_ACCESS_KEY': creds['secret_key'],
+            'AWS_SECURITY_TOKEN': creds['session_token'],
+        })
+        start = time.time()
+        try:
+            subprocess.check_call(['aws', 's3', 'cp', pathname, creds['upload_url']], env=env)
+        except subprocess.CalledProcessError as e:
+            LOGGER.error('Upload of %s failed with exit code %d', pathname, e.returncode)
+            return
+        else:
+            end = time.time()
+            LOGGER.info('Upload of %s finished in %.2f seconds',
+                        pathname,
+                        end-start)
+
+
+    def get_metadata(self, analysis_node):
+        # convert our model names to encode project aliases
+        platform_alias = {
+            'Illumina HiSeq 2500': 'ENCODE:HiSeq2500'
+        }
+        query_template = loader.get_template('aws_metadata.sparql')
+
+        context = Context({
+            'submission': str(analysis_node.uri),
+            'submissionSet': str(self.submissionSetNS[''].uri),
+            })
+        results = self.execute_query(query_template, context)
+        LOGGER.info("scanned %s for results found %s",
+                    str(analysis_node), len(results))
+
+        # need to adjust the results of the query slightly.
+        for row in results:
+            if 'platform' in row:
+                row['platform'] = platform_alias[row['platform']]
+            flowcell_details = {}
+            for term in ['machine', 'flowcell', 'lane', 'barcode']:
+                if term in row:
+                    value = str(row[term])
+                    flowcell_details[term] = value
+                    del row[term]
+            if len(flowcell_details) > 0:
+                row['flowcell_details'] = [flowcell_details]
+
+        return results
diff --git a/htsworkflow/templates/aws_metadata.sparql b/htsworkflow/templates/aws_metadata.sparql
new file mode 100644
index 0000000..fd99df8
--- /dev/null
+++ b/htsworkflow/templates/aws_metadata.sparql
@@ -0,0 +1,33 @@
+PREFIX htswlib: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
+PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
+PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>
+PREFIX encode3: <http://jumpgate.caltech.edu/wiki/Encode3#>
+PREFIX ncbiTaxon: <http://www.ncbi.nlm.nih.gov/Taxonomy/Browser/wwwtax.cgi?id=>
+PREFIX geoSoft: <http://www.ncbi.nlm.nih.gov/geo/info/soft2.html#>
+PREFIX cells: <http://encodewiki.ucsc.edu/EncodeDCC/index.php/Cell_lines#>
+
+select distinct ?dataset ?replicate ?award ?lab ?lane ?barcode ?submitted_file_name ?file_format ?file_size ?output_type ?md5sum ?flowcell ?machine ?platform
+WHERE {
+  <{{submission}}> a submissionOntology:submission ;
+                   encode3:dataset ?dataset ;
+                   encode3:replicate ?replicate ;
+                   encode3:award ?award ;
+                   encode3:lab ?lab ;
+                   ucscDaf:has_file ?file .
+
+  ?file ucscDaf:relative_path ?submitted_file_name ;
+        ucscDaf:md5sum ?md5sum ;
+        encode3:file_format ?file_format ;
+        ucscDaf:file_size ?file_size ;
+        encode3:output_type ?output_type ;
+        htswlib:library ?library ;
+        htswlib:flowcell ?flowcell_url ;
+        htswlib:lane_number ?lane ;
+        a ?fileClass .
+
+  ?flowcell_url htswlib:sequenced_by ?machine ;
+            htswlib:flowcell_id ?flowcell .
+  ?library htswlib:multiplex_index ?barcode .
+
+  ?machine htswlib:sequencer_model ?platform .
+}
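
For illustration, a hypothetical record as get_metadata() returns it after the
machine, flowcell, lane, and barcode bindings are folded into flowcell_details
(every value below is made up; upload() then stamps it with @type file,
validates it, and POSTs it to /file):

    # Hypothetical post-processed query row; the keys come from the
    # SELECT clause above, and the value types are illustrative.
    metadata = {
        'dataset': '/experiments/EXAMPLE/',       # placeholder identifier
        'replicate': '/replicates/EXAMPLE/',      # placeholder identifier
        'award': '/awards/EXAMPLE/',              # placeholder identifier
        'lab': '/labs/example-lab/',              # placeholder identifier
        'submitted_file_name': 'example.fastq.gz',
        'file_format': 'fastq',
        'file_size': 1234567,
        'output_type': 'reads',
        'md5sum': 'd41d8cd98f00b204e9800998ecf8427e',
        'platform': 'ENCODE:HiSeq2500',           # via the platform_alias map
        'flowcell_details': [{
            'machine': 'http://jumpgate.caltech.edu/sequencer/example',
            'flowcell': 'EXAMPLEXX',
            'lane': '3',
            'barcode': 'ATCACG',
        }],
    }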