Add new paths for spreadsheet ENCODE submission.
[htsworkflow.git] / htsworkflow / submission / encoded.py
1 """Interface with encoded software for ENCODE3 data submission & warehouse
2
3 This allows retrieving blocks
4 """
5 from __future__ import print_function
6 import pandas
7 import base64
8 import collections
9 import hashlib
10 import logging
11 import json
12 import jsonschema
13 import numpy
14 import os
15 import re
16 import requests
17 import six
18 from six.moves.urllib.parse import urljoin, urlparse, urlunparse
19
20 LOGGER = logging.getLogger(__name__)
21
22 ENCODED_CONTEXT = {
23     # The None context will get added to the root of the tree and will
24     # provide common defaults.
25     None: {
26         # terms in multiple encoded objects
27         'award': {'@type': '@id'},
28         'dataset': {'@type': '@id'},
29         'description': 'rdf:description',
30         'documents': {'@type': '@id'},
31         'experiment': {'@type': '@id'},
32         'href': {'@type': '@id'},
33         'lab': {'@type': '@id'},
34         'library': {'@type': '@id'},
35         'pi': {'@type': '@id'},
36         'platform': {'@type': '@id'},
37         'replicates': {'@type': '@id'},
38         'submitted_by': {'@type': '@id'},
39         'url': {'@type': '@id'},
40     },
41     # Identify and markup contained classes.
42     # e.g. in the tree there was a sub-dictionary named 'biosample'
43     # That dictionary had a term 'biosample_term_id, which is the
44     # term that should be used as the @id.
45     'Biosample': {
46         'biosample_term_id': {'@type': '@id'},
47     },
48     'Experiment': {
49         "assay_term_id": {"@type": "@id"},
50         "files": {"@type": "@id"},
51         "original_files": {"@type": "@id"},
52     },
53     # I tried to use the JSON-LD mapping capabilities to convert the lab
54     # contact information into a vcard record, but the encoded model
55     # didn't lend itself well to the vcard schema
56     #'lab': {
57     #    "address1": "vcard:street-address",
58     #    "address2": "vcard:street-address",
59     #    "city": "vcard:locality",
60     #    "state": "vcard:region",
61     #    "country": "vcard:country"
62     #},
63     'Library': {
64         'nucleic_acid_term_id': {'@type': '@id'}
65     }
66 }
67
68 #FIXME: this needs to be initialized from rdfns
69 ENCODED_NAMESPACES = {
70     # JSON-LD lets you define namespaces so you can used the shorted url syntax.
71     # (instead of http://www.w3.org/2000/01/rdf-schema#label you can do
72     # rdfs:label)
73     "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
74     "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
75     "owl": "http://www.w3.org/2002/07/owl#",
76     "dc": "htp://purl.org/dc/elements/1.1/",
77     "xsd": "http://www.w3.org/2001/XMLSchema#",
78     "vcard": "http://www.w3.org/2006/vcard/ns#",
79
80     # for some namespaces I made a best guess for the ontology root.
81     "EFO": "http://www.ebi.ac.uk/efo/",  # EFO ontology
82     "OBO": "http://purl.obolibrary.org/obo/",  # OBO ontology
83     "OBI": "http://purl.obolibrary.org/obo/OBI_",  # Ontology for Biomedical Investigations
84     # OBI: available from http://svn.code.sf.net/p/obi/code/releases/2012-07-01/merged/merged-obi-comments.owl
85     "SO": "http://purl.obolibrary.org/obo/SO_",  # Sequence ontology
86     # SO: available from http://www.berkeleybop.org/ontologies/so.owl
87     # NTR: New Term Request space for DCC to implement new ontology terms
88
89 }
90
91 ENCODED_SCHEMA_ROOT = '/profiles/'
92
93
94 class ENCODED:
95     '''Programatic access encoded, the software powering ENCODE3's submit site.
96     '''
97     def __init__(self, server, contexts=None, namespaces=None):
98         self.server = server
99         self.scheme = 'https'
100         self.username = None
101         self.password = None
102         self.contexts = contexts if contexts else ENCODED_CONTEXT
103         self.namespaces = namespaces if namespaces else ENCODED_NAMESPACES
104         self.json_headers = {'content-type': 'application/json', 'accept': 'application/json'}
105         self.schemas = {}
106
107     def get_auth(self):
108         return (self.username, self.password)
109     auth = property(get_auth)
110
111     def load_netrc(self):
112         import netrc
113         session = netrc.netrc()
114         authenticators = session.authenticators(self.server)
115         if authenticators:
116             self.username = authenticators[0]
117             self.password = authenticators[2]
118
119     def add_jsonld_context(self, tree, default_base):
120         """Add contexts to various objects in the tree.
121
122         tree is a json tree returned from the DCC's encoded database.
123         contexts is a dictionary of dictionaries containing contexts
124                 for the various  possible encoded classes.
125         base, if supplied allows setting the base url that relative
126             urls will be resolved against.
127         """
128         self.add_jsonld_child_context(tree, default_base)
129         self.add_jsonld_namespaces(tree['@context'])
130
131     def add_jsonld_child_context(self, obj, default_base):
132         '''Add JSON-LD context to the encoded JSON.
133
134         This is recursive because some of the IDs were relative URLs
135         and I needed a way to properly compute a the correct base URL.
136         '''
137         # pretend strings aren't iterable
138         if isinstance(obj, six.string_types):
139             return
140
141         # recurse on container types
142         if isinstance(obj, collections.Sequence):
143             # how should I update lists?
144             for v in obj:
145                 self.add_jsonld_child_context(v, default_base)
146             return
147
148         if isinstance(obj, collections.Mapping):
149             for v in obj.values():
150                 self.add_jsonld_child_context(v, default_base)
151
152         # we have an object. attach a context to it.
153         if self._is_encoded_object(obj):
154             context = self.create_jsonld_context(obj, default_base)
155             if len(context) > 0:
156                 # this is a total hack for relese 33 of
157                 # encoded. They changed their model and
158                 # i'm not sure what to do about it.
159                 if obj.get('@context') == '/terms/':
160                     del obj['@context']
161                 obj.setdefault('@context', {}).update(context)
162
163     def add_jsonld_namespaces(self, context):
164         '''Add shortcut namespaces to a context
165
166         Only needs to be run on the top-most context
167         '''
168         context.update(self.namespaces)
169
170     def create_jsonld_context(self, obj, default_base):
171         '''Synthesize the context for a encoded type
172
173         self.contexts[None] = default context attributes added to any type
174         self.contexts[type] = context attributes for this type.
175         '''
176         obj_type = self.get_object_type(obj)
177         context = {'@base': urljoin(default_base, obj['@id']),
178                    '@vocab': self.get_schema_url(obj_type)}
179         # add in defaults
180         context.update(self.contexts[None])
181         for t in obj['@type']:
182             if t in self.contexts:
183                 context.update(self.contexts[t])
184         return context
185
186     def get_json(self, obj_id, **kwargs):
187         '''GET an ENCODE object as JSON and return as dict
188
189         Uses prepare_url to allow url short-cuts
190         if no keyword arguments are specified it will default to adding limit=all
191         Alternative keyword arguments can be passed in and will be sent to the host.
192
193         Known keywords are:
194           limit - (integer or 'all') how many records to return, all for all of them
195           embed - (bool) if true expands linking ids into their associated object.
196           format - text/html or application/json
197         '''
198         if len(kwargs) == 0:
199             kwargs['limit'] = 'all'
200
201         response = self.get_response(obj_id, **kwargs)
202         data = response.json()
203         response.close()
204         return data
205
206     def get_jsonld(self, obj_id, **kwargs):
207         '''Get ENCODE object as JSONLD annotated with classses contexts
208
209         see get_json for documentation about what keywords can be passed.
210         '''
211         url = self.prepare_url(obj_id)
212         json = self.get_json(obj_id, **kwargs)
213         self.add_jsonld_context(json, url)
214         return json
215
216     def get_object_type(self, obj):
217         """Return type for a encoded object
218         """
219         obj_type = obj.get('@type')
220         if not obj_type:
221             raise ValueError('None type')
222         if isinstance(obj_type, six.string_types):
223             raise ValueError('@type should be a list, not a string')
224         if not isinstance(obj_type, collections.Sequence):
225             raise ValueError('@type is not a sequence')
226         return obj_type[0]
227
228     def get_response(self, fragment, **kwargs):
229         '''GET an ENCODED url and return the requests request
230
231         Uses prepare_url to allow url short-cuts
232         if no keyword arguments are specified it will default to adding limit=all
233         Alternative keyword arguments can be passed in and will be sent to the host.
234
235         Known keywords are:
236           limit - (integer or 'all') how many records to return, all for all of them
237           embed - (bool) if true expands linking ids into their associated object.
238           format - text/html or application/json
239         '''
240         url = self.prepare_url(fragment)
241         LOGGER.info('requesting url: {}'.format(url))
242
243         # do the request
244         LOGGER.debug('username: %s, password: %s', self.username, self.password)
245         arguments = {}
246         if self.username and self.password:
247             arguments['auth'] = self.auth
248
249         if 'stream' in kwargs:
250             arguments['stream'] = kwargs['stream']
251             del kwargs['stream']
252
253         response = requests.get(url, headers=self.json_headers,
254                                 params=kwargs,
255                                 **arguments)
256         if not response.status_code == requests.codes.ok:
257             LOGGER.error("Error http status: {}".format(response.status_code))
258             response.raise_for_status()
259
260         return response
261
262     def get_schema_url(self, object_type):
263         """Create the ENCODED jsonschema url.
264
265         Return the ENCODED object schema url be either
266         object type name or the collection name one posts to.
267
268         For example
269            server.get_schema_url('experiment') and
270            server.get_schema_url('/experiments/') both resolve to
271            SERVER/profiles/experiment.json
272
273         Arguments:
274            object_type (str): either ENCODED object name or collection
275
276         Returns:
277            Schema URL
278         """
279         collection_to_type = {
280             '/annotations/': 'annotation',
281             '/biosamples/': 'biosample',
282             '/datasets/': 'dataset',
283             '/documents/': 'document',
284             '/experiments/': 'experiment',
285             '/libraries/': 'library',
286             '/replicates/': 'replicate',
287             '/files/': 'file',
288         }
289         object_type = collection_to_type.get(object_type, object_type)
290
291         return self.prepare_url(ENCODED_SCHEMA_ROOT + object_type + '.json') + '#'
292
293     def get_accession_name(self, collection):
294         """Lookup common object accession name given a collection name.
295         """
296         collection_to_accession_name = {
297             '/annotations/': 'annotation_accession',
298             '/biosamples/': 'biosample_accession',
299             '/experiments/': 'experiment_accession',
300             '/files/': 'file_accession',
301             '/libraries/': 'library_accession',
302             '/replicates/': 'uuid',
303         }
304
305         accession_name = collection_to_accession_name.get(collection, None)
306         if accession_name is None:
307             raise RuntimeError("Update list of collection to accession names for %s",
308                                collection)
309
310         return accession_name
311
312     def _is_encoded_object(self, obj):
313         '''Test to see if an object is a JSON-LD object
314
315         Some of the nested dictionaries lack the @id or @type
316         information necessary to convert them.
317         '''
318         if not isinstance(obj, collections.Iterable):
319             return False
320
321         if '@id' in obj and '@type' in obj:
322             return True
323         return False
324
325     def patch_json(self, obj_id, changes):
326         """Given a dictionary of changes push them as a HTTP patch request
327         """
328         url = self.prepare_url(obj_id)
329         LOGGER.info('PATCHing to %s', url)
330         payload = json.dumps(changes)
331         response = requests.patch(url, auth=self.auth, headers=self.json_headers, data=payload)
332         if response.status_code != requests.codes.ok:
333             LOGGER.error("Error http status: {}".format(response.status_code))
334             LOGGER.error("Response: %s", response.text)
335             response.raise_for_status()
336         return response.json()
337
338     def put_json(self, obj_id, new_object):
339         url = self.prepare_url(obj_id)
340         LOGGER.info('PUTing to %s', url)
341         payload = json.dumps(new_object)
342         response = requests.put(url, auth=self.auth, headers=self.json_headers, data=payload)
343         if response.status_code != requests.codes.created:
344             LOGGER.error("Error http status: {}".format(response.status_code))
345             response.raise_for_status()
346         return response.json()
347
348     def post_json(self, collection_id, new_object):
349         url = self.prepare_url(collection_id)
350         LOGGER.info('POSTing to %s', url)
351         payload = json.dumps(new_object)
352
353         response = requests.post(url, auth=self.auth, headers=self.json_headers, data=payload)
354         if response.status_code != requests.codes.created:
355             LOGGER.error("http status: {}".format(response.status_code))
356             LOGGER.error("message: {}".format(response.content))
357             response.raise_for_status()
358         return response.json()
359
360     def post_sheet(self, collection, sheet, dry_run=True, verbose=False):
361         """Create new ENCODED objects using metadata encoded in pandas DataFrame
362
363         The DataFrame column names need to encode the attribute names,
364         and in some cases also include some additional type information.
365         (see TypedColumnParser)
366
367         Arguments:
368            collection (str): name of collection to create new objects in
369            sheet (pandas.DataFrame): DataFrame with objects to create,
370                assuming the appropriate accession number is empty.
371                additional the accession number and uuid is updated if the object
372                is created.
373            dry_run (bool): whether or not to skip the code to post the objects
374            verbose (bool): print the http responses.
375
376         Returns:
377            list of created objects.
378
379         Raises:
380            jsonschema.ValidationError if the object doesn't validate against
381               the encoded jsonschema.
382         """
383         accession_name = self.get_accession_name(collection)
384
385         to_create = self.prepare_objects_from_sheet(collection, sheet)
386
387         created = []
388         accessions = []
389         uuids = []
390         for i, new_object in to_create:
391             if new_object:
392                 accession = new_object.get('accession')
393                 uuid = new_object.get('uuid')
394                 description = new_object.get('description')
395
396                 posted_object = self.post_object_from_row(
397                     collection, i, new_object, dry_run, verbose
398                 )
399                 created.append(posted_object)
400
401                 if posted_object:
402                     accession = posted_object.get('accession')
403                     uuid = posted_object.get('uuid')
404                     description = posted_object.get('description')
405
406                 accessions.append(accession)
407                 uuids.append(uuid)
408
409                 LOGGER.info('row {} ({}) -> {}'.format(
410                     (i+2), description, accession))
411                 # +2 comes from python row index + 1 to convert to
412                 # one based indexing + 1 to account for
413                 # row removed by header parsing
414             else:
415                 accessions.append(numpy.nan)
416                 uuids.append(numpy.nan)
417
418         if accession_name in sheet.columns:
419             sheet[accession_name] = accessions
420         if 'uuid' in sheet.columns:
421             sheet['uuid'] = uuids
422
423         return created
424
425     def prepare_objects_from_sheet(self, collection, sheet):
426         accession_name = self.get_accession_name(collection)
427         to_create = []
428         for i, row in sheet.iterrows():
429             new_object = {}
430             for name, value in row.items():
431                 if pandas.notnull(value):
432                     name, value = typed_column_parser(name, value)
433                     if name is None:
434                         continue
435                     new_object[name] = value
436
437             if new_object and new_object.get(accession_name) is None:
438                 try:
439                     self.validate(new_object, collection)
440                 except jsonschema.ValidationError as e:
441                     LOGGER.error("Validation error row %s", i)
442                     raise e
443                 to_create.append((i, new_object))
444
445             else:
446                 to_create.append((i, None))
447
448         return to_create
449
450     def post_object_from_row(self, collection, i, new_object,
451                              dry_run=True, verbose=True):
452         accession_name = self.get_accession_name(collection)
453
454         if not dry_run:
455             response = self.post_json(collection, new_object)
456             if verbose:
457                 print("Reponse {}".format(response))
458
459             obj = response['@graph'][0]
460
461             accession = obj.get(accession_name)
462             if not accession:
463                 accession = obj.get('uuid')
464
465             print("row {} created: {}".format(i, accession))
466             return obj
467         else:
468             new_object[accession_name] = 'would create'
469             return new_object
470
471     def prepare_url(self, request_url):
472         '''This attempts to provide some convienence for accessing a URL
473
474         Given a url fragment it will default to :
475         * requests over http
476         * requests to self.server
477
478         This allows fairly flexible urls. e.g.
479
480         prepare_url('/experiments/ENCSR000AEG')
481         prepare_url('submit.encodedcc.org/experiments/ENCSR000AEG')
482         prepare_url('http://submit.encodedcc.org/experiments/ENCSR000AEG?limit=all')
483
484         should all return the same url
485         '''
486         # clean up potentially messy urls
487         url = urlparse(request_url)._asdict()
488         if not url['scheme']:
489             url['scheme'] = self.scheme
490         if not url['netloc']:
491             url['netloc'] = self.server
492         url = urlunparse(url.values())
493         return url
494
495     def search_jsonld(self, **kwargs):
496         '''Send search request to ENCODED
497
498         to do a general search do
499             searchTerm=term
500         '''
501         url = self.prepare_url('/search/')
502         result = self.get_json(url, **kwargs)
503         self.convert_search_to_jsonld(result)
504         return result
505
506     def convert_search_to_jsonld(self, result):
507         '''Add the context to search result
508
509         Also remove hard to handle nested attributes
510           e.g. remove object.term when we have no id
511         '''
512         graph = result['@graph']
513         for i, obj in enumerate(graph):
514             # suppress nested attributes
515             graph[i] = {k: v for k, v in obj.items() if '.' not in k}
516
517         self.add_jsonld_context(result, self.prepare_url(result['@id']))
518         return result
519
520     def validate(self, obj, object_type=None):
521         """Validate an object against the ENCODED schema
522
523         Args:
524             obj (dictionary): object attributes to be submitted to encoded
525             object_type (string): ENCODED object name.
526
527         Raises:
528             ValidationError: if the object does not conform to the schema.
529         """
530         object_type = object_type if object_type else self.get_object_type(obj)
531         schema_url = self.get_schema_url(object_type)
532         if not schema_url:
533             raise ValueError("Unable to construct schema url")
534
535         schema = self.schemas.setdefault(object_type, self.get_json(schema_url))
536         hidden = obj.copy()
537         if '@id' in hidden:
538             del hidden['@id']
539         if '@type' in hidden:
540             del hidden['@type']
541         jsonschema.validate(hidden, schema)
542
543         # Additional validation rules passed down from the DCC for our grant
544         assay_term_name = hidden.get('assay_term_name')
545         if assay_term_name is not None:
546             if assay_term_name.lower() == 'rna-seq':
547                 if assay_term_name != 'RNA-seq':
548                     raise jsonschema.ValidationError('Incorrect capitialization of RNA-seq')
549
550         species = hidden.get('species')
551         if species == '/organisms/human/':
552             model_age_terms = ['model_organism_age', 'model_organism_age_units']
553             for term in model_age_terms:
554                 if term in obj:
555                     raise jsonschema.ValidationError('model age terms not needed in human')
556
557 class TypedColumnParser(object):
558     @staticmethod
559     def parse_sheet_array_type(value):
560         """Helper function to parse :array columns in sheet
561         """
562         return re.split(',\s*', value)
563
564     @staticmethod
565     def parse_sheet_integer_type(value):
566         """Helper function to parse :integer columns in sheet
567         """
568         return int(value)
569
570     @staticmethod
571     def parse_sheet_boolean_type(value):
572         """Helper function to parse :boolean columns in sheet
573         """
574         return bool(value)
575
576     @staticmethod
577     def parse_sheet_timestamp_type(value):
578         """Helper function to parse :date columns in sheet
579         """
580         if isinstance(value, str):
581             return value
582         return value.strftime('%Y-%m-%d')
583
584     @staticmethod
585     def parse_sheet_string_type(value):
586         """Helper function to parse :string columns in sheet (the default)
587         """
588         return str(value)
589
590     def __getitem__(self, name):
591         parser = {
592             'array': self.parse_sheet_array_type,
593             'boolean': self.parse_sheet_boolean_type,
594             'integer': self.parse_sheet_integer_type,
595             'date': self.parse_sheet_timestamp_type,
596             'string': self.parse_sheet_string_type
597         }.get(name)
598         if parser:
599             return parser
600         else:
601             raise RuntimeError("unrecognized column type")
602
603     def __call__(self, header, value):
604         header = header.split(':')
605         column_type = 'string'
606         if len(header) > 1:
607             if header[1] == 'skip':
608                 return None, None
609             else:
610                 column_type = header[1]
611         return header[0], self[column_type](value)
612
613 typed_column_parser = TypedColumnParser()
614
615 class Document(object):
616     """Helper class for registering documents
617
618     Usage:
619     lysis_uuid = 'f0cc5a7f-96a5-4970-9f46-317cc8e2d6a4'
620     lysis = Document(url_to_pdf, 'extraction protocol', 'Lysis Protocol')
621     lysis.create_if_needed(server, lysis_uuid)
622     """
623     award = 'U54HG006998'
624     lab = '/labs/barbara-wold'
625
626     def __init__(self, url, document_type, description, aliases=None):
627         self.url = url
628         self.filename = os.path.basename(url)
629         self.document_type = document_type
630         self.description = description
631
632         self.references = []
633         self.aliases = None
634         if aliases:
635             if isinstance(aliases, list):
636                 self.aliases = aliases
637             else:
638                 raise ValueError("Aliases needs to be a list")
639         self.content_type = None
640         self.document = None
641         self.md5sum = None
642         self.urls = None
643         self.uuid = None
644
645         self.get_document()
646
647     def get_document(self):
648         if os.path.exists(self.url):
649             with open(self.url, 'rb') as instream:
650                 assert self.url.endswith('pdf')
651                 self.content_type = 'application/pdf'
652                 self.document = instream.read()
653                 self.md5sum = hashlib.md5(self.document)
654         else:
655             req = requests.get(self.url)
656             if req.status_code == 200:
657                 self.content_type = req.headers['content-type']
658                 self.document = req.content
659                 self.md5sum = hashlib.md5(self.document)
660                 self.urls = [self.url]
661
662     def create_payload(self):
663         document_payload = {
664             'attachment': {
665               'download': self.filename,
666               'type': self.content_type,
667               'href': 'data:'+self.content_type+';base64,' + base64.b64encode(self.document).decode('ascii'),
668               'md5sum': self.md5sum.hexdigest()
669             },
670             'document_type': self.document_type,
671             'description': self.description,
672             'award': self.award,
673             'lab': self.lab,
674         }
675         if self.aliases:
676             document_payload['aliases'] = self.aliases
677         if self.references:
678             document_payload['references'] = self.references
679         if self.urls:
680             document_payload['urls'] = self.urls
681
682         return document_payload
683
684     def post(self, server):
685         document_payload = self.create_payload()
686         server.validate(document_payload, 'document')
687         return server.post_json('/documents/', document_payload)
688
689     def save(self, filename):
690         payload = self.create_payload()
691         with open(filename, 'w') as outstream:
692             outstream.write(pformat(payload))
693
694     def create_if_needed(self, server, uuid):
695         self.uuid = uuid
696         if uuid is None:
697             return self.post(server)
698         else:
699             return server.get_json(uuid, embed=False)
700
701 if __name__ == '__main__':
702     # try it
703     from htsworkflow.util.rdfhelp import get_model, dump_model
704     from htsworkflow.util.rdfjsonld import load_into_model
705     from pprint import pprint
706     model = get_model()
707     logging.basicConfig(level=logging.DEBUG)
708     encoded = ENCODED('test.encodedcc.org')
709     encoded.load_netrc()
710     body = encoded.get_jsonld('/experiments/ENCSR000AEC/')
711     pprint(body)
712     load_into_model(model, body)
713     #dump_model(model)