"""Return type for an encoded object
"""
obj_type = obj.get('@type')
- if obj_type and isinstance(obj_type, collections.Sequence):
- return obj_type[0]
+ if not obj_type:
+ raise ValueError('None type')
+ if type(obj_type) in types.StringTypes:
+ raise ValueError('@type should be a list, not a string')
+ if not isinstance(obj_type, collections.Sequence):
+ raise ValueError('@type is not a sequence')
+ return obj_type[0]
def get_schema_url(self, object_type):
    """Return the schema URL on the server for the given object type.

    The prepared URL points at ENCODED_SCHEMA_ROOT/<object_type>.json
    with a trailing '#' appended.
    """
    schema_path = ENCODED_SCHEMA_ROOT + object_type + '.json'
    return self.prepare_url(schema_path) + '#'
def _is_encoded_object(self, obj):
'''Test to see if an object is a JSON-LD object
if not schema_url:
raise ValueError("Unable to construct schema url")
- schema = self.schemas.setdefault(obj_type, self.get_json(schema_url))
+ schema = self.schemas.setdefault(object_type, self.get_json(schema_url))
hidden = obj.copy()
- if '@id' in hidden: del hidden['@id']
- if '@type' in hidden: del hidden['@type']
+ if '@id' in hidden:
+ del hidden['@id']
+ if '@type' in hidden:
+ del hidden['@type']
jsonschema.validate(hidden, schema)
class TypedColumnParser(object):
    """Parse spreadsheet cells according to a ':type' suffix on the
    column header (e.g. 'aliases:array', 'read_length:integer').

    Instances are callable: parser(header, value) returns
    (column_name, converted_value), or (None, None) for ':skip' columns.
    """

    @staticmethod
    def parse_sheet_array_type(value):
        """Split a ', '-separated cell into a list (':array' columns)."""
        return value.split(', ')

    @staticmethod
    def parse_sheet_integer_type(value):
        """Convert a cell to an int (':integer' columns)."""
        return int(value)

    @staticmethod
    def parse_sheet_boolean_type(value):
        """Convert a cell to a bool (':boolean' columns).

        NOTE(review): bool() on any non-empty string (even 'False')
        is True — confirm the sheet supplies real booleans, not text.
        """
        return bool(value)

    @staticmethod
    def parse_sheet_timestamp_type(value):
        """Format a date-like cell as YYYY-MM-DD (':date' columns).

        Assumes *value* has a strftime method — TODO confirm callers.
        """
        return value.strftime('%Y-%m-%d')

    @staticmethod
    def parse_sheet_string_type(value):
        """Coerce a cell to unicode (the default column type)."""
        return unicode(value)

    def __getitem__(self, name):
        """Return the parser callable for the column type *name*.

        Raises RuntimeError when *name* is not a recognized type.
        """
        dispatch = {
            'array': self.parse_sheet_array_type,
            'boolean': self.parse_sheet_boolean_type,
            'integer': self.parse_sheet_integer_type,
            'date': self.parse_sheet_timestamp_type,
            'string': self.parse_sheet_string_type,
        }
        try:
            return dispatch[name]
        except KeyError:
            raise RuntimeError("unrecognized column type")

    def __call__(self, header, value):
        """Parse *value* according to *header*'s ':type' suffix.

        Returns (column_name, parsed_value); ':skip' columns yield
        (None, None). A header without a suffix defaults to 'string'.
        """
        parts = header.split(':')
        column_type = parts[1] if len(parts) > 1 else 'string'
        if len(parts) > 1 and column_type == 'skip':
            return None, None
        return parts[0], self[column_type](value)


typed_column_parser = TypedColumnParser()
class Document(object):
    """Helper class for registering documents

    Usage:
      lysis_uuid = 'f0cc5a7f-96a5-4970-9f46-317cc8e2d6a4'
      lysis = Document(url_to_pdf, 'extraction protocol', 'Lysis Protocol')
      lysis.create_if_needed(server, lysis_uuid)
    """
    # Defaults stamped onto every document payload.
    award = 'U54HG006998'
    lab = '/labs/barbara-wold'

    def __init__(self, url, document_type, description, aliases=None):
        """Load a document from *url* and remember its metadata.

        :param url: local file path or remote URL of the document
        :param document_type: server-side document_type value
        :param description: human-readable description
        :param aliases: optional list of aliases for the document
        """
        self.url = url
        self.filename = os.path.basename(url)
        self.document_type = document_type
        self.description = description

        self.references = []
        self.aliases = aliases if aliases is not None else []
        self.content_type = None
        self.document = None
        self.md5sum = None
        self.urls = None
        self.uuid = None

        self.get_document()

    def get_document(self):
        """Read the document body (and md5) from a local file or URL."""
        if os.path.exists(self.url):
            # Only local PDFs are supported; check before opening.
            assert self.url.endswith('pdf')
            # Open in binary mode so the bytes (and hence the md5sum)
            # are not mangled by platform newline translation.
            with open(self.url, 'rb') as instream:
                self.content_type = 'application/pdf'
                self.document = instream.read()
                self.md5sum = hashlib.md5(self.document)
        else:
            req = requests.get(self.url)
            if req.status_code == 200:
                self.content_type = req.headers['content-type']
                self.document = req.content
                self.md5sum = hashlib.md5(self.document)
                self.urls = [self.url]
            # NOTE(review): a non-200 response silently leaves
            # self.document as None and create_payload will fail later;
            # confirm whether this should raise instead.

    def create_payload(self):
        """Build the JSON-serializable payload dict for this document."""
        # b64encode returns bytes on Python 3; decode so the data: URI
        # concatenation below works (returns unicode on Python 2).
        encoded = base64.b64encode(self.document).decode('ascii')
        document_payload = {
            'attachment': {
                'download': self.filename,
                'type': self.content_type,
                'href': 'data:' + self.content_type + ';base64,' + encoded,
                'md5sum': self.md5sum.hexdigest()
            },
            'document_type': self.document_type,
            'description': self.description,
            'award': self.award,
            'lab': self.lab,
        }
        # Optional fields are only included when non-empty.
        if self.aliases:
            document_payload['aliases'] = self.aliases
        if self.references:
            document_payload['references'] = self.references
        if self.urls:
            document_payload['urls'] = self.urls

        return document_payload

    def post(self, server):
        """POST this document to the server's /documents/ endpoint."""
        document_payload = self.create_payload()
        return server.post_json('/documents/', document_payload)

    def save(self, filename):
        """Write the pretty-printed payload to *filename* (for review)."""
        payload = self.create_payload()
        with open(filename, 'w') as outstream:
            outstream.write(pformat(payload))

    def create_if_needed(self, server, uuid):
        """Post the document if *uuid* is None, otherwise fetch it."""
        self.uuid = uuid
        if uuid is None:
            return self.post(server)
        else:
            return server.get_json(uuid, embed=False)

if __name__ == '__main__':
# try it
from htsworkflow.util.rdfhelp import get_model, dump_model