"""Common functions for accessing the HTS Workflow REST API
"""
-from ConfigParser import SafeConfigParser
+import base64
+from configparser import SafeConfigParser
+import random
import logging
# try to deal with python <2.6
import os
from optparse import OptionGroup
-import urllib
-import urllib2
-import urlparse
+import urllib.request, urllib.parse, urllib.error
+import urllib.request, urllib.error, urllib.parse
+import urllib.parse
LOGGER = logging.getLogger(__name__)
group.add_option('--sequence', default=sequence_archive,
help="sequence repository")
parser.add_option_group(group)
+ return parser
-def make_auth_from_opts(opts, parser):
+def make_auth_from_opts(opts, parser=None):
"""Create htsw auth info dictionary from optparse info
"""
if opts.host is None or opts.apiid is None or opts.apikey is None:
- parser.error("Please specify host url, apiid, apikey")
+ if parser is not None:
+ parser.error("Please specify host url, apiid, apikey")
+ else:
+ raise RuntimeError("Need host, api id api key")
return {'apiid': opts.apiid, 'apikey': opts.apikey }
"""
url_fragment = '/samples/library/%s/json' % (library_id,)
- url = urlparse.urljoin(root_url, url_fragment)
+ url = urllib.parse.urljoin(root_url, url_fragment)
return url
http://localhost/experiments/config/1234AAXX/json
"""
url_fragment = '/experiments/config/%s/json' % (flowcell_id,)
- url = urlparse.urljoin(root_url, url_fragment)
+ url = urllib.parse.urljoin(root_url, url_fragment)
return url
"""
url_fragment = '/lanes_for/%s/json' % (username,)
- url = urlparse.urljoin(root_url, url_fragment)
+ url = urllib.parse.urljoin(root_url, url_fragment)
return url
Return a dictionary from the HTSworkflow API
"""
try:
- apipayload = urllib.urlencode(apidata)
- web = urllib2.urlopen(url, apipayload)
- except urllib2.URLError, e:
+ apipayload = urllib.parse.urlencode(apidata)
+ web = urllib.request.urlopen(url, apipayload)
+ except urllib.error.URLError as e:
if hasattr(e, 'code') and e.code == 404:
LOGGER.info("%s was not found" % (url,))
return None
return json.loads(contents)
class HtswApi(object):
- def __init__(self, root_url, authdata):
- self.root_url = root_url
- self.authdata = authdata
+ def __init__(self, root_url, authdata):
+ self.root_url = root_url
+ self.authdata = authdata
+
+ def get_flowcell(self, flowcellId):
+ url = flowcell_url(self.root_url, flowcellId)
+ return retrieve_info(url, self.authdata)
- def get_flowcell(self, flowcellId):
- url = flowcell_url(self.root_url, flowcellId)
- return retrieve_info(url, self.authdata)
+ def get_library(self, libraryId):
+ url = library_url(self.root_url, libraryId)
+ return retrieve_info(url, self.authdata)
- def get_library(self, libraryId):
- url = library_url(self.root_url, libraryId)
- return retrieve_info(url, self.authdata)
+ def get_lanes_for_user(self, user):
+ url = lanes_for_user(self.root_url, user)
+ return retrieve_info(url, self.authdata)
- def get_lanes_for_user(self, user):
- url = lanes_for_user(self.root_url, user)
- return retrieve_info(url, self.authdata)
+ def get_url(self, url):
+ return retrieve_info(url, self.authdata)
- def get_url(self, url):
- return retrieve_info(url, self.authdata)
+def make_django_secret_key(size=216):
+ """return key suitable for use as secret key"""
+ try:
+ source = random.SystemRandom()
+ except AttributeError as e:
+ source = random.random()
+ bits = source.getrandbits(size)
+ chars = []
+ while bits > 0:
+ byte = bits & 0xff
+ chars.append(chr(byte))
+ bits >>= 8
+ return base64.encodestring("".join(chars)).strip()
+
+if __name__ == "__main__":
+ from optparse import OptionParser
+ from pprint import pprint
+ parser = OptionParser()
+ parser = add_auth_options(parser)
+ parser.add_option('--flowcell', default=None)
+ parser.add_option('--library', default=None)
+
+ opts, args = parser.parse_args()
+ apidata = make_auth_from_opts(opts)
+
+ api = HtswApi(opts.host, apidata)
+
+ if opts.flowcell is not None:
+ pprint(api.get_flowcell(opts.flowcell))
+ if opts.library is not None:
+ pprint(api.get_library(opts.library))