1 """Common functions for accessing the HTS Workflow REST API
4 from ConfigParser import SafeConfigParser
8 # try to deal with python <2.6
12 import simplejson as json
15 from optparse import OptionGroup
20 LOGGER = logging.getLogger(__name__)
def add_auth_options(parser):
    """Add HTS Workflow API authentication options to an optparse parser.

    Defaults come from the ``[sequence_archive]`` section of
    ``~/.htsworkflow.ini`` and ``/etc/htsworkflow.ini``.  Any setting that
    is absent from the configuration defaults to ``None``.

    Args:
      parser (optparse.OptionParser): the parser to extend in place.

    Returns:
      The same parser, now carrying an "htsw api authentication" group
      with the ``--apiid``, ``--apikey``, ``--host`` and ``--sequence``
      options.
    """
    SECTION = 'sequence_archive'

    # Load defaults from the config files
    config = SafeConfigParser()
    config.read([os.path.expanduser('~/.htsworkflow.ini'),
                 '/etc/htsworkflow.ini',
                 ])

    def _configured(option):
        # ConfigParser.get() takes no "default" argument (its third
        # positional parameter is ``raw``), so probe for the option first.
        # has_option() is also False when the whole section is missing.
        if config.has_option(SECTION, option):
            return config.get(SECTION, option)
        return None

    sequence_archive = _configured('sequence_archive')
    if sequence_archive is not None:
        # expanduser() cannot cope with None, so only expand real paths
        sequence_archive = os.path.expanduser(sequence_archive)
    apiid = _configured('apiid')
    apikey = _configured('apikey')
    apihost = _configured('host')

    # configuration options
    group = OptionGroup(parser, "htsw api authentication")
    group.add_option('--apiid', default=apiid, help="Specify API ID")
    group.add_option('--apikey', default=apikey, help="Specify API KEY")
    group.add_option('--host', default=apihost,
                     help="specify HTSWorkflow host",)
    group.add_option('--sequence', default=sequence_archive,
                     help="sequence repository")
    parser.add_option_group(group)

    # Returned so callers may write ``parser = add_auth_options(parser)``.
    return parser
def make_auth_from_opts(opts, parser=None):
    """Build the htsw authentication dictionary from parsed options.

    Args:
      opts: parsed options providing ``host``, ``apiid`` and ``apikey``.
      parser (optparse.OptionParser): optional parser used to report
        missing values through ``parser.error``.

    Returns:
      dict. The ``apiid`` and ``apikey`` values keyed by those names.

    Raises:
      RuntimeError: a required value is missing and no parser was given.
    """
    have_everything = (opts.host is not None
                       and opts.apiid is not None
                       and opts.apikey is not None)
    if not have_everything:
        if parser is None:
            raise RuntimeError("Need host, api id api key")
        parser.error("Please specify host url, apiid, apikey")

    credentials = {'apiid': opts.apiid, 'apikey': opts.apikey}
    return credentials
def library_url(root_url, library_id):
    """Build the REST url that describes one library.

    Args:
      root_url (str): the root portion of the url, e.g. http://localhost
      library_id (str): the library id of interest

    Returns:
      str. The url to use for this REST api.

    >>> library_url('http://localhost', '12345')
    'http://localhost/samples/library/12345/json'
    """
    template = '/samples/library/%s/json'
    # The fragment is an absolute path, so it replaces any path in root_url.
    return urlparse.urljoin(root_url, template % (library_id,))
def flowcell_url(root_url, flowcell_id):
    """Build the REST url that describes one flowcell.

    Args:
      root_url (str): the root portion of the url, e.g. http://localhost
      flowcell_id (str): the flowcell id of interest

    Returns:
      str. The url to use for this REST api.

    >>> flowcell_url('http://localhost', '1234AAXX')
    'http://localhost/experiments/config/1234AAXX/json'
    """
    template = '/experiments/config/%s/json'
    # The fragment is an absolute path, so it replaces any path in root_url.
    return urlparse.urljoin(root_url, template % (flowcell_id,))
def lanes_for_user_url(root_url, username):
    """Build the REST url listing every lane associated with a username.

    Args:
      root_url (str): the root portion of the url, e.g. http://localhost
      username (str): a username in your target filesystem

    Returns:
      str. The url to use for this REST api.

    >>> lanes_for_user_url('http://localhost', 'diane')
    'http://localhost/lanes_for/diane/json'
    """
    template = '/lanes_for/%s/json'
    # The fragment is an absolute path, so it replaces any path in root_url.
    return urlparse.urljoin(root_url, template % (username,))
def retrieve_info(url, apidata):
    """POST credentials to an HTS Workflow API url and decode the JSON reply.

    Args:
      url (str): the REST url to query.
      apidata (dict): authentication values, sent url-encoded as the
        POST body.

    Returns:
      The decoded JSON document, or None when the server answers 404.

    Raises:
      IOError: any other URLError raised while opening the url.
    """
    import sys

    try:
        apipayload = urllib.urlencode(apidata)
        web = urllib2.urlopen(url, apipayload)
    except urllib2.URLError:
        # sys.exc_info() avoids the version-specific "except X, e" /
        # "except X as e" spellings, so this parses on any Python.
        error = sys.exc_info()[1]
        if getattr(error, 'code', None) == 404:
            LOGGER.info("%s was not found", url)
            return None
        raise IOError('URLError: %s' % (str(error),))

    # Release the connection even when reading the body fails.
    try:
        contents = web.read()
    finally:
        web.close()

    return json.loads(contents)
class HtswApi(object):
    """Thin client bundling a server root url with its credentials.

    Every ``get_*`` method builds the matching REST url and returns
    whatever ``retrieve_info`` returns for it.
    """

    def __init__(self, root_url, authdata):
        """Remember the server location and the authentication data.

        Args:
          root_url (str): the root portion of the url, e.g. http://localhost
          authdata (dict): credentials sent along with every request.
        """
        self.root_url = root_url
        self.authdata = authdata

    def get_flowcell(self, flowcellId):
        """Return the information the API reports for one flowcell."""
        url = flowcell_url(self.root_url, flowcellId)
        return retrieve_info(url, self.authdata)

    def get_library(self, libraryId):
        """Return the information the API reports for one library."""
        url = library_url(self.root_url, libraryId)
        return retrieve_info(url, self.authdata)

    def get_lanes_for_user(self, user):
        """Return the lanes the API reports for one username."""
        # The url builder is lanes_for_user_url; the bare name
        # lanes_for_user is not defined and raised NameError.
        url = lanes_for_user_url(self.root_url, user)
        return retrieve_info(url, self.authdata)

    def get_url(self, url):
        """Return the API response for an already constructed url."""
        return retrieve_info(url, self.authdata)
def make_django_secret_key(size=216):
    """Return a random base64 string suitable for use as a secret key.

    Args:
      size (int): number of random bits to generate; must be positive.

    Returns:
      str. The base64 encoding of ``ceil(size / 8)`` random bytes, on a
      single line.

    Raises:
      ValueError: ``size`` is not positive.
    """
    import binascii

    if size <= 0:
        raise ValueError("size must be a positive number of bits")

    try:
        source = random.SystemRandom()
    except AttributeError:
        # NOTE(review): this fallback is not cryptographically strong.
        # random.random() is a float with no getrandbits(); a Random
        # instance provides the method the code below needs.
        source = random.Random()

    bits = source.getrandbits(size)
    # Zero-pad to a fixed width so leading zero bytes are kept and the
    # key always has the same length for a given size.
    byte_count = (size + 7) // 8
    raw = binascii.unhexlify('%0*x' % (byte_count * 2, bits))

    # b64encode never inserts line breaks, unlike encodestring.
    encoded = base64.b64encode(raw)
    if not isinstance(encoded, str):
        encoded = encoded.decode('ascii')
    return encoded
if __name__ == "__main__":
    # Small command line client: look up a flowcell and/or a library and
    # pretty-print whatever the API returns.
    from optparse import OptionParser
    from pprint import pprint

    parser = OptionParser()
    # add_auth_options extends the parser in place; do not rebind the
    # name to its result, which is not guaranteed to be the parser.
    add_auth_options(parser)
    parser.add_option('--flowcell', default=None)
    parser.add_option('--library', default=None)

    opts, args = parser.parse_args()
    # Passing the parser turns missing credentials into a usage error
    # instead of a RuntimeError traceback.
    apidata = make_auth_from_opts(opts, parser)

    api = HtswApi(opts.host, apidata)

    if opts.flowcell is not None:
        pprint(api.get_flowcell(opts.flowcell))
    if opts.library is not None:
        pprint(api.get_library(opts.library))