from htsworkflow.submission.daf import get_submission_uri
from htsworkflow.submission.submission import list_submissions
from htsworkflow.submission.results import ResultMap
-from htsworkflow.submission.trackhub_submission import TrackHubSubmission
from htsworkflow.submission.condorfastq import CondorFastqExtract
-
+from htsworkflow.submission.aws_submission import AWSSubmission
logger = logging.getLogger(__name__)
INDENTED = " " + os.linesep
if name:
submission_uri = get_submission_uri(name)
- logger.info('Submission URI: %s', name)
- else:
- logger.debug('No name, unable to create submission ur')
-
- mapper = None
- if opts.make_track_hub:
- mapper = TrackHubSubmission(name,
- model,
- baseurl=opts.make_track_hub,
- baseupload=opts.track_hub_upload,
- host=opts.host)
+ logger.info('Submission URI: %s', submission_uri)
+
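+    # file metadata is posted to the DCC (encoded) server, while library
+    # details are still pulled from the local LIMS host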
+ mapper = AWSSubmission(name, model, encode_host=opts.encoded, lims_host=opts.host)
if opts.load_rdf is not None:
if submission_uri is None:
if opts.scan_submission:
if name is None:
parser.error("Please define a submission name")
- if mapper is None:
- parser.error("Scan submission needs --make-track-hub=public-url")
mapper.scan_submission_dirs(results)
- if opts.make_track_hub:
- trackdb = mapper.make_hub(results)
-
- if opts.make_manifest:
- make_manifest(mapper, results, opts.make_manifest)
+ if opts.upload:
+ mapper.upload(results, opts.dry_run)
if opts.sparql:
sparql_query(model, opts.sparql)
print writer.serialize_model_to_string(model)
-def make_manifest(mapper, results, filename=None):
- manifest = mapper.make_manifest(results)
-
- if filename is None or filename == '-':
- sys.stdout.write(manifest)
- else:
- with open(filename, 'w') as mainifeststream:
- mainifeststream.write(manifest)
-
-
def make_parser():
parser = OptionParser()
commands.add_option('--fastq', default=False, action="store_true",
help="generate scripts for making fastq files")
commands.add_option('--scan-submission', default=False, action="store_true",
- help="Import metadata for submission into our model")
- commands.add_option('--make-track-hub', default=None,
- help='web root that will host the trackhub.')
- commands.add_option('--track-hub-upload', default=None,
- help='where to upload track hub <host>:<path>')
- commands.add_option('--make-manifest',
- help='name the manifest file name or - for stdout to create it',
- default=None)
+ help="cache md5 sums")
+ commands.add_option('--upload', default=False, action="store_true",
+ help="Upload files")
parser.add_option_group(commands)
parser.add_option('--compression', default=None, type='choice',
choices=['gzip'],
help='select compression type for fastq files')
- parser.add_option('--daf', default=None, help='specify daf name')
parser.add_option('--library-url', default=None,
help="specify an alternate source for library information")
+ parser.add_option('--encoded', default='www.encodeproject.org',
+ help='base url for talking to encode server')
+ parser.add_option('--dry-run', default=False, action='store_true',
+ help='avoid making changes to encoded')
# debugging
parser.add_option('--verbose', default=False, action="store_true",
help='verbose logging')
--- /dev/null
+"""Partially through ENCODE3 the DCC switched to needing to upload via AWS
+"""
+
+import logging
+import json
+import os
+from pprint import pformat, pprint
+import string
+import subprocess
+import time
+import re
+
+import jsonschema
+import RDF
+
+from htsworkflow.submission.submission import Submission
+from .encoded import ENCODED
+from htsworkflow.util.rdfhelp import \
+ fromTypedNode, \
+ geoSoftNS, \
+ submissionOntology
+from htsworkflow.util.ucsc import bigWigInfo
+
+from django.conf import settings
+from django.template import Context, loader
+
+LOGGER = logging.getLogger(__name__)
+
+class AWSSubmission(Submission):
+ def __init__(self, name, model, encode_host, lims_host):
+ """Create a AWS based submission
+
+ :Parameters:
+ - `name`: Name of submission
+ - `model`: librdf model reference
+ - `host`: hostname for library pages.
+ """
+ super(AWSSubmission, self).__init__(name, model, lims_host)
+ self.encode = ENCODED(encode_host)
+ self.encode.load_netrc()
+
+ def upload(self, results_map, dry_run=False):
+ for an_analysis in self.analysis_nodes(results_map):
+ for metadata in self.get_metadata(an_analysis):
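+                # tag the record as a file object so it validates against the
+                # DCC file schema, then drop @type again before posting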
+ metadata['@type'] = ['file']
+ self.encode.validate(metadata)
+ del metadata['@type']
+
+ if dry_run:
+ LOGGER.info(json.dumps(metadata, indent=4, sort_keys=True))
+ continue
+
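+                # a <submitted_file_name>.upload marker file holds the metadata
+                # sent to the DCC; if it already exists the file is assumed to
+                # have been uploaded earlier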
+ upload = metadata['submitted_file_name'] + '.upload'
+ if not os.path.exists(upload):
+ with open(upload, 'w') as outstream:
+ outstream.write(json.dumps(metadata, indent=4, sort_keys=True))
+ LOGGER.debug(json.dumps(metadata, indent=4, sort_keys=True))
+
+ response = self.encode.post_json('/file', metadata)
+ LOGGER.info(json.dumps(response, indent=4, sort_keys=True))
+
+ item = response['@graph'][0]
+ creds = item['upload_credentials']
+ self.run_aws_cp(metadata['submitted_file_name'], creds)
+ else:
+ LOGGER.info('%s already uploaded',
+ metadata['submitted_file_name'])
+
+
+ def run_aws_cp(self, pathname, creds):
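+        """Copy pathname into the DCC's S3 bucket using the aws command line client.
+
+        The temporary credentials returned when the file metadata was posted
+        are handed to ``aws s3 cp`` through its standard environment variables.
+        """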
+ env = os.environ.copy()
+ env.update({
+ 'AWS_ACCESS_KEY_ID': creds['access_key'],
+ 'AWS_SECRET_ACCESS_KEY': creds['secret_key'],
+ 'AWS_SECURITY_TOKEN': creds['session_token'],
+ })
+ start = time.time()
+ try:
+ subprocess.check_call(['aws', 's3', 'cp', pathname, creds['upload_url']], env=env)
+ except subprocess.CalledProcessError as e:
+ LOGGER.error('Upload of %s failed with exit code %d', pathname, e.returncode)
+ return
+ else:
+ end = time.time()
+ LOGGER.info('Upload of %s finished in %.2f seconds',
+ pathname,
+ end-start)
+
+
+ def get_metadata(self, analysis_node):
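+        """Return a list of file metadata dictionaries for one analysis node.
+
+        Runs the aws_metadata.sparql template against the local RDF model and
+        folds the machine/flowcell/lane/barcode columns into the single
+        flowcell_details entry expected by the DCC file schema.
+        """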
+ # convert our model names to encode project aliases
+ platform_alias = {
+ 'Illumina HiSeq 2500': 'ENCODE:HiSeq2500'
+ }
+ query_template = loader.get_template('aws_metadata.sparql')
+
+ context = Context({
+ 'submission': str(analysis_node.uri),
+ 'submissionSet': str(self.submissionSetNS[''].uri),
+ })
+ results = self.execute_query(query_template, context)
+ LOGGER.info("scanned %s for results found %s",
+ str(analysis_node), len(results))
+
+ # need to adjust the results of the query slightly.
+ for row in results:
+ if 'platform' in row:
+ row['platform'] = platform_alias[row['platform']]
+ flowcell_details = {}
+ for term in ['machine', 'flowcell', 'lane', 'barcode']:
+ if term in row:
+ value = str(row[term])
+ flowcell_details[term] = value
+ del row[term]
+ if len(flowcell_details) > 0:
+ row['flowcell_details'] = [flowcell_details]
+
+ return results
--- /dev/null
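+# Collect per-file metadata for one submission so AWSSubmission.get_metadata
+# can turn each row into a record for the ENCODE DCC /file endpoint.
+# {{submission}} is filled in from a django template Context before the
+# query is run.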
+PREFIX htswlib: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
+PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
+PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>
+PREFIX encode3: <http://jumpgate.caltech.edu/wiki/Encode3#>
+PREFIX ncbiTaxon: <http://www.ncbi.nlm.nih.gov/Taxonomy/Browser/wwwtax.cgi?id=>
+PREFIX geoSoft: <http://www.ncbi.nlm.nih.gov/geo/info/soft2.html#>
+PREFIX cells: <http://encodewiki.ucsc.edu/EncodeDCC/index.php/Cell_lines#>
+
+SELECT DISTINCT ?dataset ?replicate ?award ?lab ?lane ?barcode ?submitted_file_name ?file_format ?file_size ?output_type ?md5sum ?flowcell ?machine ?platform
+WHERE {
+ <{{submission}}> a submissionOntology:submission ;
+ encode3:dataset ?dataset ;
+ encode3:replicate ?replicate ;
+ encode3:award ?award ;
+ encode3:lab ?lab ;
+ ucscDaf:has_file ?file .
+
+ ?file ucscDaf:relative_path ?submitted_file_name ;
+ ucscDaf:md5sum ?md5sum ;
+ encode3:file_format ?file_format ;
+ ucscDaf:file_size ?file_size ;
+ encode3:output_type ?output_type ;
+ htswlib:library ?library ;
+ htswlib:flowcell ?flowcell_url ;
+ htswlib:lane_number ?lane ;
+ a ?fileClass .
+
+ ?flowcell_url htswlib:sequenced_by ?machine ;
+ htswlib:flowcell_id ?flowcell .
+ ?library htswlib:multiplex_index ?barcode .
+
+ ?machine htswlib:sequencer_model ?platform .
+}