1 """Partially through ENCODE3 the DCC switched to needing to upload via AWS
7 from pprint import pformat, pprint
15 from requests.exceptions import HTTPError
17 from htsworkflow.submission.submission import Submission
18 from .encoded import ENCODED
19 from htsworkflow.util.rdfhelp import \
23 from htsworkflow.util.ucsc import bigWigInfo
25 from django.conf import settings
26 from django.template import Context, loader
28 LOGGER = logging.getLogger(__name__)
30 class AWSSubmission(Submission):
31 def __init__(self, name, model, encode_host, lims_host):
32 """Create a AWS based submission
35 - `name`: Name of submission
36 - `model`: librdf model reference
37 - `host`: hostname for library pages.
39 super(AWSSubmission, self).__init__(name, model, lims_host)
40 self.encode = ENCODED(encode_host)
41 self.encode.load_netrc()
47 def check_upload(self, results_map):
49 # phase one download data
50 for an_analysis in self.analysis_nodes(results_map):
51 for metadata in self.get_metadata(an_analysis):
52 filename = self.make_upload_filename(metadata)
53 if os.path.exists(filename):
54 with open(filename, 'rt') as instream:
55 uploaded = json.load(instream)
57 'submitted_file_name': uploaded['submitted_file_name'],
58 'md5sum': uploaded['md5sum']
60 self.update_replicates(uploaded)
62 # phase 2 make sure submitted file is there
63 md5sums = set((self._files[f]['md5sum'] for f in self._files))
64 submitted_file_names = set(
65 (self._files[f]['submitted_file_name'] for f in self._files)
70 if row['submitted_file_name'] not in submitted_file_names:
71 error.append('!file_name')
72 if row['md5sum'] not in md5sums:
73 error.append('!md5sum')
75 print("{} failed {} checks".format(
76 row['submitted_file_name'],
81 if not errors_detected:
82 print('No errors detected')
84 def update_replicates(self, metadata):
85 replicate_id = metadata['replicate']
86 if replicate_id not in self._replicates:
87 LOGGER.debug("Downloading %s", replicate_id)
89 rep = self.encode.get_json(replicate_id)
91 self._replicates[replicate_id] = rep
92 for file_id in rep['experiment']['files']:
93 self.update_files(file_id)
94 except HTTPError as err:
95 print(err.response, dir(err.response))
96 if err.response.status_code == 404:
97 print('Unable to find {} {}'.format(
99 metadata['submitted_file_name'])
104 def update_files(self, file_id):
105 if file_id not in self._files:
106 LOGGER.debug("Downloading %s", file_id)
108 file_object = self.encode.get_json(file_id)
109 self._files[file_id] = file_object
110 except HTTPError as err:
111 if err.response.status_code == 404:
112 print('unable to find {}'.format(file_id))
116 def upload(self, results_map, dry_run=False):
117 for an_analysis in self.analysis_nodes(results_map):
118 for metadata in self.get_metadata(an_analysis):
119 metadata['@type'] = ['file']
120 self.encode.validate(metadata)
121 del metadata['@type']
124 LOGGER.info(json.dumps(metadata, indent=4, sort_keys=True))
127 upload = self.make_upload_filename(metadata)
128 if not os.path.exists(upload):
129 with open(upload, 'w') as outstream:
130 outstream.write(json.dumps(metadata, indent=4, sort_keys=True))
131 LOGGER.debug(json.dumps(metadata, indent=4, sort_keys=True))
133 response = self.encode.post_json('/file', metadata)
134 LOGGER.info(json.dumps(response, indent=4, sort_keys=True))
136 item = response['@graph'][0]
137 creds = item['upload_credentials']
138 run_aws_cp(metadata['submitted_file_name'], creds)
140 LOGGER.info('%s already uploaded',
141 metadata['submitted_file_name'])
144 def get_metadata(self, analysis_node):
145 # convert our model names to encode project aliases
147 'Illumina HiSeq 2500': 'ENCODE:HiSeq2500'
149 query_template = loader.get_template('aws_metadata.sparql')
152 'submission': str(analysis_node.uri),
153 'submissionSet': str(self.submissionSetNS[''].uri),
155 results = self.execute_query(query_template, context)
156 LOGGER.info("scanned %s for results found %s",
157 str(analysis_node), len(results))
159 # need to adjust the results of the query slightly.
161 if 'platform' in row:
162 row['platform'] = platform_alias[row['platform']]
163 if 'read_length' in row:
164 row['read_length'] = int(row['read_length'])
165 flowcell_details = {}
166 for term in ['machine', 'flowcell', 'lane', 'barcode']:
168 value = str(row[term])
169 flowcell_details[term] = value
171 if len(flowcell_details) > 0:
172 row['flowcell_details'] = [flowcell_details]
176 def make_upload_filename(self, metadata):
177 return metadata['submitted_file_name'] + '.upload'
179 def run_aws_cp(pathname, creds):
180 env = os.environ.copy()
182 'AWS_ACCESS_KEY_ID': creds['access_key'],
183 'AWS_SECRET_ACCESS_KEY': creds['secret_key'],
184 'AWS_SECURITY_TOKEN': creds['session_token'],
188 subprocess.check_call(['aws', 's3', 'cp', pathname, creds['upload_url']], env=env)
189 except subprocess.CalledProcessError as e:
190 LOGGER.error('Upload of %s failed with exit code %d', pathname, e.returncode)
194 LOGGER.info('Upload of %s finished in %.2f seconds',