2e91a60506e3f3ccbaba37c28e6351c9fccf8b26
[htsworkflow.git] / htsworkflow / submission / encoded.py
1 """Interface with encoded software for ENCODE3 data submission & warehouse
2
3 This allows retrieving blocks
4 """
5 from __future__ import print_function
6 import pandas
7 import base64
8 import collections
9 import hashlib
10 import logging
11 import json
12 import jsonschema
13 import numpy
14 import os
15 import re
16 import requests
17 import six
18 from six.moves.urllib.parse import urljoin, urlparse, urlunparse
19
20 LOGGER = logging.getLogger(__name__)
21
22 ENCODED_CONTEXT = {
23     # The None context will get added to the root of the tree and will
24     # provide common defaults.
25     None: {
26         # terms in multiple encoded objects
27         'award': {'@type': '@id'},
28         'dataset': {'@type': '@id'},
29         'description': 'rdf:description',
30         'documents': {'@type': '@id'},
31         'experiment': {'@type': '@id'},
32         'href': {'@type': '@id'},
33         'lab': {'@type': '@id'},
34         'library': {'@type': '@id'},
35         'pi': {'@type': '@id'},
36         'platform': {'@type': '@id'},
37         'replicates': {'@type': '@id'},
38         'submitted_by': {'@type': '@id'},
39         'url': {'@type': '@id'},
40     },
41     # Identify and markup contained classes.
42     # e.g. in the tree there was a sub-dictionary named 'biosample'
43     # That dictionary had a term 'biosample_term_id, which is the
44     # term that should be used as the @id.
45     'Biosample': {
46         'biosample_term_id': {'@type': '@id'},
47     },
48     'Experiment': {
49         "assay_term_id": {"@type": "@id"},
50         "files": {"@type": "@id"},
51         "original_files": {"@type": "@id"},
52     },
53     # I tried to use the JSON-LD mapping capabilities to convert the lab
54     # contact information into a vcard record, but the encoded model
55     # didn't lend itself well to the vcard schema
56     #'lab': {
57     #    "address1": "vcard:street-address",
58     #    "address2": "vcard:street-address",
59     #    "city": "vcard:locality",
60     #    "state": "vcard:region",
61     #    "country": "vcard:country"
62     #},
63     'Library': {
64         'nucleic_acid_term_id': {'@type': '@id'}
65     }
66 }
67
68 #FIXME: this needs to be initialized from rdfns
69 ENCODED_NAMESPACES = {
70     # JSON-LD lets you define namespaces so you can used the shorted url syntax.
71     # (instead of http://www.w3.org/2000/01/rdf-schema#label you can do
72     # rdfs:label)
73     "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
74     "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
75     "owl": "http://www.w3.org/2002/07/owl#",
76     "dc": "htp://purl.org/dc/elements/1.1/",
77     "xsd": "http://www.w3.org/2001/XMLSchema#",
78     "vcard": "http://www.w3.org/2006/vcard/ns#",
79
80     # for some namespaces I made a best guess for the ontology root.
81     "EFO": "http://www.ebi.ac.uk/efo/",  # EFO ontology
82     "OBO": "http://purl.obolibrary.org/obo/",  # OBO ontology
83     "OBI": "http://purl.obolibrary.org/obo/OBI_",  # Ontology for Biomedical Investigations
84     # OBI: available from http://svn.code.sf.net/p/obi/code/releases/2012-07-01/merged/merged-obi-comments.owl
85     "SO": "http://purl.obolibrary.org/obo/SO_",  # Sequence ontology
86     # SO: available from http://www.berkeleybop.org/ontologies/so.owl
87     # NTR: New Term Request space for DCC to implement new ontology terms
88
89 }
90
91 ENCODED_SCHEMA_ROOT = '/profiles/'
92
93
94 class ENCODED:
95     '''Programatic access encoded, the software powering ENCODE3's submit site.
96     '''
97     def __init__(self, server, contexts=None, namespaces=None):
98         self.server = server
99         self.scheme = 'https'
100         self.username = None
101         self.password = None
102         self.contexts = contexts if contexts else ENCODED_CONTEXT
103         self.namespaces = namespaces if namespaces else ENCODED_NAMESPACES
104         self.json_headers = {'content-type': 'application/json', 'accept': 'application/json'}
105         self.schemas = {}
106
107     def get_auth(self):
108         return (self.username, self.password)
109     auth = property(get_auth)
110
111     def load_netrc(self):
112         import netrc
113         session = netrc.netrc()
114         authenticators = session.authenticators(self.server)
115         if authenticators:
116             self.username = authenticators[0]
117             self.password = authenticators[2]
118
119     def add_jsonld_context(self, tree, default_base):
120         """Add contexts to various objects in the tree.
121
122         tree is a json tree returned from the DCC's encoded database.
123         contexts is a dictionary of dictionaries containing contexts
124                 for the various  possible encoded classes.
125         base, if supplied allows setting the base url that relative
126             urls will be resolved against.
127         """
128         self.add_jsonld_child_context(tree, default_base)
129         self.add_jsonld_namespaces(tree['@context'])
130
131     def add_jsonld_child_context(self, obj, default_base):
132         '''Add JSON-LD context to the encoded JSON.
133
134         This is recursive because some of the IDs were relative URLs
135         and I needed a way to properly compute a the correct base URL.
136         '''
137         # pretend strings aren't iterable
138         if isinstance(obj, six.string_types):
139             return
140
141         # recurse on container types
142         if isinstance(obj, collections.Sequence):
143             # how should I update lists?
144             for v in obj:
145                 self.add_jsonld_child_context(v, default_base)
146             return
147
148         if isinstance(obj, collections.Mapping):
149             for v in obj.values():
150                 self.add_jsonld_child_context(v, default_base)
151
152         # we have an object. attach a context to it.
153         if self._is_encoded_object(obj):
154             context = self.create_jsonld_context(obj, default_base)
155             if len(context) > 0:
156                 # this is a total hack for relese 33 of
157                 # encoded. They changed their model and
158                 # i'm not sure what to do about it.
159                 if obj.get('@context') == '/terms/':
160                     del obj['@context']
161                 obj.setdefault('@context', {}).update(context)
162
163     def add_jsonld_namespaces(self, context):
164         '''Add shortcut namespaces to a context
165
166         Only needs to be run on the top-most context
167         '''
168         context.update(self.namespaces)
169
170     def create_jsonld_context(self, obj, default_base):
171         '''Synthesize the context for a encoded type
172
173         self.contexts[None] = default context attributes added to any type
174         self.contexts[type] = context attributes for this type.
175         '''
176         obj_type = self.get_object_type(obj)
177         context = {'@base': urljoin(default_base, obj['@id']),
178                    '@vocab': self.get_schema_url(obj_type)}
179         # add in defaults
180         context.update(self.contexts[None])
181         for t in obj['@type']:
182             if t in self.contexts:
183                 context.update(self.contexts[t])
184         return context
185
186     def get_json(self, obj_id, **kwargs):
187         '''GET an ENCODE object as JSON and return as dict
188
189         Uses prepare_url to allow url short-cuts
190         if no keyword arguments are specified it will default to adding limit=all
191         Alternative keyword arguments can be passed in and will be sent to the host.
192
193         Known keywords are:
194           limit - (integer or 'all') how many records to return, all for all of them
195           embed - (bool) if true expands linking ids into their associated object.
196           format - text/html or application/json
197         '''
198         if len(kwargs) == 0:
199             kwargs['limit'] = 'all'
200
201         response = self.get_response(obj_id, **kwargs)
202         data = response.json()
203         response.close()
204         return data
205
206     def get_jsonld(self, obj_id, **kwargs):
207         '''Get ENCODE object as JSONLD annotated with classses contexts
208
209         see get_json for documentation about what keywords can be passed.
210         '''
211         url = self.prepare_url(obj_id)
212         json = self.get_json(obj_id, **kwargs)
213         self.add_jsonld_context(json, url)
214         return json
215
216     def get_object_type(self, obj):
217         """Return type for a encoded object
218         """
219         obj_type = obj.get('@type')
220         if not obj_type:
221             raise ValueError('None type')
222         if isinstance(obj_type, six.string_types):
223             raise ValueError('@type should be a list, not a string')
224         if not isinstance(obj_type, collections.Sequence):
225             raise ValueError('@type is not a sequence')
226         return obj_type[0]
227
228     def get_response(self, fragment, **kwargs):
229         '''GET an ENCODED url and return the requests request
230
231         Uses prepare_url to allow url short-cuts
232         if no keyword arguments are specified it will default to adding limit=all
233         Alternative keyword arguments can be passed in and will be sent to the host.
234
235         Known keywords are:
236           limit - (integer or 'all') how many records to return, all for all of them
237           embed - (bool) if true expands linking ids into their associated object.
238           format - text/html or application/json
239         '''
240         url = self.prepare_url(fragment)
241         LOGGER.info('requesting url: {}'.format(url))
242
243         # do the request
244         LOGGER.debug('username: %s, password: %s', self.username, self.password)
245         arguments = {}
246         if self.username and self.password:
247             arguments['auth'] = self.auth
248
249         if 'stream' in kwargs:
250             arguments['stream'] = kwargs['stream']
251             del kwargs['stream']
252
253         response = requests.get(url, headers=self.json_headers,
254                                 params=kwargs,
255                                 **arguments)
256         if not response.status_code == requests.codes.ok:
257             LOGGER.error("Error http status: {}".format(response.status_code))
258             response.raise_for_status()
259
260         return response
261
262     def get_schema_url(self, object_type):
263         """Create the ENCODED jsonschema url.
264
265         Return the ENCODED object schema url be either
266         object type name or the collection name one posts to.
267
268         For example
269            server.get_schema_url('experiment') and
270            server.get_schema_url('/experiments/') both resolve to
271            SERVER/profiles/experiment.json
272
273         Arguments:
274            object_type (str): either ENCODED object name or collection
275
276         Returns:
277            Schema URL
278         """
279         collection_to_type = {
280             '/biosamples/': 'biosample',
281             '/datasets/': 'dataset',
282             '/documents/': 'document',
283             '/experiments/': 'experiment',
284             '/libraries/': 'library',
285             '/replicates/': 'replicate',
286             '/file/': 'file',
287         }
288         object_type = collection_to_type.get(object_type, object_type)
289
290         return self.prepare_url(ENCODED_SCHEMA_ROOT + object_type + '.json') + '#'
291
292     def get_accession_name(self, collection):
293         """Lookup common object accession name given a collection name.
294         """
295         collection_to_accession_name = {
296             '/experiments/': 'experiment_accession',
297             '/biosamples/': 'biosample_accession',
298             '/libraries/': 'library_accession',
299             '/replicates/': 'uuid',
300         }
301
302         accession_name = collection_to_accession_name.get(collection, None)
303         if accession_name is None:
304             raise RuntimeError("Update list of collection to accession names for %s",
305                                collection)
306
307         return accession_name
308
309     def _is_encoded_object(self, obj):
310         '''Test to see if an object is a JSON-LD object
311
312         Some of the nested dictionaries lack the @id or @type
313         information necessary to convert them.
314         '''
315         if not isinstance(obj, collections.Iterable):
316             return False
317
318         if '@id' in obj and '@type' in obj:
319             return True
320         return False
321
322     def patch_json(self, obj_id, changes):
323         """Given a dictionary of changes push them as a HTTP patch request
324         """
325         url = self.prepare_url(obj_id)
326         LOGGER.info('PATCHing to %s', url)
327         payload = json.dumps(changes)
328         response = requests.patch(url, auth=self.auth, headers=self.json_headers, data=payload)
329         if response.status_code != requests.codes.ok:
330             LOGGER.error("Error http status: {}".format(response.status_code))
331             LOGGER.error("Response: %s", response.text)
332             response.raise_for_status()
333         return response.json()
334
335     def put_json(self, obj_id, new_object):
336         url = self.prepare_url(obj_id)
337         LOGGER.info('PUTing to %s', url)
338         payload = json.dumps(new_object)
339         response = requests.put(url, auth=self.auth, headers=self.json_headers, data=payload)
340         if response.status_code != requests.codes.created:
341             LOGGER.error("Error http status: {}".format(response.status_code))
342             response.raise_for_status()
343         return response.json()
344
345     def post_json(self, collection_id, new_object):
346         url = self.prepare_url(collection_id)
347         LOGGER.info('POSTing to %s', url)
348         payload = json.dumps(new_object)
349
350         response = requests.post(url, auth=self.auth, headers=self.json_headers, data=payload)
351         if response.status_code != requests.codes.created:
352             LOGGER.error("http status: {}".format(response.status_code))
353             LOGGER.error("message: {}".format(response.content))
354             response.raise_for_status()
355         return response.json()
356
357     def post_sheet(self, collection, sheet, dry_run=True, verbose=False):
358         """Create new ENCODED objects using metadata encoded in pandas DataFrame
359
360         The DataFrame column names need to encode the attribute names,
361         and in some cases also include some additional type information.
362         (see TypedColumnParser)
363
364         Arguments:
365            collection (str): name of collection to create new objects in
366            sheet (pandas.DataFrame): DataFrame with objects to create,
367                assuming the appropriate accession number is empty.
368                additional the accession number and uuid is updated if the object
369                is created.
370            dry_run (bool): whether or not to skip the code to post the objects
371            verbose (bool): print the http responses.
372
373         Returns:
374            list of created objects.
375
376         Raises:
377            jsonschema.ValidationError if the object doesn't validate against
378               the encoded jsonschema.
379         """
380         accession_name = self.get_accession_name(collection)
381
382         to_create = self.prepare_objects_from_sheet(collection, sheet)
383
384         created = []
385         accessions = []
386         uuids = []
387         for i, new_object in to_create:
388             if new_object:
389                 accession = new_object.get('accession')
390                 uuid = new_object.get('uuid')
391                 description = new_object.get('description')
392
393                 posted_object = self.post_object_from_row(
394                     collection, i, new_object, dry_run, verbose
395                 )
396                 created.append(posted_object)
397
398                 if posted_object:
399                     accession = posted_object.get('accession')
400                     uuid = posted_object.get('uuid')
401                     description = posted_object.get('description')
402
403                 accessions.append(accession)
404                 uuids.append(uuid)
405
406                 LOGGER.info('row {} ({}) -> {}'.format(
407                     (i+2), description, accession))
408                 # +2 comes from python row index + 1 to convert to
409                 # one based indexing + 1 to account for
410                 # row removed by header parsing
411             else:
412                 accessions.append(numpy.nan)
413                 uuids.append(numpy.nan)
414
415         if accession_name in sheet.columns:
416             sheet[accession_name] = accessions
417         if 'uuid' in sheet.columns:
418             sheet['uuid'] = uuids
419
420         return created
421
422     def prepare_objects_from_sheet(self, collection, sheet):
423         accession_name = self.get_accession_name(collection)
424         to_create = []
425         for i, row in sheet.iterrows():
426             new_object = {}
427             for name, value in row.items():
428                 if pandas.notnull(value):
429                     name, value = typed_column_parser(name, value)
430                     if name is None:
431                         continue
432                     new_object[name] = value
433
434             if new_object and new_object.get(accession_name) is None:
435                 try:
436                     self.validate(new_object, collection)
437                 except jsonschema.ValidationError as e:
438                     LOGGER.error("Validation error row %s", i)
439                     raise e
440                 to_create.append((i, new_object))
441
442             else:
443                 to_create.append((i, None))
444
445         return to_create
446
447     def post_object_from_row(self, collection, i, new_object,
448                              dry_run=True, verbose=True):
449         accession_name = self.get_accession_name(collection)
450
451         if not dry_run:
452             response = self.post_json(collection, new_object)
453             if verbose:
454                 print("Reponse {}".format(response))
455
456             obj = response['@graph'][0]
457
458             accession = obj.get(accession_name)
459             if not accession:
460                 accession = obj.get('uuid')
461
462             print("row {} created: {}".format(i, accession))
463             return obj
464         else:
465             new_object[accession_name] = 'would create'
466             return new_object
467
468     def prepare_url(self, request_url):
469         '''This attempts to provide some convienence for accessing a URL
470
471         Given a url fragment it will default to :
472         * requests over http
473         * requests to self.server
474
475         This allows fairly flexible urls. e.g.
476
477         prepare_url('/experiments/ENCSR000AEG')
478         prepare_url('submit.encodedcc.org/experiments/ENCSR000AEG')
479         prepare_url('http://submit.encodedcc.org/experiments/ENCSR000AEG?limit=all')
480
481         should all return the same url
482         '''
483         # clean up potentially messy urls
484         url = urlparse(request_url)._asdict()
485         if not url['scheme']:
486             url['scheme'] = self.scheme
487         if not url['netloc']:
488             url['netloc'] = self.server
489         url = urlunparse(url.values())
490         return url
491
492     def search_jsonld(self, **kwargs):
493         '''Send search request to ENCODED
494
495         to do a general search do
496             searchTerm=term
497         '''
498         url = self.prepare_url('/search/')
499         result = self.get_json(url, **kwargs)
500         self.convert_search_to_jsonld(result)
501         return result
502
503     def convert_search_to_jsonld(self, result):
504         '''Add the context to search result
505
506         Also remove hard to handle nested attributes
507           e.g. remove object.term when we have no id
508         '''
509         graph = result['@graph']
510         for i, obj in enumerate(graph):
511             # suppress nested attributes
512             graph[i] = {k: v for k, v in obj.items() if '.' not in k}
513
514         self.add_jsonld_context(result, self.prepare_url(result['@id']))
515         return result
516
517     def validate(self, obj, object_type=None):
518         """Validate an object against the ENCODED schema
519
520         Args:
521             obj (dictionary): object attributes to be submitted to encoded
522             object_type (string): ENCODED object name.
523
524         Raises:
525             ValidationError: if the object does not conform to the schema.
526         """
527         object_type = object_type if object_type else self.get_object_type(obj)
528         schema_url = self.get_schema_url(object_type)
529         if not schema_url:
530             raise ValueError("Unable to construct schema url")
531
532         schema = self.schemas.setdefault(object_type, self.get_json(schema_url))
533         hidden = obj.copy()
534         if '@id' in hidden:
535             del hidden['@id']
536         if '@type' in hidden:
537             del hidden['@type']
538         jsonschema.validate(hidden, schema)
539
540 class TypedColumnParser(object):
541     @staticmethod
542     def parse_sheet_array_type(value):
543         """Helper function to parse :array columns in sheet
544         """
545         return re.split(',\s*', value)
546
547     @staticmethod
548     def parse_sheet_integer_type(value):
549         """Helper function to parse :integer columns in sheet
550         """
551         return int(value)
552
553     @staticmethod
554     def parse_sheet_boolean_type(value):
555         """Helper function to parse :boolean columns in sheet
556         """
557         return bool(value)
558
559     @staticmethod
560     def parse_sheet_timestamp_type(value):
561         """Helper function to parse :date columns in sheet
562         """
563         if isinstance(value, str):
564             return value
565         return value.strftime('%Y-%m-%d')
566
567     @staticmethod
568     def parse_sheet_string_type(value):
569         """Helper function to parse :string columns in sheet (the default)
570         """
571         return str(value)
572
573     def __getitem__(self, name):
574         parser = {
575             'array': self.parse_sheet_array_type,
576             'boolean': self.parse_sheet_boolean_type,
577             'integer': self.parse_sheet_integer_type,
578             'date': self.parse_sheet_timestamp_type,
579             'string': self.parse_sheet_string_type
580         }.get(name)
581         if parser:
582             return parser
583         else:
584             raise RuntimeError("unrecognized column type")
585
586     def __call__(self, header, value):
587         header = header.split(':')
588         column_type = 'string'
589         if len(header) > 1:
590             if header[1] == 'skip':
591                 return None, None
592             else:
593                 column_type = header[1]
594         return header[0], self[column_type](value)
595
596 typed_column_parser = TypedColumnParser()
597
598 class Document(object):
599     """Helper class for registering documents
600
601     Usage:
602     lysis_uuid = 'f0cc5a7f-96a5-4970-9f46-317cc8e2d6a4'
603     lysis = Document(url_to_pdf, 'extraction protocol', 'Lysis Protocol')
604     lysis.create_if_needed(server, lysis_uuid)
605     """
606     award = 'U54HG006998'
607     lab = '/labs/barbara-wold'
608
609     def __init__(self, url, document_type, description, aliases=None):
610         self.url = url
611         self.filename = os.path.basename(url)
612         self.document_type = document_type
613         self.description = description
614
615         self.references = []
616         self.aliases = None
617         if aliases:
618             if isinstance(aliases, list):
619                 self.aliases = aliases
620             else:
621                 raise ValueError("Aliases needs to be a list")
622         self.content_type = None
623         self.document = None
624         self.md5sum = None
625         self.urls = None
626         self.uuid = None
627
628         self.get_document()
629
630     def get_document(self):
631         if os.path.exists(self.url):
632             with open(self.url, 'rb') as instream:
633                 assert self.url.endswith('pdf')
634                 self.content_type = 'application/pdf'
635                 self.document = instream.read()
636                 self.md5sum = hashlib.md5(self.document)
637         else:
638             req = requests.get(self.url)
639             if req.status_code == 200:
640                 self.content_type = req.headers['content-type']
641                 self.document = req.content
642                 self.md5sum = hashlib.md5(self.document)
643                 self.urls = [self.url]
644
645     def create_payload(self):
646         document_payload = {
647             'attachment': {
648               'download': self.filename,
649               'type': self.content_type,
650               'href': 'data:'+self.content_type+';base64,' + base64.b64encode(self.document).decode('ascii'),
651               'md5sum': self.md5sum.hexdigest()
652             },
653             'document_type': self.document_type,
654             'description': self.description,
655             'award': self.award,
656             'lab': self.lab,
657         }
658         if self.aliases:
659             document_payload['aliases'] = self.aliases
660         if self.references:
661             document_payload['references'] = self.references
662         if self.urls:
663             document_payload['urls'] = self.urls
664
665         return document_payload
666
667     def post(self, server):
668         document_payload = self.create_payload()
669         server.validate(document_payload, 'document')
670         return server.post_json('/documents/', document_payload)
671
672     def save(self, filename):
673         payload = self.create_payload()
674         with open(filename, 'w') as outstream:
675             outstream.write(pformat(payload))
676
677     def create_if_needed(self, server, uuid):
678         self.uuid = uuid
679         if uuid is None:
680             return self.post(server)
681         else:
682             return server.get_json(uuid, embed=False)
683
684 if __name__ == '__main__':
685     # try it
686     from htsworkflow.util.rdfhelp import get_model, dump_model
687     from htsworkflow.util.rdfjsonld import load_into_model
688     from pprint import pprint
689     model = get_model()
690     logging.basicConfig(level=logging.DEBUG)
691     encoded = ENCODED('test.encodedcc.org')
692     encoded.load_netrc()
693     body = encoded.get_jsonld('/experiments/ENCSR000AEC/')
694     pprint(body)
695     load_into_model(model, body)
696     #dump_model(model)