1 """Partially through ENCODE3 the DCC switched to needing to upload via AWS
2 """
3
import logging
import json
import os
from pprint import pformat, pprint
import string
import subprocess
import time
import re

import jsonschema
import RDF

from htsworkflow.submission.submission import Submission
from .encoded import ENCODED
from htsworkflow.util.rdfhelp import \
     fromTypedNode, \
     geoSoftNS, \
     submissionOntology
from htsworkflow.util.ucsc import bigWigInfo

from django.conf import settings
from django.template import Context, loader

LOGGER = logging.getLogger(__name__)

class AWSSubmission(Submission):
    def __init__(self, name, model, encode_host, lims_host):
        """Create an AWS-based submission

        :Parameters:
          - `name`: Name of submission
          - `model`: librdf model reference
          - `encode_host`: hostname of the ENCODE DCC server to submit to
          - `lims_host`: hostname for library pages
        """
        super(AWSSubmission, self).__init__(name, model, lims_host)
        self.encode = ENCODED(encode_host)
        self.encode.load_netrc()

    def upload(self, results_map, dry_run=False):
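        """Validate and upload the files produced by each analysis.

        A '<submitted_file_name>.upload' marker is written before posting,
        so re-running the submission skips files that already have a
        marker; a failed transfer therefore needs its marker removed
        before retrying.  With dry_run=True the metadata is validated and
        logged but nothing is posted or copied.
        """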
        for an_analysis in self.analysis_nodes(results_map):
            for metadata in self.get_metadata(an_analysis):
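                # validate() needs @type to pick the right JSON schema;
                # the key is removed again so it is not posted to /file.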
                metadata['@type'] = ['file']
                self.encode.validate(metadata)
                del metadata['@type']

                if dry_run:
                    LOGGER.info(json.dumps(metadata, indent=4, sort_keys=True))
                    continue

                upload = metadata['submitted_file_name'] + '.upload'
                if not os.path.exists(upload):
                    with open(upload, 'w') as outstream:
                        outstream.write(json.dumps(metadata, indent=4, sort_keys=True))
                    LOGGER.debug(json.dumps(metadata, indent=4, sort_keys=True))

                    response = self.encode.post_json('/file', metadata)
                    LOGGER.info(json.dumps(response, indent=4, sort_keys=True))

                    item = response['@graph'][0]
                    creds = item['upload_credentials']
                    self.run_aws_cp(metadata['submitted_file_name'], creds)
                else:
                    LOGGER.info('%s already uploaded',
                                metadata['submitted_file_name'])


    def run_aws_cp(self, pathname, creds):
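        """Copy pathname to S3 with the aws CLI using DCC-issued credentials.

        creds is the upload_credentials object returned when the file was
        posted; it carries access_key, secret_key, session_token and the
        destination upload_url.
        """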
        env = os.environ.copy()
        env.update({
            'AWS_ACCESS_KEY_ID': creds['access_key'],
            'AWS_SECRET_ACCESS_KEY': creds['secret_key'],
            'AWS_SECURITY_TOKEN': creds['session_token'],
        })
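        # The aws CLI reads these temporary credentials from the
        # environment; AWS_SECURITY_TOKEN is the older spelling of
        # AWS_SESSION_TOKEN, kept here for compatibility.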
        start = time.time()
        try:
            subprocess.check_call(['aws', 's3', 'cp', pathname, creds['upload_url']], env=env)
        except subprocess.CalledProcessError as e:
            LOGGER.error('Upload of %s failed with exit code %d', pathname, e.returncode)
            return
        else:
            end = time.time()
            LOGGER.info('Upload of %s finished in %.2f seconds',
                        pathname,
                        end - start)


    def get_metadata(self, analysis_node):
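        """Query the model for the file metadata produced by analysis_node.

        Runs the aws_metadata.sparql template and reshapes each row into
        an ENCODE file object: platform names are mapped to DCC aliases
        and the per-lane terms are folded into flowcell_details.
        """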
        # convert our model names to encode project aliases
        platform_alias = {
            'Illumina HiSeq 2500': 'ENCODE:HiSeq2500'
        }
        query_template = loader.get_template('aws_metadata.sparql')

        context = Context({
            'submission': str(analysis_node.uri),
            'submissionSet': str(self.submissionSetNS[''].uri),
            })
        results = self.execute_query(query_template, context)
        LOGGER.info("scanned %s, found %s results",
                    str(analysis_node), len(results))

        # adjust the query results slightly to match the DCC's file schema
        for row in results:
            if 'platform' in row:
                row['platform'] = platform_alias[row['platform']]
            flowcell_details = {}
            for term in ['machine', 'flowcell', 'lane', 'barcode']:
                if term in row:
                    flowcell_details[term] = str(row[term])
                    del row[term]
            if flowcell_details:
                row['flowcell_details'] = [flowcell_details]

        return results
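

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original module: the host
    # names are illustrative assumptions, and the empty results_map is a
    # placeholder for whatever analysis -> result mapping the inherited
    # Submission.analysis_nodes() expects.  A ~/.netrc entry for the
    # encode_host is needed by load_netrc().
    logging.basicConfig(level=logging.INFO)
    model = RDF.Model(RDF.MemoryStorage())
    submission = AWSSubmission('example-submission', model,
                               encode_host='test.encodedcc.org',
                               lims_host='jumpgate.caltech.edu')
    # dry_run=True validates and logs the metadata without posting to
    # the DCC or copying anything to S3.
    submission.upload({}, dry_run=True)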