Add mode to connect to the DCC and make sure the files with a .upload file are actual...
author    Diane Trout <diane@caltech.edu>
          Tue, 20 Oct 2015 23:49:54 +0000 (16:49 -0700)
committer Diane Trout <diane@caltech.edu>
          Tue, 20 Oct 2015 23:49:54 +0000 (16:49 -0700)
encode_submission/encode3.py
htsworkflow/submission/aws_submission.py

index 1b7ae045281a07152623c34d5190ee48fd6a56bd..e3faea4a1b881e88608a4af94df176b2c10a5d45 100644 (file)
@@ -115,6 +115,9 @@ def main(cmdline=None):
     if args.upload:
         mapper.upload(results, args.dry_run)
 
+    if args.check_upload:
+        mapper.check_upload(results)
+
     if args.sparql:
         sparql_query(model, args.sparql)
 
@@ -152,6 +155,8 @@ def make_parser():
                         help="cache md5 sums")
     commands.add_argument('--upload', default=False, action="store_true",
                         help="Upload files")
+    commands.add_argument('--check-upload', default=False, action='store_true',
+                          help='check that files were actually uploaded')
 
     parser.add_argument('--force', default=False, action="store_true",
                       help="Force regenerating fastqs")
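Usage note (not part of the patch): assuming main() forwards its cmdline list to the parser built by make_parser(), the new check can be driven from Python as well as the shell. A minimal sketch, with every other option the script may require omitted:

    from encode_submission.encode3 import main

    # hypothetical invocation; any other required options are omitted here
    main(cmdline=['--check-upload'])
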
index 0b76b04d42d53e4b859bbde2004de66a90454825..f484ebef208fd2e1f8147e8e4afd2376635a474b 100644 (file)
@@ -12,6 +12,7 @@ import re
 
 import jsonschema
 import RDF
+from requests.exceptions import HTTPError
 
 from htsworkflow.submission.submission import Submission
 from .encoded import ENCODED
@@ -39,6 +40,77 @@ class AWSSubmission(Submission):
         self.encode = ENCODED(encode_host)
         self.encode.load_netrc()
 
+        self._replicates = {}
+        self._files = {}
+
+    def check_upload(self, results_map):
+        tocheck = []
+        # phase 1: read each cached .upload file and fetch its DCC records
+        for an_analysis in self.analysis_nodes(results_map):
+            for metadata in self.get_metadata(an_analysis):
+                filename = self.make_upload_filename(metadata)
+                if os.path.exists(filename):
+                    with open(filename, 'rt') as instream:
+                        uploaded = json.load(instream)
+                    tocheck.append({
+                        'submitted_file_name': uploaded['submitted_file_name'],
+                        'md5sum': uploaded['md5sum']
+                    })
+                    self.update_replicates(uploaded)
+
+        # phase 2: verify each cached entry matches a file known to the DCC
+        md5sums = set(f['md5sum'] for f in self._files.values())
+        submitted_file_names = set(
+            f['submitted_file_name'] for f in self._files.values()
+        )
+        errors_detected = 0
+        for row in tocheck:
+            error = []
+            if row['submitted_file_name'] not in submitted_file_names:
+                error.append('!file_name')
+            if row['md5sum'] not in md5sums:
+                error.append('!md5sum')
+            if error:
+                print("{} failed {} checks".format(
+                    row['submitted_file_name'],
+                    ', '.join(error)
+                ))
+                errors_detected += 1
+
+        if not errors_detected:
+            print('No errors detected')
+
+    def update_replicates(self, metadata):
+        replicate_id = metadata['replicate']
+        if replicate_id not in self._replicates:
+            LOGGER.debug("Downloading %s", replicate_id)
+            try:
+                rep = self.encode.get_json(replicate_id)
+
+                self._replicates[replicate_id] = rep
+                for file_id in rep['experiment']['files']:
+                    self.update_files(file_id)
+            except HTTPError as err:
+                if err.response.status_code == 404:
+                    print('Unable to find {} {}'.format(
+                        replicate_id,
+                        metadata['submitted_file_name'])
+                    )
+                else:
+                    raise
+
+    def update_files(self, file_id):
+        if file_id not in self._files:
+            LOGGER.debug("Downloading %s", file_id)
+            try:
+                file_object = self.encode.get_json(file_id)
+                self._files[file_id] = file_object
+            except HTTPError as err:
+                if err.response.status_code == 404:
+                    print('Unable to find {}'.format(file_id))
+                else:
+                    raise
+
     def upload(self, results_map, dry_run=False):
         for an_analysis in self.analysis_nodes(results_map):
             for metadata in self.get_metadata(an_analysis):
@@ -50,7 +122,7 @@
                     LOGGER.info(json.dumps(metadata, indent=4, sort_keys=True))
                     continue
 
-                upload = metadata['submitted_file_name'] + '.upload'
+                upload = self.make_upload_filename(metadata)
                 if not os.path.exists(upload):
                     with open(upload, 'w') as outstream:
                         outstream.write(json.dumps(metadata, indent=4, sort_keys=True))
@@ -99,6 +171,9 @@
 
         return results
 
+    def make_upload_filename(self, metadata):
+        return metadata['submitted_file_name'] + '.upload'
+
 def run_aws_cp(pathname, creds):
     env = os.environ.copy()
     env.update({
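For reference, a sketch of the .upload sidecar this commit starts checking. The key names (submitted_file_name, md5sum, replicate) are the ones the new code reads; the values below are invented, and upload() writes the file with the same json.dumps(..., indent=4, sort_keys=True) call shown above:

    import json

    # invented example values; only the key names come from the code above
    metadata = {
        'submitted_file_name': 'example_R1.fastq.gz',
        'md5sum': 'd41d8cd98f00b204e9800998ecf8427e',
        'replicate': '/replicates/hypothetical-id/',
    }
    with open('example_R1.fastq.gz.upload', 'wt') as outstream:
        outstream.write(json.dumps(metadata, indent=4, sort_keys=True))

    # check_upload() later reads the sidecar back the same way
    with open('example_R1.fastq.gz.upload', 'rt') as instream:
        uploaded = json.load(instream)
    assert uploaded['md5sum'] == metadata['md5sum']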