1 """Partially through ENCODE3 the DCC switched to needing to upload via AWS
7 from pprint import pformat, pprint
11 from requests.exceptions import HTTPError
13 from htsworkflow.submission.submission import Submission
14 from .encoded import ENCODED
16 from django.template import Context, loader
18 LOGGER = logging.getLogger(__name__)
class AWSSubmission(Submission):
    """Submission subclass that posts file metadata to the ENCODE DCC
    and uploads the payload files to the DCC's S3 bucket via the aws cli.
    """
    def __init__(self, name, model, encode_host, lims_host):
        """Create a AWS based submission

        :Parameters:
          - `name`: Name of submission
          - `model`: librdf model reference
          - `encode_host`: hostname of the ENCODE DCC (encoded) server
          - `lims_host`: hostname for library pages.
        """
        super(AWSSubmission, self).__init__(name, model, lims_host)
        self.encode = ENCODED(encode_host)
        # credentials for the DCC come from the user's ~/.netrc
        self.encode.load_netrc()

        # caches of objects already downloaded from the DCC,
        # keyed by replicate / file @id
        self._replicates = {}
        self._files = {}

    def check_upload(self, results_map):
        """Cross-check our upload receipts against what the DCC has.

        Reads the local ``*.upload`` receipt written for each submitted
        file and verifies that a file with the same submitted_file_name
        and md5sum is known to the DCC.
        """
        tocheck = []
        # phase one download data
        for an_analysis in self.analysis_nodes(results_map):
            for metadata in self.get_metadata(an_analysis):
                filename = self.make_upload_filename(metadata)
                if os.path.exists(filename):
                    with open(filename, 'rt') as instream:
                        uploaded = json.load(instream)
                    tocheck.append({
                        'submitted_file_name': uploaded['submitted_file_name'],
                        'md5sum': uploaded['md5sum']
                    })
                    # pull the replicate (and its files) into our caches
                    self.update_replicates(uploaded)

        # phase 2 make sure submitted file is there
        md5sums = set(self._files[f]['md5sum'] for f in self._files)
        submitted_file_names = set(
            self._files[f]['submitted_file_name'] for f in self._files
        )
        errors_detected = 0
        for row in tocheck:
            error = []
            if row['submitted_file_name'] not in submitted_file_names:
                error.append('!file_name')
            if row['md5sum'] not in md5sums:
                error.append('!md5sum')
            if error:
                print("{} failed {} checks".format(
                    row['submitted_file_name'],
                    ', '.join(error)))
                errors_detected += 1

        if not errors_detected:
            print('No errors detected')

    def update_replicates(self, metadata):
        """Download and cache a replicate object, plus its experiment's files.

        Non-404 HTTP errors are re-raised; a 404 is reported but tolerated.
        """
        replicate_id = metadata['replicate']
        if replicate_id not in self._replicates:
            LOGGER.debug("Downloading %s", replicate_id)
            try:
                rep = self.encode.get_json(replicate_id)
                self._replicates[replicate_id] = rep
                for file_id in rep['experiment']['files']:
                    self.update_files(file_id)
            except HTTPError as err:
                # NOTE: removed leftover debug print of err.response/dir()
                if err.response.status_code == 404:
                    print('Unable to find {} {}'.format(
                        replicate_id,
                        metadata['submitted_file_name']))
                else:
                    # anything other than "not found" is a real failure
                    raise

    def update_files(self, file_id):
        """Download and cache one file object from the DCC (404 tolerated)."""
        if file_id not in self._files:
            LOGGER.debug("Downloading %s", file_id)
            try:
                file_object = self.encode.get_json(file_id)
                self._files[file_id] = file_object
            except HTTPError as err:
                if err.response.status_code == 404:
                    print('unable to find {}'.format(file_id))
                else:
                    raise

    def upload(self, results_map, dry_run=False):
        """Post file metadata to the DCC and copy the files up to S3.

        :Parameters:
          - `results_map`: submission results to scan for analyses
          - `dry_run`: when True only log the metadata, submit nothing

        A ``<submitted_file_name>.upload`` receipt is written before
        posting; its presence marks a file as already uploaded.
        """
        for an_analysis in self.analysis_nodes(results_map):
            for metadata in self.get_metadata(an_analysis):
                # validate against the DCC's file schema before submitting;
                # @type is only needed for validation, not for the POST body
                metadata['@type'] = ['file']
                self.encode.validate(metadata)
                del metadata['@type']

                if dry_run:
                    LOGGER.info(json.dumps(metadata, indent=4, sort_keys=True))
                    continue

                upload = self.make_upload_filename(metadata)
                if not os.path.exists(upload):
                    # write the receipt first so a crashed upload
                    # isn't silently retried with different metadata
                    with open(upload, 'w') as outstream:
                        json.dump(metadata, outstream, indent=4, sort_keys=True)
                    LOGGER.debug(json.dumps(metadata, indent=4, sort_keys=True))

                    response = self.encode.post_json('/file', metadata)
                    LOGGER.info(json.dumps(response, indent=4, sort_keys=True))

                    item = response['@graph'][0]
                    # the DCC hands back temporary S3 credentials for the copy
                    creds = item['upload_credentials']
                    run_aws_cp(metadata['submitted_file_name'], creds)
                else:
                    LOGGER.info('%s already uploaded',
                                metadata['submitted_file_name'])

    def get_metadata(self, analysis_node):
        """Run the aws_metadata sparql query and massage the results
        into the shape the DCC's /file endpoint expects.
        """
        # convert our model names to encode project aliases
        platform_alias = {
            'Illumina HiSeq 2500': 'ENCODE:HiSeq2500'
        }
        query_template = loader.get_template('aws_metadata.sparql')
        context = Context({
            'submission': str(analysis_node.uri),
            'submissionSet': str(self.submissionSetNS[''].uri),
        })
        results = self.execute_query(query_template, context)
        LOGGER.info("scanned %s for results found %s",
                    str(analysis_node), len(results))

        # need to adjust the results of the query slightly.
        for row in results:
            if 'platform' in row:
                row['platform'] = platform_alias[row['platform']]
            if 'read_length' in row:
                row['read_length'] = int(row['read_length'])
            # fold the per-lane terms into a flowcell_details sub-object
            flowcell_details = {}
            for term in ['machine', 'flowcell', 'lane', 'barcode']:
                if term in row:
                    value = str(row[term])
                    flowcell_details[term] = value
                    del row[term]
            if len(flowcell_details) > 0:
                row['flowcell_details'] = [flowcell_details]

        return results

    def make_upload_filename(self, metadata):
        """Return the path of the receipt file recording an upload."""
        return metadata['submitted_file_name'] + '.upload'
def run_aws_cp(pathname, creds):
    """Upload a file to the DCC's S3 bucket with the aws command line tool.

    :Parameters:
      - `pathname`: path of the local file to upload
      - `creds`: upload_credentials dictionary returned by the DCC's
        /file endpoint; must contain access_key, secret_key,
        session_token and upload_url.

    Logs an error (with the aws cli's exit code) on failure, and the
    elapsed transfer time on success.
    """
    # pass the temporary credentials to the aws cli via its environment
    env = os.environ.copy()
    env.update({
        'AWS_ACCESS_KEY_ID': creds['access_key'],
        'AWS_SECRET_ACCESS_KEY': creds['secret_key'],
        'AWS_SECURITY_TOKEN': creds['session_token'],
    })
    start = time.time()
    try:
        subprocess.check_call(
            ['aws', 's3', 'cp', pathname, creds['upload_url']], env=env)
    except subprocess.CalledProcessError as e:
        LOGGER.error('Upload of %s failed with exit code %d',
                     pathname, e.returncode)
        return
    LOGGER.info('Upload of %s finished in %.2f seconds',
                pathname,
                time.time() - start)