# htsworkflow/submission/aws_submission.py
1 """Partially through ENCODE3 the DCC switched to needing to upload via AWS
2 """

import logging
import json
import os
from pprint import pformat, pprint
import string
import subprocess
import time
import re

import jsonschema
import RDF

from htsworkflow.submission.submission import Submission
from .encoded import ENCODED
from htsworkflow.util.rdfhelp import \
     fromTypedNode, \
     geoSoftNS, \
     submissionOntology
from htsworkflow.util.ucsc import bigWigInfo

from django.conf import settings
from django.template import Context, loader

LOGGER = logging.getLogger(__name__)

class AWSSubmission(Submission):
    def __init__(self, name, model, encode_host, lims_host):
        """Create an AWS based submission

        :Parameters:
          - `name`: Name of submission
          - `model`: librdf model reference
          - `encode_host`: hostname of the ENCODE DCC server to submit to
          - `lims_host`: hostname for library pages.
        """
        super(AWSSubmission, self).__init__(name, model, lims_host)
        self.encode = ENCODED(encode_host)
        self.encode.load_netrc()

    def upload(self, results_map, dry_run=False):
        for an_analysis in self.analysis_nodes(results_map):
            for metadata in self.get_metadata(an_analysis):
                # check the metadata against the DCC file schema before posting
                metadata['@type'] = ['file']
                self.encode.validate(metadata)
                del metadata['@type']

                if dry_run:
                    LOGGER.info(json.dumps(metadata, indent=4, sort_keys=True))
                    continue

                # the .upload marker file records what was submitted and
                # keeps us from posting the same file twice
                upload = metadata['submitted_file_name'] + '.upload'
                if not os.path.exists(upload):
                    with open(upload, 'w') as outstream:
                        outstream.write(json.dumps(metadata, indent=4, sort_keys=True))
                    LOGGER.debug(json.dumps(metadata, indent=4, sort_keys=True))

                    # register the file with the DCC, then copy it to the
                    # destination named in the returned upload credentials
                    response = self.encode.post_json('/file', metadata)
                    LOGGER.info(json.dumps(response, indent=4, sort_keys=True))

                    item = response['@graph'][0]
                    creds = item['upload_credentials']
                    run_aws_cp(metadata['submitted_file_name'], creds)
                else:
                    LOGGER.info('%s already uploaded',
                                metadata['submitted_file_name'])

    def get_metadata(self, analysis_node):
        # convert our model names to ENCODE project aliases
        platform_alias = {
            'Illumina HiSeq 2500': 'ENCODE:HiSeq2500'
        }
        query_template = loader.get_template('aws_metadata.sparql')

        context = Context({
            'submission': str(analysis_node.uri),
            'submissionSet': str(self.submissionSetNS[''].uri),
            })
        results = self.execute_query(query_template, context)
        LOGGER.info("scanned %s, found %s results",
                    str(analysis_node), len(results))

        # adjust the query results slightly to match what the DCC expects
        for row in results:
            if 'platform' in row:
                row['platform'] = platform_alias[row['platform']]
            if 'read_length' in row:
                row['read_length'] = int(row['read_length'])
            # collapse the per-lane terms into a single flowcell_details record
            flowcell_details = {}
            for term in ['machine', 'flowcell', 'lane', 'barcode']:
                if term in row:
                    value = str(row[term])
                    flowcell_details[term] = value
                    del row[term]
            if len(flowcell_details) > 0:
                row['flowcell_details'] = [flowcell_details]

        return results

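# Illustrative usage sketch (an assumption, not part of the original module):
# given an RDF model already populated by the htsworkflow submission tools,
# a driver script might look roughly like the following.  ``model``,
# ``results_map``, and the hostnames are placeholders.
#
#     submission = AWSSubmission('my-submission', model,
#                                encode_host='test.encodedcc.org',
#                                lims_host='lims.example.org')
#     submission.upload(results_map, dry_run=True)
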
def run_aws_cp(pathname, creds):
    # hand the temporary upload credentials to the aws cli through the
    # environment; AWS_SECURITY_TOKEN is the older name for the session token
    env = os.environ.copy()
    env.update({
        'AWS_ACCESS_KEY_ID': creds['access_key'],
        'AWS_SECRET_ACCESS_KEY': creds['secret_key'],
        'AWS_SECURITY_TOKEN': creds['session_token'],
    })
    start = time.time()
    try:
        subprocess.check_call(['aws', 's3', 'cp', pathname, creds['upload_url']], env=env)
    except subprocess.CalledProcessError as e:
        LOGGER.error('Upload of %s failed with exit code %d', pathname, e.returncode)
        return
    else:
        end = time.time()
        LOGGER.info('Upload of %s finished in %.2f seconds',
                    pathname,
                    end-start)
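
# For reference, the upload credentials consumed by run_aws_cp are expected to
# carry the keys used above; the values shown here are made-up placeholders:
#
#     creds = {
#         'access_key': 'AKIA...',
#         'secret_key': '...',
#         'session_token': '...',
#         'upload_url': 's3://example-bucket/path/example.fastq.gz',
#     }
#     run_aws_cp('example.fastq.gz', creds)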