dc546ea6d2fc7ae5c241b78e4a084f45a3a72708
[htsworkflow.git] / htsworkflow / submission / encoded.py
1 """Interface with encoded software for ENCODE3 data submission & warehouse
2
3 This allows retrieving blocks
4 """
5 from __future__ import print_function
6 import pandas
7 import base64
8 import collections
9 import hashlib
10 import logging
11 import json
12 import jsonschema
13 import numpy
14 import os
15 import re
16 import requests
17 import six
18 from six.moves.urllib.parse import urljoin, urlparse, urlunparse
19
20 LOGGER = logging.getLogger(__name__)
21
22 ENCODED_CONTEXT = {
23     # The None context will get added to the root of the tree and will
24     # provide common defaults.
25     None: {
26         # terms in multiple encoded objects
27         'award': {'@type': '@id'},
28         'dataset': {'@type': '@id'},
29         'description': 'rdf:description',
30         'documents': {'@type': '@id'},
31         'experiment': {'@type': '@id'},
32         'href': {'@type': '@id'},
33         'lab': {'@type': '@id'},
34         'library': {'@type': '@id'},
35         'pi': {'@type': '@id'},
36         'platform': {'@type': '@id'},
37         'replicates': {'@type': '@id'},
38         'submitted_by': {'@type': '@id'},
39         'url': {'@type': '@id'},
40     },
41     # Identify and markup contained classes.
42     # e.g. in the tree there was a sub-dictionary named 'biosample'
43     # That dictionary had a term 'biosample_term_id, which is the
44     # term that should be used as the @id.
45     'Biosample': {
46         'biosample_term_id': {'@type': '@id'},
47     },
48     'Experiment': {
49         "assay_term_id": {"@type": "@id"},
50         "files": {"@type": "@id"},
51         "original_files": {"@type": "@id"},
52     },
53     # I tried to use the JSON-LD mapping capabilities to convert the lab
54     # contact information into a vcard record, but the encoded model
55     # didn't lend itself well to the vcard schema
56     #'lab': {
57     #    "address1": "vcard:street-address",
58     #    "address2": "vcard:street-address",
59     #    "city": "vcard:locality",
60     #    "state": "vcard:region",
61     #    "country": "vcard:country"
62     #},
63     'Library': {
64         'nucleic_acid_term_id': {'@type': '@id'}
65     }
66 }
67
68 #FIXME: this needs to be initialized from rdfns
69 ENCODED_NAMESPACES = {
70     # JSON-LD lets you define namespaces so you can used the shorted url syntax.
71     # (instead of http://www.w3.org/2000/01/rdf-schema#label you can do
72     # rdfs:label)
73     "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
74     "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
75     "owl": "http://www.w3.org/2002/07/owl#",
76     "dc": "htp://purl.org/dc/elements/1.1/",
77     "xsd": "http://www.w3.org/2001/XMLSchema#",
78     "vcard": "http://www.w3.org/2006/vcard/ns#",
79
80     # for some namespaces I made a best guess for the ontology root.
81     "EFO": "http://www.ebi.ac.uk/efo/",  # EFO ontology
82     "OBO": "http://purl.obolibrary.org/obo/",  # OBO ontology
83     "OBI": "http://purl.obolibrary.org/obo/OBI_",  # Ontology for Biomedical Investigations
84     # OBI: available from http://svn.code.sf.net/p/obi/code/releases/2012-07-01/merged/merged-obi-comments.owl
85     "SO": "http://purl.obolibrary.org/obo/SO_",  # Sequence ontology
86     # SO: available from http://www.berkeleybop.org/ontologies/so.owl
87     # NTR: New Term Request space for DCC to implement new ontology terms
88
89 }
90
91 ENCODED_SCHEMA_ROOT = '/profiles/'
92
93
94 class ENCODED:
95     '''Programatic access encoded, the software powering ENCODE3's submit site.
96     '''
97     def __init__(self, server, contexts=None, namespaces=None):
98         self.server = server
99         self.scheme = 'https'
100         self.username = None
101         self.password = None
102         self.contexts = contexts if contexts else ENCODED_CONTEXT
103         self.namespaces = namespaces if namespaces else ENCODED_NAMESPACES
104         self.json_headers = {'content-type': 'application/json', 'accept': 'application/json'}
105         self.schemas = {}
106
107     def get_auth(self):
108         return (self.username, self.password)
109     auth = property(get_auth)
110
111     def load_netrc(self):
112         import netrc
113         session = netrc.netrc()
114         authenticators = session.authenticators(self.server)
115         if authenticators:
116             self.username = authenticators[0]
117             self.password = authenticators[2]
118
119     def add_jsonld_context(self, tree, default_base):
120         """Add contexts to various objects in the tree.
121
122         tree is a json tree returned from the DCC's encoded database.
123         contexts is a dictionary of dictionaries containing contexts
124                 for the various  possible encoded classes.
125         base, if supplied allows setting the base url that relative
126             urls will be resolved against.
127         """
128         self.add_jsonld_child_context(tree, default_base)
129         self.add_jsonld_namespaces(tree['@context'])
130
131     def add_jsonld_child_context(self, obj, default_base):
132         '''Add JSON-LD context to the encoded JSON.
133
134         This is recursive because some of the IDs were relative URLs
135         and I needed a way to properly compute a the correct base URL.
136         '''
137         # pretend strings aren't iterable
138         if isinstance(obj, six.string_types):
139             return
140
141         # recurse on container types
142         if isinstance(obj, collections.Sequence):
143             # how should I update lists?
144             for v in obj:
145                 self.add_jsonld_child_context(v, default_base)
146             return
147
148         if isinstance(obj, collections.Mapping):
149             for v in obj.values():
150                 self.add_jsonld_child_context(v, default_base)
151
152         # we have an object. attach a context to it.
153         if self._is_encoded_object(obj):
154             context = self.create_jsonld_context(obj, default_base)
155             if len(context) > 0:
156                 # this is a total hack for relese 33 of
157                 # encoded. They changed their model and
158                 # i'm not sure what to do about it.
159                 if obj.get('@context') == '/terms/':
160                     del obj['@context']
161                 obj.setdefault('@context', {}).update(context)
162
163     def add_jsonld_namespaces(self, context):
164         '''Add shortcut namespaces to a context
165
166         Only needs to be run on the top-most context
167         '''
168         context.update(self.namespaces)
169
170     def create_jsonld_context(self, obj, default_base):
171         '''Synthesize the context for a encoded type
172
173         self.contexts[None] = default context attributes added to any type
174         self.contexts[type] = context attributes for this type.
175         '''
176         obj_type = self.get_object_type(obj)
177         context = {'@base': urljoin(default_base, obj['@id']),
178                    '@vocab': self.get_schema_url(obj_type)}
179         # add in defaults
180         context.update(self.contexts[None])
181         for t in obj['@type']:
182             if t in self.contexts:
183                 context.update(self.contexts[t])
184         return context
185
186     def get_json(self, obj_id, **kwargs):
187         '''GET an ENCODE object as JSON and return as dict
188
189         Uses prepare_url to allow url short-cuts
190         if no keyword arguments are specified it will default to adding limit=all
191         Alternative keyword arguments can be passed in and will be sent to the host.
192
193         Known keywords are:
194           limit - (integer or 'all') how many records to return, all for all of them
195           embed - (bool) if true expands linking ids into their associated object.
196           format - text/html or application/json
197         '''
198         if len(kwargs) == 0:
199             kwargs['limit'] = 'all'
200
201         response = self.get_response(obj_id, **kwargs)
202         data = response.json()
203         response.close()
204         return data
205
206     def get_jsonld(self, obj_id, **kwargs):
207         '''Get ENCODE object as JSONLD annotated with classses contexts
208
209         see get_json for documentation about what keywords can be passed.
210         '''
211         url = self.prepare_url(obj_id)
212         json = self.get_json(obj_id, **kwargs)
213         self.add_jsonld_context(json, url)
214         return json
215
216     def get_object_type(self, obj):
217         """Return type for a encoded object
218         """
219         obj_type = obj.get('@type')
220         if not obj_type:
221             raise ValueError('None type')
222         if isinstance(obj_type, six.string_types):
223             raise ValueError('@type should be a list, not a string')
224         if not isinstance(obj_type, collections.Sequence):
225             raise ValueError('@type is not a sequence')
226         return obj_type[0]
227
228     def get_response(self, fragment, **kwargs):
229         '''GET an ENCODED url and return the requests request
230
231         Uses prepare_url to allow url short-cuts
232         if no keyword arguments are specified it will default to adding limit=all
233         Alternative keyword arguments can be passed in and will be sent to the host.
234
235         Known keywords are:
236           limit - (integer or 'all') how many records to return, all for all of them
237           embed - (bool) if true expands linking ids into their associated object.
238           format - text/html or application/json
239         '''
240         url = self.prepare_url(fragment)
241         LOGGER.info('requesting url: {}'.format(url))
242
243         # do the request
244         LOGGER.debug('username: %s, password: %s', self.username, self.password)
245         arguments = {}
246         if self.username and self.password:
247             arguments['auth'] = self.auth
248
249         if 'stream' in kwargs:
250             arguments['stream'] = kwargs['stream']
251             del kwargs['stream']
252
253         response = requests.get(url, headers=self.json_headers,
254                                 params=kwargs,
255                                 **arguments)
256         if not response.status_code == requests.codes.ok:
257             LOGGER.error("Error http status: {}".format(response.status_code))
258             response.raise_for_status()
259
260         return response
261
262     def get_schema_url(self, object_type):
263         """Create the ENCODED jsonschema url.
264
265         Return the ENCODED object schema url be either
266         object type name or the collection name one posts to.
267
268         For example
269            server.get_schema_url('experiment') and
270            server.get_schema_url('/experiments/') both resolve to
271            SERVER/profiles/experiment.json
272
273         Arguments:
274            object_type (str): either ENCODED object name or collection
275
276         Returns:
277            Schema URL
278         """
279         collection_to_type = {
280             '/biosamples/': 'biosample',
281             '/datasets/': 'dataset',
282             '/documents/': 'document',
283             '/experiments/': 'experiment',
284             '/libraries/': 'library',
285             '/replicates/': 'replicate',
286             '/file/': 'file',
287         }
288         object_type = collection_to_type.get(object_type, object_type)
289
290         return self.prepare_url(ENCODED_SCHEMA_ROOT + object_type + '.json') + '#'
291
292     def get_accession_name(self, collection):
293         """Lookup common object accession name given a collection name.
294         """
295         collection_to_accession_name = {
296             '/experiments/': 'experiment_accession',
297             '/biosamples/': 'biosample_accession',
298             '/libraries/': 'library_accession',
299             '/replicates/': 'uuid',
300         }
301
302         accession_name = collection_to_accession_name.get(collection, None)
303         if accession_name is None:
304             raise RuntimeError("Update list of collection to accession names for %s",
305                                collection)
306
307         return accession_name
308
309     def _is_encoded_object(self, obj):
310         '''Test to see if an object is a JSON-LD object
311
312         Some of the nested dictionaries lack the @id or @type
313         information necessary to convert them.
314         '''
315         if not isinstance(obj, collections.Iterable):
316             return False
317
318         if '@id' in obj and '@type' in obj:
319             return True
320         return False
321
322     def patch_json(self, obj_id, changes):
323         """Given a dictionary of changes push them as a HTTP patch request
324         """
325         url = self.prepare_url(obj_id)
326         LOGGER.info('PATCHing to %s', url)
327         payload = json.dumps(changes)
328         response = requests.patch(url, auth=self.auth, headers=self.json_headers, data=payload)
329         if response.status_code != requests.codes.ok:
330             LOGGER.error("Error http status: {}".format(response.status_code))
331             LOGGER.error("Response: %s", response.text)
332             response.raise_for_status()
333         return response.json()
334
335     def put_json(self, obj_id, new_object):
336         url = self.prepare_url(obj_id)
337         LOGGER.info('PUTing to %s', url)
338         payload = json.dumps(new_object)
339         response = requests.put(url, auth=self.auth, headers=self.json_headers, data=payload)
340         if response.status_code != requests.codes.created:
341             LOGGER.error("Error http status: {}".format(response.status_code))
342             response.raise_for_status()
343         return response.json()
344
345     def post_json(self, collection_id, new_object):
346         url = self.prepare_url(collection_id)
347         LOGGER.info('POSTing to %s', url)
348         payload = json.dumps(new_object)
349
350         response = requests.post(url, auth=self.auth, headers=self.json_headers, data=payload)
351         if response.status_code != requests.codes.created:
352             LOGGER.error("http status: {}".format(response.status_code))
353             LOGGER.error("message: {}".format(response.content))
354             response.raise_for_status()
355         return response.json()
356
357     def post_sheet(self, collection, sheet, dry_run=True, verbose=False):
358         """Create new ENCODED objects using metadata encoded in pandas DataFrame
359
360         The DataFrame column names need to encode the attribute names,
361         and in some cases also include some additional type information.
362         (see TypedColumnParser)
363
364         Arguments:
365            collection (str): name of collection to create new objects in
366            sheet (pandas.DataFrame): DataFrame with objects to create,
367                assuming the appropriate accession number is empty.
368                additional the accession number and uuid is updated if the object
369                is created.
370            dry_run (bool): whether or not to skip the code to post the objects
371            verbose (bool): print the http responses.
372
373         Returns:
374            list of created objects.
375
376         Raises:
377            jsonschema.ValidationError if the object doesn't validate against
378               the encoded jsonschema.
379         """
380         accession_name = self.get_accession_name(collection)
381
382         to_create = self.prepare_objects_from_sheet(collection, sheet)
383
384         created = []
385         accessions = []
386         uuids = []
387         for i, new_object in to_create:
388             if new_object:
389                 accession = new_object.get('accession')
390                 uuid = new_object.get('uuid')
391                 description = new_object.get('description')
392
393                 posted_object = self.post_object_from_row(
394                     collection, i, new_object, dry_run, verbose
395                 )
396                 created.append(posted_object)
397
398                 if posted_object:
399                     accession = posted_object.get('accession')
400                     uuid = posted_object.get('uuid')
401                     description = posted_object.get('description')
402
403                 accessions.append(accession)
404                 uuids.append(uuid)
405
406                 LOGGER.info('row {} ({}) -> {}'.format(
407                     (i+2), description, accession))
408                 # +2 comes from python row index + 1 to convert to
409                 # one based indexing + 1 to account for
410                 # row removed by header parsing
411             else:
412                 accessions.append(numpy.nan)
413                 uuids.append(numpy.nan)
414
415         if accession_name in sheet.columns:
416             sheet[accession_name] = accessions
417         if 'uuid' in sheet.columns:
418             sheet['uuid'] = uuids
419
420         return created
421
422     def prepare_objects_from_sheet(self, collection, sheet):
423         accession_name = self.get_accession_name(collection)
424         to_create = []
425         for i, row in sheet.iterrows():
426             new_object = {}
427             for name, value in row.items():
428                 if pandas.notnull(value):
429                     name, value = typed_column_parser(name, value)
430                     if name is None:
431                         continue
432                     new_object[name] = value
433
434             if new_object and new_object.get(accession_name) is None:
435                 try:
436                     self.validate(new_object, collection)
437                 except jsonschema.ValidationError as e:
438                     LOGGER.error("Validation error row %s", i)
439                     raise e
440                 to_create.append((i, new_object))
441
442             else:
443                 to_create.append((i, None))
444
445         return to_create
446
447     def post_object_from_row(self, collection, i, new_object,
448                              dry_run=True, verbose=True):
449         accession_name = self.get_accession_name(collection)
450
451         if not dry_run:
452             response = self.post_json(collection, new_object)
453             if verbose:
454                 print("Reponse {}".format(response))
455
456             obj = response['@graph'][0]
457
458             accession = obj.get(accession_name)
459             if not accession:
460                 accession = obj.get('uuid')
461
462             print("row {} created: {}".format(i, accession))
463             return obj
464         else:
465             new_object[accession_name] = 'would create'
466             return new_object
467
468     def prepare_url(self, request_url):
469         '''This attempts to provide some convienence for accessing a URL
470
471         Given a url fragment it will default to :
472         * requests over http
473         * requests to self.server
474
475         This allows fairly flexible urls. e.g.
476
477         prepare_url('/experiments/ENCSR000AEG')
478         prepare_url('submit.encodedcc.org/experiments/ENCSR000AEG')
479         prepare_url('http://submit.encodedcc.org/experiments/ENCSR000AEG?limit=all')
480
481         should all return the same url
482         '''
483         # clean up potentially messy urls
484         url = urlparse(request_url)._asdict()
485         if not url['scheme']:
486             url['scheme'] = self.scheme
487         if not url['netloc']:
488             url['netloc'] = self.server
489         url = urlunparse(url.values())
490         return url
491
492     def search_jsonld(self, **kwargs):
493         '''Send search request to ENCODED
494
495         to do a general search do
496             searchTerm=term
497         '''
498         url = self.prepare_url('/search/')
499         result = self.get_json(url, **kwargs)
500         self.convert_search_to_jsonld(result)
501         return result
502
503     def convert_search_to_jsonld(self, result):
504         '''Add the context to search result
505
506         Also remove hard to handle nested attributes
507           e.g. remove object.term when we have no id
508         '''
509         graph = result['@graph']
510         for i, obj in enumerate(graph):
511             # suppress nested attributes
512             graph[i] = {k: v for k, v in obj.items() if '.' not in k}
513
514         self.add_jsonld_context(result, self.prepare_url(result['@id']))
515         return result
516
517     def validate(self, obj, object_type=None):
518         """Validate an object against the ENCODED schema
519
520         Args:
521             obj (dictionary): object attributes to be submitted to encoded
522             object_type (string): ENCODED object name.
523
524         Raises:
525             ValidationError: if the object does not conform to the schema.
526         """
527         object_type = object_type if object_type else self.get_object_type(obj)
528         schema_url = self.get_schema_url(object_type)
529         if not schema_url:
530             raise ValueError("Unable to construct schema url")
531
532         schema = self.schemas.setdefault(object_type, self.get_json(schema_url))
533         hidden = obj.copy()
534         if '@id' in hidden:
535             del hidden['@id']
536         if '@type' in hidden:
537             del hidden['@type']
538         jsonschema.validate(hidden, schema)
539
540         # Additional validation rules passed down from the DCC for our grant
541         assay_term_name = hidden.get('assay_term_name')
542         if assay_term_name is not None:
543             if assay_term_name.lower() == 'rna-seq':
544                 if assay_term_name != 'RNA-seq':
545                     raise jsonschema.ValidationError('Incorrect capitialization of RNA-seq')
546
547         species = hidden.get('species')
548         if species == '/organisms/human/':
549             model_age_terms = ['model_organism_age', 'model_organism_age_units']
550             for term in model_age_terms:
551                 if term in obj:
552                     raise jsonschema.ValidationError('model age terms not needed in human')
553
554 class TypedColumnParser(object):
555     @staticmethod
556     def parse_sheet_array_type(value):
557         """Helper function to parse :array columns in sheet
558         """
559         return re.split(',\s*', value)
560
561     @staticmethod
562     def parse_sheet_integer_type(value):
563         """Helper function to parse :integer columns in sheet
564         """
565         return int(value)
566
567     @staticmethod
568     def parse_sheet_boolean_type(value):
569         """Helper function to parse :boolean columns in sheet
570         """
571         return bool(value)
572
573     @staticmethod
574     def parse_sheet_timestamp_type(value):
575         """Helper function to parse :date columns in sheet
576         """
577         if isinstance(value, str):
578             return value
579         return value.strftime('%Y-%m-%d')
580
581     @staticmethod
582     def parse_sheet_string_type(value):
583         """Helper function to parse :string columns in sheet (the default)
584         """
585         return str(value)
586
587     def __getitem__(self, name):
588         parser = {
589             'array': self.parse_sheet_array_type,
590             'boolean': self.parse_sheet_boolean_type,
591             'integer': self.parse_sheet_integer_type,
592             'date': self.parse_sheet_timestamp_type,
593             'string': self.parse_sheet_string_type
594         }.get(name)
595         if parser:
596             return parser
597         else:
598             raise RuntimeError("unrecognized column type")
599
600     def __call__(self, header, value):
601         header = header.split(':')
602         column_type = 'string'
603         if len(header) > 1:
604             if header[1] == 'skip':
605                 return None, None
606             else:
607                 column_type = header[1]
608         return header[0], self[column_type](value)
609
610 typed_column_parser = TypedColumnParser()
611
612 class Document(object):
613     """Helper class for registering documents
614
615     Usage:
616     lysis_uuid = 'f0cc5a7f-96a5-4970-9f46-317cc8e2d6a4'
617     lysis = Document(url_to_pdf, 'extraction protocol', 'Lysis Protocol')
618     lysis.create_if_needed(server, lysis_uuid)
619     """
620     award = 'U54HG006998'
621     lab = '/labs/barbara-wold'
622
623     def __init__(self, url, document_type, description, aliases=None):
624         self.url = url
625         self.filename = os.path.basename(url)
626         self.document_type = document_type
627         self.description = description
628
629         self.references = []
630         self.aliases = None
631         if aliases:
632             if isinstance(aliases, list):
633                 self.aliases = aliases
634             else:
635                 raise ValueError("Aliases needs to be a list")
636         self.content_type = None
637         self.document = None
638         self.md5sum = None
639         self.urls = None
640         self.uuid = None
641
642         self.get_document()
643
644     def get_document(self):
645         if os.path.exists(self.url):
646             with open(self.url, 'rb') as instream:
647                 assert self.url.endswith('pdf')
648                 self.content_type = 'application/pdf'
649                 self.document = instream.read()
650                 self.md5sum = hashlib.md5(self.document)
651         else:
652             req = requests.get(self.url)
653             if req.status_code == 200:
654                 self.content_type = req.headers['content-type']
655                 self.document = req.content
656                 self.md5sum = hashlib.md5(self.document)
657                 self.urls = [self.url]
658
659     def create_payload(self):
660         document_payload = {
661             'attachment': {
662               'download': self.filename,
663               'type': self.content_type,
664               'href': 'data:'+self.content_type+';base64,' + base64.b64encode(self.document).decode('ascii'),
665               'md5sum': self.md5sum.hexdigest()
666             },
667             'document_type': self.document_type,
668             'description': self.description,
669             'award': self.award,
670             'lab': self.lab,
671         }
672         if self.aliases:
673             document_payload['aliases'] = self.aliases
674         if self.references:
675             document_payload['references'] = self.references
676         if self.urls:
677             document_payload['urls'] = self.urls
678
679         return document_payload
680
681     def post(self, server):
682         document_payload = self.create_payload()
683         server.validate(document_payload, 'document')
684         return server.post_json('/documents/', document_payload)
685
686     def save(self, filename):
687         payload = self.create_payload()
688         with open(filename, 'w') as outstream:
689             outstream.write(pformat(payload))
690
691     def create_if_needed(self, server, uuid):
692         self.uuid = uuid
693         if uuid is None:
694             return self.post(server)
695         else:
696             return server.get_json(uuid, embed=False)
697
698 if __name__ == '__main__':
699     # try it
700     from htsworkflow.util.rdfhelp import get_model, dump_model
701     from htsworkflow.util.rdfjsonld import load_into_model
702     from pprint import pprint
703     model = get_model()
704     logging.basicConfig(level=logging.DEBUG)
705     encoded = ENCODED('test.encodedcc.org')
706     encoded.load_netrc()
707     body = encoded.get_jsonld('/experiments/ENCSR000AEC/')
708     pprint(body)
709     load_into_model(model, body)
710     #dump_model(model)