From 1126fd038d07ac61292178892fa0bc6d8616cf89 Mon Sep 17 00:00:00 2001 From: Diane Trout Date: Tue, 20 Oct 2015 16:49:54 -0700 Subject: [PATCH] Add mode to connect to the DCC and make sure the files with a .upload file are actually present at the DCC --- encode_submission/encode3.py | 5 ++ htsworkflow/submission/aws_submission.py | 79 +++++++++++++++++++++++- 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/encode_submission/encode3.py b/encode_submission/encode3.py index 1b7ae04..e3faea4 100644 --- a/encode_submission/encode3.py +++ b/encode_submission/encode3.py @@ -115,6 +115,9 @@ def main(cmdline=None): if args.upload: mapper.upload(results, args.dry_run) + if args.check_upload: + mapper.check_upload(results) + if args.sparql: sparql_query(model, args.sparql) @@ -152,6 +155,8 @@ def make_parser(): help="cache md5 sums") commands.add_argument('--upload', default=False, action="store_true", help="Upload files") + commands.add_argument('--check-upload', default=False, action='store_true', + help='check to see files are actually uploaded') parser.add_argument('--force', default=False, action="store_true", help="Force regenerating fastqs") diff --git a/htsworkflow/submission/aws_submission.py b/htsworkflow/submission/aws_submission.py index 0b76b04..f484ebe 100644 --- a/htsworkflow/submission/aws_submission.py +++ b/htsworkflow/submission/aws_submission.py @@ -12,6 +12,7 @@ import re import jsonschema import RDF +from requests.exceptions import HTTPError from htsworkflow.submission.submission import Submission from .encoded import ENCODED @@ -39,6 +40,79 @@ class AWSSubmission(Submission): self.encode = ENCODED(encode_host) self.encode.load_netrc() + self._replicates = {} + self._files = {} + + + def check_upload(self, results_map): + tocheck = [] + # phase one download data + for an_analysis in self.analysis_nodes(results_map): + for metadata in self.get_metadata(an_analysis): + filename = self.make_upload_filename(metadata) + if os.path.exists(filename): + with open(filename, 'rt') as instream: + uploaded = json.load(instream) + tocheck.append({ + 'submitted_file_name': uploaded['submitted_file_name'], + 'md5sum': uploaded['md5sum'] + }) + self.update_replicates(uploaded) + + # phase 2 make sure submitted file is there + md5sums = set((self._files[f]['md5sum'] for f in self._files)) + submitted_file_names = set( + (self._files[f]['submitted_file_name'] for f in self._files) + ) + errors_detected = 0 + for row in tocheck: + error = [] + if row['submitted_file_name'] not in submitted_file_names: + error.append('!file_name') + if row['md5sum'] not in md5sums: + error.append('!md5sum') + if error: + print("{} failed {} checks".format( + row['submitted_file_name'], + ', '.join(error) + )) + errors_detected += 1 + + if not errors_detected: + print('No errors detected') + + def update_replicates(self, metadata): + replicate_id = metadata['replicate'] + if replicate_id not in self._replicates: + LOGGER.debug("Downloading %s", replicate_id) + try: + rep = self.encode.get_json(replicate_id) + + self._replicates[replicate_id] = rep + for file_id in rep['experiment']['files']: + self.update_files(file_id) + except HTTPError as err: + print(err.response, dir(err.response)) + if err.response.status_code == 404: + print('Unable to find {} {}'.format( + replicate_id, + metadata['submitted_file_name']) + ) + else: + raise err + + def update_files(self, file_id): + if file_id not in self._files: + LOGGER.debug("Downloading %s", file_id) + try: + file_object = self.encode.get_json(file_id) + self._files[file_id] = file_object + except HTTPError as err: + if err.response.status_code == 404: + print('unable to find {}'.format(file_id)) + else: + raise err + def upload(self, results_map, dry_run=False): for an_analysis in self.analysis_nodes(results_map): for metadata in self.get_metadata(an_analysis): @@ -50,7 +124,7 @@ class AWSSubmission(Submission): LOGGER.info(json.dumps(metadata, indent=4, sort_keys=True)) continue - upload = metadata['submitted_file_name'] + '.upload' + upload = self.make_upload_filename(metadata) if not os.path.exists(upload): with open(upload, 'w') as outstream: outstream.write(json.dumps(metadata, indent=4, sort_keys=True)) @@ -99,6 +173,9 @@ class AWSSubmission(Submission): return results + def make_upload_filename(self, metadata): + return metadata['submitted_file_name'] + '.upload' + def run_aws_cp(pathname, creds): env = os.environ.copy() env.update({ -- 2.30.2