"""
#!/usr/bin/env python
-from ConfigParser import SafeConfigParser
+from configparser import ConfigParser as SafeConfigParser
import fnmatch
from glob import glob
import json
import os
from pprint import pprint, pformat
import shlex
-from StringIO import StringIO
+from io import StringIO
import stat
import sys
import time
import types
-import urllib
-import urllib2
-import urlparse
+import urllib.request, urllib.parse, urllib.error
+import urllib.request, urllib.error, urllib.parse
+import urllib.parse
from zipfile import ZipFile
import RDF
if opts.print_rdf:
writer = get_serializer()
- print writer.serialize_model_to_string(model)
+ print(writer.serialize_model_to_string(model))
def make_manifest(mapper, results, filename=None):
# redland rdf lib
import RDF
import sys
-import urllib
-import urlparse
+import urllib.request, urllib.parse, urllib.error
+import urllib.parse
if not 'DJANGO_SETTINGS_MODULE' in os.environ:
os.environ['DJANGO_SETTINGS_MODULE'] = 'htsworkflow.settings'
if opts.print_rdf:
serializer = get_serializer(name=opts.rdf_parser_name)
- print serializer.serialize_model_to_string(model)
+ print(serializer.serialize_model_to_string(model))
def make_parser():
for row in results:
subid = row['subid']
name = row['name']
- print "# {0}".format(name)
- print "<{0}>".format(subid.uri)
- print " encodeSubmit:library_urn "\
- "<http://jumpgate.caltech.edu/library/> ."
- print ""
+ print("# {0}".format(name))
+ print("<{0}>".format(subid.uri))
+ print(" encodeSubmit:library_urn "\
+ "<http://jumpgate.caltech.edu/library/> .")
+ print("")
def find_submissions_with_no_library(model):
missing_lib_query_text = """
load_library_detail(model, library_urn)
def user_library_id_to_library_urn(library_id):
- split_url = urlparse.urlsplit(library_id)
+ split_url = urllib.parse.urlsplit(library_id)
if len(split_url.scheme) == 0:
return LIBRARY_NS[library_id]
else:
lib_term = submissionOntology['library_urn']
sub_term = submissionOntology['submission_urn']
- for filename, attributes in file_index.items():
+ for filename, attributes in list(file_index.items()):
s = RDF.Node(RDF.Uri(filename))
model.add_statement(
RDF.Statement(s, TYPE_N, submissionOntology['ucsc_track']))
- for name, value in attributes.items():
+ for name, value in list(attributes.items()):
p = RDF.Node(DCC_NS[name])
o = RDF.Node(value)
model.add_statement(RDF.Statement(s,p,o))
try:
body = get_url_as_text(str(libraryUrn.uri), 'GET')
rdfaParser.parse_string_into_model(model, body, libraryUrn.uri)
- except httplib2.HttpLib2ErrorWithResponse, e:
+ except httplib2.HttpLib2ErrorWithResponse as e:
LOGGER.error(str(e))
elif len(results) == 1:
pass # Assuming that a loaded dataset has one record
response, content = http.request(LOGIN_URL,
'POST',
headers=headers,
- body=urllib.urlencode(credentials))
+ body=urllib.parse.urlencode(credentials))
LOGGER.debug("Login to {0}, status {1}".format(LOGIN_URL,
response['status']))
for lib_id, subobj in subl:
libraries.setdefault(lib_id, []).append(subobj)
- for submission in libraries.values():
+ for submission in list(libraries.values()):
submission.sort(key=attrgetter('date'), reverse=True)
return libraries
#!/usr/bin/env python
-from ConfigParser import SafeConfigParser
+from configparser import ConfigParser as SafeConfigParser
import fnmatch
from glob import glob
import json
import os
from pprint import pprint, pformat
import shlex
-from StringIO import StringIO
+from io import StringIO
import stat
import sys
import time
import types
-import urllib
-import urllib2
-import urlparse
+import urllib.request, urllib.parse, urllib.error
+import urllib.request, urllib.error, urllib.parse
+import urllib.parse
from zipfile import ZipFile
import RDF
if opts.print_rdf:
writer = get_serializer()
- print writer.serialize_model_to_string(model)
+ print(writer.serialize_model_to_string(model))
def make_parser():
if opts.rdf:
print_rdf(common_extensions)
else:
- print common_extensions
+ print(common_extensions)
def make_parser():
parser = OptionParser("%prog: directory [directory...]")
return "".join(tail[::-1])
results = []
- for key, choice in index.items():
+ for key, choice in list(index.items()):
r = find_common_suffix(choice, tail+[key])
if r is not None:
results.append (r)
writer = rdfhelp.get_serializer()
writer.set_namespace('thisSubmissionView', subView._prefix)
- print writer.serialize_model_to_string(model)
+ print(writer.serialize_model_to_string(model))
if __name__ == "__main__":
main()
TYPE_N = rdfNS['type']
CREATION_DATE = libraryOntology['date']
-from encode_find import DBDIR
+from .encode_find import DBDIR
DEFAULT_GENOME='hg19'
DEFAULT_OUTPUT='/tmp/submission_report.html'
#!/usr/bin/env python
from datetime import datetime
import os
-from unittest2 import TestCase
+from unittest import TestCase
import RDF
-import encode_find
+from . import encode_find
from htsworkflow.submission.ucsc import submission_view_url
from htsworkflow.util.rdfhelp import add_default_schemas, \
dump_model, get_model, fromTypedNode
'12097')
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(TestEncodeFind))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main()
-from unittest2 import TestCase, TestSuite, defaultTestLoader
+from unittest import TestCase, TestSuite, defaultTestLoader
-import ucsc_gather
+from . import ucsc_gather
class testUCSCGather(TestCase):
pass
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest='suite')
#!/usr/bin/env python
-from ConfigParser import SafeConfigParser
+from configparser import ConfigParser as SafeConfigParser
import fnmatch
from glob import glob
import json
import os
from pprint import pprint, pformat
import shlex
-from StringIO import StringIO
+from io import StringIO
import stat
import sys
import time
import types
-import urllib
-import urllib2
-import urlparse
+import urllib.request, urllib.parse, urllib.error
+import urllib.request, urllib.error, urllib.parse
+import urllib.parse
from zipfile import ZipFile
import RDF
if opts.print_rdf:
writer = get_serializer()
- print writer.serialize_model_to_string(model)
+ print(writer.serialize_model_to_string(model))
def make_parser():
def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
dag_fragment = []
- for lib_id, result_dir in library_result_map.items():
+ for lib_id, result_dir in list(library_result_map.items()):
submissionNode = view_map.get_submission_node(result_dir)
dag_fragment.extend(
make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
else:
current[variable_name] = value
- for view in all_views.keys():
+ for view in list(all_views.keys()):
line = []
for variable_name in variables:
if variable_name in ('files', 'md5sum'):
-import ConfigParser
+import configparser
import copy
import logging
import logging.handlers
import sys
import time
import traceback
-import urlparse
+import urllib.parse
from benderjab import rpc
# We made sure source ends in a / earlier
cur_list = [ source+subdir for subdir in self.list_filter(stdout)]
entries.extend(cur_list)
- LOGGER.debug(u"Found the following: %s" % (unicode(entries)))
+ LOGGER.debug("Found the following: %s" % (str(entries)))
return entries
def list_filter(self, lines):
"""
dirs_to_copy = []
direntries = [ x[0:42].split() + [x[43:-1]] for x in lines ]
- LOGGER.debug(u'direntries: %s' % (unicode(direntries),))
+ LOGGER.debug('direntries: %s' % (str(direntries),))
for permissions, size, filedate, filetime, filename in direntries:
if permissions[0] == 'd':
# hey its a directory, the first step to being something we want to
return path roots that have finished.
"""
- for dir_key, proc_value in self.processes.items():
+ for dir_key, proc_value in list(self.processes.items()):
retcode = proc_value.poll()
if retcode is None:
# process hasn't finished yet
"""
Return list of current run folder names
"""
- return self.processes.keys()
+ return list(self.processes.keys())
class CopierBot(rpc.XmlRpcBot):
def __init__(self, section=None, configfile=None):
require_resource=True)
except bot.JIDMissingResource:
msg = 'need a full jabber ID + resource for xml-rpc destinations'
- print >>sys.stderr, msg
+ print(msg, file=sys.stderr)
raise bot.JIDMissingResource(msg)
def run(self):
start our copy
"""
# Note, args comes in over the network, so don't trust it.
- LOGGER.debug("Arguments to startCopy %s" % (unicode(args),))
+ LOGGER.debug("Arguments to startCopy %s" % (str(args),))
copy_urls = []
for a in args:
clean_url = self.validate_url(a)
"""
self.rsync.poll()
for p in self.pending:
- if p not in self.rsync.keys():
+ if p not in list(self.rsync.keys()):
self.reportSequencingFinished(p)
self.pending.remove(p)
"""
Parse xmpp chat messages
"""
- help = u"I can [copy], or report current [status]"
- if re.match(u"help", msg):
+ help = "I can [copy], or report current [status]"
+ if re.match("help", msg):
reply = help
elif re.match("copy", msg):
started = self.startCopy()
- reply = u"started copying " + ", ".join(started)
- elif re.match(u"status", msg):
- msg = [u"Currently %d rsync processes are running." % (len(self.rsync))]
- for d in self.rsync.keys():
- msg.append(u" " + d)
+ reply = "started copying " + ", ".join(started)
+ elif re.match("status", msg):
+ msg = ["Currently %d rsync processes are running." % (len(self.rsync))]
+ for d in list(self.rsync.keys()):
+ msg.append(" " + d)
reply = os.linesep.join(msg)
else:
- reply = u"I didn't understand '%s'" % (unicode(msg))
+ reply = "I didn't understand '%s'" % (str(msg))
return reply
def validate_url(self, url):
- user_url = urlparse.urlsplit(url)
+ user_url = urllib.parse.urlsplit(url)
user_scheme = user_url[0]
user_netloc = user_url[1]
user_path = user_url[2]
for source in self.sources:
- source_url = urlparse.urlsplit(source)
+ source_url = urllib.parse.urlsplit(source)
source_scheme = source_url[0]
source_netloc = source_url[1]
source_path = source_url[2]
"""
Parse xmpp chat messages
"""
- help = u"I can send [start] a run, or report [status]"
- if re.match(u"help", msg):
+ help = "I can send [start] a run, or report [status]"
+ if re.match("help", msg):
reply = help
elif re.match("status", msg):
words = msg.split()
if len(words) == 2:
reply = self.getStatusReport(words[1])
else:
- reply = u"Status available for: %s" \
- % (', '.join([k for k in self.conf_info_dict.keys()]))
- elif re.match(u"start", msg):
+ reply = "Status available for: %s" \
+ % (', '.join([k for k in list(self.conf_info_dict.keys())]))
+ elif re.match("start", msg):
words = msg.split()
if len(words) == 2:
self.sequencingFinished(words[1])
- reply = u"starting run for %s" % (words[1])
+ reply = "starting run for %s" % (words[1])
else:
- reply = u"need runfolder name"
- elif re.match(u"path", msg):
- reply = u"My path is: " + unicode(os.environ['PATH'])
+ reply = "need runfolder name"
+ elif re.match("path", msg):
+ reply = "My path is: " + str(os.environ['PATH'])
else:
- reply = u"I didn't understand '%s'" %(msg)
+ reply = "I didn't understand '%s'" %(msg)
LOGGER.debug("reply: " + str(reply))
return reply
#startCmdLineStatusMonitor(ci)
# running step
- print 'Running pipeline now!'
+ print('Running pipeline now!')
run_status = run_pipeline(conf_info)
if run_status is True:
LOGGER.info('Runner: Pipeline: success')
complete = "(completed)"
else:
complete = ""
- return u"<WatchEvent: %s %s %s>" % (time.ctime(self.time), self.event_root, complete)
+ return "<WatchEvent: %s %s %s>" % (time.ctime(self.time), self.event_root, complete)
class Handler(pyinotify.ProcessEvent):
def __init__(self, watchmanager, bot, completion_files=None):
# if we've already seen an event in this directory (AKA runfolder)
# keep track if its already hit the "completed" flag
- if watch_path_events.has_key(target):
+ if target in watch_path_events:
run_already_complete = watch_path_events[target].complete
watch_path_events[target] = WatcherEvent(target)
mounts.append(w)
self.mounts_to_watches[mount_location] = mounts
- self.log.info(u"Watching:"+unicode(w))
+ self.log.info("Watching:"+str(w))
self.wdds.append(self.wm.add_watch(w, mask, rec=True, auto_add=True))
def unmount_watch(self, event_path):
# the list getting shorter
for i in range(len(self.wdds),0, -1):
wdd = self.wdds[i]
- self.log.info(u'unmounting: '+unicode(wdd.items()))
- self.wm.rm_watch(wdd.values())
+ self.log.info('unmounting: '+str(list(wdd.items())))
+ self.wm.rm_watch(list(wdd.values()))
del self.wdds[i]
self.mounted = False
self.notifier.read_events()
# should we do something?
# has something happened?
- for watchdir, last_events in self.handler.last_event.items():
- for last_event_dir, last_event_detail in last_events.items():
+ for watchdir, last_events in list(self.handler.last_event.items()):
+ for last_event_dir, last_event_detail in list(last_events.items()):
time_delta = time.time() - last_event_detail.time
if time_delta > self.write_timeout:
- LOGGER.info("timeout: %s" % (unicode(last_event_detail),))
+ LOGGER.info("timeout: %s" % (str(last_event_detail),))
copy_url = self.make_copy_url(watchdir, last_event_dir)
self.startCopy(copy_url)
if last_event_detail.complete:
self.handler.last_event[watchdir] = {}
# handle unmounted filesystems
- for mount_point, was_mounted in self.mounted_points.items():
+ for mount_point, was_mounted in list(self.mounted_points.items()):
if not was_mounted and mount.is_mounted(mount_point):
# we've been remounted. Huzzah!
# restart the watch
"""
Parse xmpp chat messages
"""
- help = u"I can send [copy] message, or squencer [finished]"
- if re.match(u"help", msg):
+ help = "I can send [copy] message, or squencer [finished]"
+ if re.match("help", msg):
reply = help
elif re.match("copy", msg):
self.startCopy(msg)
- reply = u"sent copy message"
- elif re.match(u"finished", msg):
+ reply = "sent copy message"
+ elif re.match("finished", msg):
words = msg.split()
if len(words) == 2:
self.sequencingFinished(words[1])
- reply = u"sending sequencing finished for %s" % (words[1])
+ reply = "sending sequencing finished for %s" % (words[1])
else:
- reply = u"need runfolder name"
+ reply = "need runfolder name"
else:
- reply = u"I didn't understand '%s'" %(msg)
+ reply = "I didn't understand '%s'" %(msg)
return reply
def run(self):
-from unittest2 import TestCase
+from unittest import TestCase
import os
from htsworkflow.automation.solexa import is_runfolder
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(testRunner))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.automation import solexa
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(testSolexaRunfolderUtils))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
ClIP = request.META['REMOTE_ADDR']
#Check client access permission
granted = False
- if (settings.ALLOWED_ANALYS_IPS.has_key(ClIP)): granted = True
+ if (ClIP in settings.ALLOWED_ANALYS_IPS): granted = True
if not granted: return HttpResponse("access denied.")
output=''
taskid=-1;
# Check required param
- if request.has_key('taskid'): taskid = request['taskid']
+ if 'taskid' in request: taskid = request['taskid']
else: return HttpResponse('missing param task id')
try:
rec = Task.objects.get(id=taskid)
mytimestamp = datetime.now().__str__()
mytimestamp = re.sub(pattern=":[^:]*$",repl="",string=mytimestamp)
- if request.has_key('msg'):
+ if 'msg' in request:
rec.task_status += ", "+request['msg']+" ("+mytimestamp+")"
else :
rec.task_status = "Registered ("+mytimestamp+")"
ClIP = request.META['REMOTE_ADDR']
#Check client access permission
granted = False
- if (settings.ALLOWED_ANALYS_IPS.has_key(ClIP)): granted = True
+ if (ClIP in settings.ALLOWED_ANALYS_IPS): granted = True
if not granted: return HttpResponse("access denied.")
outputfile = ''
All=False
- if (request.has_key('mode')):
+ if ('mode' in request):
if request['mode']=='all':
All=True
from django.core.exceptions import PermissionDenied
from django.conf import settings
-apidata = {'apiid': u'0', 'apikey': settings.DEFAULT_API_KEY}
+apidata = {'apiid': '0', 'apikey': settings.DEFAULT_API_KEY}
def require_api_key(request):
# make sure we have the api component
- if not (request.REQUEST.has_key('apiid') or request.REQUEST.has_key('apikey')):
+ if not ('apiid' in request.REQUEST or 'apikey' in request.REQUEST):
raise PermissionDenied
# make sure the id and key are right
notes = models.TextField()
def __unicode__(self):
- return u'%s, %s, %s, %s, %sx%s' % (self.name, self.model, self.ip_address, self.label_shape, self.label_width, self.label_width)
\ No newline at end of file
+ return '%s, %s, %s, %s, %sx%s' % (self.name, self.model, self.ip_address, self.label_shape, self.label_width, self.label_width)
\ No newline at end of file
if label in _SEARCH_FUNCTIONS:
msg = "search function for label (%s) already registered." % (label)
- raise ValueError, msg
+ raise ValueError(msg)
_SEARCH_FUNCTIONS[label] = search_function
\ No newline at end of file
import ftplib
import socket
-import StringIO
+import io
def print_zpl(zpl_text, host=settings.BCPRINTER_PRINTER1_HOST):
"""
ftp = ftplib.FTP(host=host, user='blank', passwd='')
ftp.login()
- ftp.storlines("STOR printme.txt", StringIO.StringIO(zpl_text))
+    ftp.storlines("STOR printme.txt", io.BytesIO(zpl_text.encode('ascii')))
ftp.quit()
try:
import json
-except ImportError, e:
+except ImportError as e:
import simplejson as json
import re
"""
hits = []
- for label, search_func in plugin._SEARCH_FUNCTIONS.items():
+ for label, search_func in list(plugin._SEARCH_FUNCTIONS.items()):
result = search_func(text)
if result is not None:
hits.extend(result)
# Attempt to find a KeywordMap based on keyword
try:
keymap = models.KeywordMap.objects.get(keyword=keyword)
- except ObjectDoesNotExist, e:
+ except ObjectDoesNotExist as e:
return report_error('Keyword (%s) is not defined' % (keyword))
# Remove keyword and only scan the content
return self.as_divs()
def as_divs(self):
- if not self: return u''
- return u'<div class="errorlist">%s</div>' % (''.join([u'<div class="error">%s</div>' % e for e in self]))
+ if not self: return ''
+ return '<div class="errorlist">%s</div>' % (''.join(['<div class="error">%s</div>' % e for e in self]))
#Convert all newline conventions to unix style
for lane in fcObj.lane_set.all():
data.append("# Lane%d: %s | %s" % \
- (lane.lane_number, unicode(lane.library.id), lane.library.library_name.replace('%', '%%')))
+ (lane.lane_number, str(lane.library.id), lane.library.library_name.replace('%', '%%')))
#data.append("GENOME_DIR %s" % (BASE_DIR))
#data.append("CONTAM_DIR %s" % (BASE_DIR))
#l1s = form['lane1_species']
for lane in fcObj.lane_set.all():
species = lane.library.library_species.scientific_name
- genome_dict.setdefault(species, []).append(unicode(lane.lane_number))
+ genome_dict.setdefault(species, []).append(str(lane.lane_number))
- genome_list = genome_dict.keys()
+ genome_list = list(genome_dict.keys())
genome_list.sort()
#Loop through and create entries for each species.
output = []
for option_value, option_label in chain(self.choices, choices):
if isinstance(option_label, (list, tuple)):
- output.append(u'<optgroup label="%s">' % escape(force_unicode(option_value)))
+ output.append('<optgroup label="%s">' % escape(force_unicode(option_value)))
for option in option_label:
output.append(self.render_option(selected_choices, *option))
- output.append(u'</optgroup>')
+ output.append('</optgroup>')
else:
output.append(self.render_option(selected_choices, option_value, option_label))
- return u'\n'.join(output)
+ return '\n'.join(output)
# render_options blatently grabbed from 1.3.1 as the 1.2 version
# has render_option, which is what I needed to overload as a
output = []
for option_value, option_label in chain(self.choices, choices):
if isinstance(option_label, (list, tuple)):
- output.append(u'<optgroup label="%s">' % escape(force_unicode(option_value)))
+ output.append('<optgroup label="%s">' % escape(force_unicode(option_value)))
for option in option_label:
output.append(self.render_option(selected_choices, *option))
- output.append(u'</optgroup>')
+ output.append('</optgroup>')
else:
output.append(self.render_option(selected_choices, option_value, option_label))
- return u'\n'.join(output)
+ return '\n'.join(output)
def render_option(self, selected_choices, option_value, option_label):
- disabled_sequencers = [ unicode(s.id) for s in self.queryset.filter(active=False) ]
- option_value = unicode(option_value)
- selected_html = (option_value in selected_choices) and u' selected="selected"' or ''
+ disabled_sequencers = [ str(s.id) for s in self.queryset.filter(active=False) ]
+ option_value = str(option_value)
+ selected_html = (option_value in selected_choices) and ' selected="selected"' or ''
cssclass = "strikeout" if option_value in disabled_sequencers else ''
- return u'<option class="%s" value="%s"%s>%s</option>' % (
+ return '<option class="%s" value="%s"%s>%s</option>' % (
cssclass, escape(option_value), selected_html,
conditional_escape(force_unicode(option_label)))
from datetime import datetime, timedelta
try:
import json
-except ImportError, e:
+except ImportError as e:
import simplejson as json
import os
"""
try:
fc = FlowCell.objects.get(flowcell_id__startswith=flowcell_id)
- except FlowCell.DoesNotExist, e:
+ except FlowCell.DoesNotExist as e:
return None
lane_set = {}
'library_name': lane.library.library_name,
'library_id': lane.library.id,
'library_species': lane.library.library_species.scientific_name,
- 'pM': unicode(lane.pM),
+ 'pM': str(lane.pM),
'read_length': lane.flowcell.read_length,
'status_code': lane.status,
'status': LANE_STATUS_MAP[lane.status]
try:
result = lanes_for(username)
- except ObjectDoesNotExist, e:
+ except ObjectDoesNotExist as e:
raise Http404
#convert query set to python structure
user = request.user
#Check access permission
- if not (user.is_superuser and settings.ALLOWED_IPS.has_key(ClIP)):
+ if not (user.is_superuser and ClIP in settings.ALLOWED_IPS):
return HttpResponse("%s access denied from %s." % (user, ClIP))
# ~~~~~~Parameters for the job ~~~~
- if request.REQUEST.has_key('fcid'):
+ if 'fcid' in request.REQUEST:
fcid = request.REQUEST['fcid']
else:
return HttpResponse('missing fcid')
- if request.REQUEST.has_key('runf'):
+ if 'runf' in request.REQUEST:
runfolder = request.REQUEST['runf']
else:
return HttpResponse('missing runf')
- if request.REQUEST.has_key('updst'):
+ if 'updst' in request.REQUEST:
UpdatedStatus = request.REQUEST['updst']
else:
return HttpResponse('missing status')
#if there's a message update that too
mytimestamp = datetime.now().__str__()
mytimestamp = re.sub(pattern=":[^:]*$",repl="",string=mytimestamp)
- if request.REQUEST.has_key('msg'):
+ if 'msg' in request.REQUEST:
rec.run_note += ", "+request.REQUEST['msg']+" ("+mytimestamp+")"
else :
if UpdatedStatus == '1':
def getConfile(req):
granted = False
ClIP = req.META['REMOTE_ADDR']
- if (settings.ALLOWED_IPS.has_key(ClIP)): granted = True
+ if (ClIP in settings.ALLOWED_IPS): granted = True
if not granted: return HttpResponse("access denied. IP: "+ClIP)
cnfgfile = 'Nothing found'
runfolder = 'unknown'
request = req.REQUEST
- if request.has_key('fcid'):
+ if 'fcid' in request:
fcid = request['fcid']
- if request.has_key('runf'):
+ if 'runf' in request:
runfolder = request['runf']
try:
rec = DataRun.objects.get(run_folder=runfolder) #,flowcell_id=fcid)
def getLaneLibs(req):
granted = False
ClIP = req.META['REMOTE_ADDR']
- if (settings.ALLOWED_IPS.has_key(ClIP)): granted = True
+ if (ClIP in settings.ALLOWED_IPS): granted = True
if not granted: return HttpResponse("access denied.")
request = req.REQUEST
fcid = 'none'
outputfile = ''
- if request.has_key('fcid'):
+ if 'fcid' in request:
fcid = request['fcid']
try:
rec = FlowCell.objects.get(flowcell_id=fcid)
default_pM = 5
try:
default_pM = int(settings.DEFAULT_PM)
-except ValueError, e:
+except ValueError as e:
LOGGER.error("invalid value for frontend.default_pm")
# how many days to wait before trying to re-import a runfolder
ordering = ["-isdefault", "name"]
def __unicode__(self):
- return unicode(self.name)
+ return str(self.name)
@classmethod
def default(cls):
ordering = ["-isdefault", "-active", "name"]
def __unicode__(self):
- name = [unicode(self.name)]
+ name = [str(self.name)]
if self.instrument_name is not None:
- name.append("(%s)" % (unicode(self.instrument_name),))
+ name.append("(%s)" % (str(self.instrument_name),))
return " ".join(name)
@models.permalink
notes = models.TextField(blank=True)
def __unicode__(self):
- return unicode(self.flowcell_id)
+ return str(self.flowcell_id)
def Lanes(self):
html = ['<table>']
"""Convert our boolean 'is paired' flag to a name
"""
if self.paired_end:
- return u"Paired"
+ return "Paired"
else:
- return u"Single"
+ return "Single"
@models.permalink
def get_absolute_url(self):
[str(self.id)])
def __unicode__(self):
- return self.flowcell.flowcell_id + ':' + unicode(self.lane_number)
+ return self.flowcell.flowcell_id + ':' + str(self.lane_number)
class DataRun(models.Model):
from lxml.html import fromstring
try:
import json
-except ImportError, e:
+except ImportError as e:
import simplejson as json
import os
import shutil
import sys
import tempfile
-from urlparse import urljoin
+from urllib.parse import urljoin
from django.conf import settings
from django.core import mail
from htsworkflow.pipelines.test.simulate_runfolder import TESTDATA_DIR
-LANE_SET = range(1,9)
+LANE_SET = list(range(1,9))
NSMAP = {'libns':'http://jumpgate.caltech.edu/wiki/LibraryOntology#'}
"""
Check the code that packs the django objects into simple types.
"""
- for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]:
+ for fc_id in ['FC12150', "42JTNAAXX", "42JU1AAXX"]:
fc_dict = experiments.flowcell_information(fc_id)
fc_django = models.FlowCell.objects.get(flowcell_id=fc_id)
self.assertEqual(fc_dict['flowcell_id'], fc_id)
for lane in fc_django.lane_set.all():
- lane_contents = fc_json['lane_set'][unicode(lane.lane_number)]
+ lane_contents = fc_json['lane_set'][str(lane.lane_number)]
lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
"""
Require logging in to retrieve meta data
"""
- response = self.client.get(u'/experiments/config/FC12150/json')
+ response = self.client.get('/experiments/config/FC12150/json')
self.assertEqual(response.status_code, 403)
def test_library_id(self):
This tests to make sure that the value entered in the raw library id field matches
the library id looked up.
"""
- expected_ids = [u'10981',u'11016',u'SL039',u'11060',
- u'11061',u'11062',u'11063',u'11064']
+ expected_ids = ['10981','11016','SL039','11060',
+ '11061','11062','11063','11064']
self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
response = self.client.get('/admin/experiments/flowcell/153/')
lane_dict = multi_lane_to_dict(lane_contents)
self.assertEqual(lane_dict['12044']['index_sequence'],
- {u'1': u'ATCACG',
- u'2': u'CGATGT',
- u'3': u'TTAGGC'})
+ {'1': 'ATCACG',
+ '2': 'CGATGT',
+ '3': 'TTAGGC'})
self.assertEqual(lane_dict['11045']['index_sequence'],
- {u'1': u'ATCACG'})
+ {'1': 'ATCACG'})
count = 0
for r in query.execute(model):
count += 1
- self.assertEqual(fromTypedNode(r['flowcell_id']), u'42JU1AAXX')
+ self.assertEqual(fromTypedNode(r['flowcell_id']), '42JU1AAXX')
lane_id = fromTypedNode(r['lane_id'])
library_id = fromTypedNode(r['library_id'])
self.assertTrue(library_id in expected[lane_id])
file_type_objects = models.FileType.objects
name = 'QSEQ tarfile'
file_type_object = file_type_objects.get(name=name)
- self.assertEqual(u"QSEQ tarfile",
- unicode(file_type_object))
+ self.assertEqual("QSEQ tarfile",
+ str(file_type_object))
def test_find_file_type(self):
file_type_objects = models.FileType.objects
seq.instrument_name = "HWI-SEQ1"
seq.model = "Imaginary 5000"
- self.assertEqual(unicode(seq), "Seq1 (HWI-SEQ1)")
+ self.assertEqual(str(seq), "Seq1 (HWI-SEQ1)")
def test_lookup(self):
fc = models.FlowCell.objects.get(pk=153)
teardown_test_environment()
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
for testcase in [ClusterStationTestCases,
SequencerTestCases,
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
warnings.append((user.admin_url(), user.username))
user=None
- for user_email in email_lane.keys():
+ for user_email in list(email_lane.keys()):
sending = ""
# build body
context = RequestContext(request,
- {u'flowcell': fc,
- u'lanes': email_lane[user_email],
- u'runfolder': 'blank',
- u'finish_low': estimate_low,
- u'finish_high': estimate_high,
- u'now': datetime.now(),
+ {'flowcell': fc,
+ 'lanes': email_lane[user_email],
+ 'runfolder': 'blank',
+ 'finish_low': estimate_low,
+ 'finish_high': estimate_high,
+ 'now': datetime.now(),
})
# build view
try:
import uuid
-except ImportError, e:
+except ImportError as e:
# Some systems are using python 2.4, which doesn't have uuid
# this is a stub
LOGGER.warning('Real uuid is not available, initializing fake uuid module')
url = models.URLField(blank=True, null=True)
def __unicode__(self):
- return u"%s" % (self.name)
+ return "%s" % (self.name)
class Location(models.Model):
def __unicode__(self):
if len(self.location_description) > 16:
- return u"%s: %s" % (self.name, self.location_description[0:16]+u"...")
+ return "%s: %s" % (self.name, self.location_description[0:16]+"...")
else:
- return u"%s: %s" % (self.name, self.location_description)
+ return "%s: %s" % (self.name, self.location_description)
pre_save.connect(_assign_uuid, sender=Location)
notes = models.TextField(blank=True, null=True)
def __unicode__(self):
- name = u''
+ name = ''
if self.model_id:
- name += u"model:%s " % (self.model_id)
+ name += "model:%s " % (self.model_id)
if self.part_number:
- name += u"part:%s " % (self.part_number)
+ name += "part:%s " % (self.part_number)
if self.lot_number:
- name += u"lot:%s " % (self.lot_number)
+ name += "lot:%s " % (self.lot_number)
- return u"%s: %s" % (name, self.purchase_date)
+ return "%s: %s" % (name, self.purchase_date)
class Meta:
verbose_name_plural = "Item Info"
description = models.TextField(blank=True, null=True)
def __unicode__(self):
- return u"%s" % (self.name)
+ return "%s" % (self.name)
class ItemStatus(models.Model):
name = models.CharField(max_length=64, unique=True)
def __unicode__(self):
if self.barcode_id is None or len(self.barcode_id) == 0:
- return u"invu|%s" % (self.uuid)
+ return "invu|%s" % (self.uuid)
else:
- return u"invb|%s" % (self.barcode_id)
+ return "invb|%s" % (self.barcode_id)
def get_absolute_url(self):
return '/inventory/%s/' % (self.uuid)
def __unicode__(self):
if self.default:
- return u'%s %s' % (self.item_type.name, self.printer.name)
+ return '%s %s' % (self.item_type.name, self.printer.name)
else:
- return u'%s %s (default)' % (self.item_type.name, self.printer.name)
+ return '%s %s (default)' % (self.item_type.name, self.printer.name)
pre_save.connect(_switch_default, sender=PrinterTemplate)
modified_date = models.DateTimeField(auto_now=True)
def __unicode__(self):
- return u"%s: %s" % (str(self.flowcell), ', '.join([ str(s) for s in self.storage_devices.iterator() ]))
+ return "%s: %s" % (str(self.flowcell), ', '.join([ str(s) for s in self.storage_devices.iterator() ]))
class Meta:
verbose_name_plural = "Long Term Storage"
flowcell = models.ForeignKey(FlowCell)
def __unicode__(self):
- return u"%s: %s" % (str(self.flowcell), ', '.join([ str(s) for s in self.reagent.iterator() ]))
+ return "%s: %s" % (str(self.flowcell), ', '.join([ str(s) for s in self.reagent.iterator() ]))
class ReagentLibrary(ReagentBase):
library = models.ForeignKey(Library)
def __unicode__(self):
- return u"%s: %s" % (str(self.library), ', '.join([ str(s) for s in self.reagent.iterator() ]))
+ return "%s: %s" % (str(self.library), ', '.join([ str(s) for s in self.reagent.iterator() ]))
itemNode = RDF.Node(RDF.Uri(url))
item_type = fromTypedNode(model.get_target(itemNode, inventoryOntology['item_type']))
- self.failUnlessEqual(item_type, u'Hard Drive')
+ self.failUnlessEqual(item_type, 'Hard Drive')
def test_itemindex(self):
url = '/inventory/it/Hard Drive/'
teardown_test_environment()
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(InventoryTestCase))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
try:
import json
-except ImportError, e:
+except ImportError as e:
import simplejson as json
INVENTORY_CONTEXT_DEFAULTS = {
printer_template = PrinterTemplate.objects.get(default=True)
except ObjectDoesNotExist:
msg = "No template for item type (%s) and no default template found" % (item_type.name)
- raise ValueError, msg
+ raise ValueError(msg)
return printer_template
"""
try:
item = Item.objects.get(barcode_id=barcode_id)
- except ObjectDoesNotExist, e:
+ except ObjectDoesNotExist as e:
item = None
return item_summary_by_uuid(request, None, msg, item)
if item is None:
try:
item = Item.objects.get(uuid=uuid)
- except ObjectDoesNotExist, e:
+ except ObjectDoesNotExist as e:
item = None
context_dict = {
"""
try:
item = Item.objects.get(uuid=uuid)
- except ObjectDoesNotExist, e:
+ except ObjectDoesNotExist as e:
item = None
msg = "Item with UUID %s does not exist" % (uuid)
# Retrieve Storage Device
try:
sd = Item.objects.get(barcode_id=serial)
- except ObjectDoesNotExist, e:
+ except ObjectDoesNotExist as e:
msg = "Item with barcode_id of %s not found." % (serial)
raise ObjectDoesNotExist(msg)
# Retrieve FlowCell
try:
fc = FlowCell.objects.get(flowcell_id__startswith=flowcell)
- except ObjectDoesNotExist, e:
+ except ObjectDoesNotExist as e:
msg = "FlowCell with flowcell_id of %s not found." % (flowcell)
raise ObjectDoesNotExist(msg)
lts = None
if count > 1:
msg = "There really should only be one longtermstorage object per flowcell"
- raise ValueError, msg
+ raise ValueError(msg)
elif count == 1:
# lts already attached to flowcell
lts = fc.longtermstorage_set.all()[0]
notes = models.TextField(null=True, blank=True)
def __unicode__(self):
- return u'%s: %s' % (self.name, self.labels)
+ return '%s: %s' % (self.name, self.labels)
class LabelTemplate(models.Model):
"""
"""}
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(SimpleTest))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import re
from xml.sax import make_parser
from xml.sax.handler import ContentHandler
-import urllib
-import urllib2
+import urllib.request, urllib.parse, urllib.error
+import urllib.request, urllib.error, urllib.parse
import os
'''
return arRes
def getWebPage(url,params):
- pdata = urllib.urlencode(params)
- req = urllib2.Request(url,pdata)
- wpage = urllib2.urlopen(req)
+ pdata = urllib.parse.urlencode(params).encode('utf-8')
+ req = urllib.request.Request(url,pdata)
+ wpage = urllib.request.urlopen(req)
restext = wpage.read()
wpage.close()
return restext
from htsworkflow.frontend.reports.utils import *
import re
##from p1 import LibInfo
-from libinfopar import *
+from .libinfopar import *
## This is a table based REPORT generator. The goal is to display a Progress Report for all the ENCODE projects, based on Study Name (e.g. NRSF, FOXP2, Methy-Seq on .. etc).
def report1(request):
EXP = 'ChIP-seq'
- if request.GET.has_key('aflid'):
+ if 'aflid' in request.GET:
AFL_Id = request.GET['aflid']
try:
AFL = Affiliation.objects.get(id=AFL_Id).name
def report_RM(request): #for RNA-Seq and Methyl-Seq
EXP = 'RNA-seq'
- if request.GET.has_key('exp'):
+ if 'exp' in request.GET:
EXP = request.GET['exp'] # Methyl-seq
- if request.GET.has_key('aflid'):
+ if 'aflid' in request.GET:
AFL_Id = request.GET['aflid']
try:
AFL = Affiliation.objects.get(id=AFL_Id).name
except TypeError:
del u # move on to the next method
else:
- return u.keys()
+ return list(u.keys())
# We can't hash all the elements. Second fastest is to sort,
# which brings the equal elements together; then duplicates are
# easy to weed out in a single pass.
if user.check_password(password):
return user
#except self.user_class.DoesNotExist:
- except Exception, e:
- print >>sys.stderr, e
+ except Exception as e:
+ print(e, file=sys.stderr)
return None
def get_user(self, user_id):
try:
return self.user_class.objects.get(pk=user_id)
#except self.user_class.DoesNotExist:
- except Exception, e:
- print >>sys.stderr, e
+ except Exception as e:
+ print(e, file=sys.stderr)
return None
@property
import types
import logging
-import urlparse
+import urllib.parse
from django.db import models
from django.contrib.auth.models import User, UserManager
from django.core import urlresolvers
biology = models.TextField(blank=True, null=True)
notes = models.TextField(blank=True, null=True)
def __unicode__(self):
- return u'%s - %s' % (self.antigene, self.antibodies)
+ return '%s - %s' % (self.antigene, self.antibodies)
class Meta:
verbose_name_plural = "antibodies"
ordering = ["antigene"]
notes = models.TextField(blank=True)
def __unicode__(self):
- return unicode(self.cellline_name)
+ return str(self.cellline_name)
class Meta:
ordering = ["cellline_name"]
notes = models.TextField(blank=True)
def __unicode__(self):
- return unicode(self.condition_name)
+ return str(self.condition_name)
class Meta:
ordering = ["condition_name"]
name = models.CharField(max_length=50, unique=True)
def __unicode__(self):
- return unicode(self.name)
+ return str(self.name)
class Tag(models.Model):
tag_name = models.CharField(max_length=100, db_index=True,blank=False,null=False)
choices=TAG_CONTEXT, default='Library')
def __unicode__(self):
- return u'%s' % (self.tag_name)
+ return '%s' % (self.tag_name)
class Meta:
ordering = ["context","tag_name"]
#use_genome_build = models.CharField(max_length=100, blank=False, null=False)
def __unicode__(self):
- return u'%s (%s)' % (self.scientific_name, self.common_name)
+ return '%s (%s)' % (self.scientific_name, self.common_name)
class Meta:
verbose_name_plural = "species"
users.admin_order_field = "username"
def __unicode__(self):
- str = unicode(self.name)
+ str = "%s" % (self.name)
if self.contact is not None and len(self.contact) > 0:
- str += u' ('+self.contact+u')'
+ str += ' ('+self.contact+')'
return str
def Users(self):
users = self.users.all().order_by('username')
- return ", ".join([unicode(a) for a in users ])
+ return ", ".join([str(a) for a in users ])
class Meta:
ordering = ["name","contact"]
help_text="Does this adapter provide multiplexing?")
def __unicode__(self):
- return unicode(self.name)
+ return str(self.name)
class Meta:
ordering = ["-id"]
undiluted_concentration = models.DecimalField("Concentration",
max_digits=5, decimal_places=2, blank=True, null=True,
- help_text=u"Undiluted concentration (ng/\u00b5l)")
+ help_text="Undiluted concentration (ng/\u00b5l)")
# note \u00b5 is the micro symbol in unicode
successful_pM = models.DecimalField(max_digits=9,
decimal_places=1, blank=True, null=True)
bioanalyzer_summary = models.TextField(blank=True,default="")
bioanalyzer_concentration = models.DecimalField(max_digits=5,
decimal_places=2, blank=True, null=True,
- help_text=u"(ng/\u00b5l)")
+ help_text="(ng/\u00b5l)")
bioanalyzer_image_url = models.URLField(blank=True,default="")
def __unicode__(self):
- return u'#%s: %s' % (self.id, self.library_name)
+ return '#%s: %s' % (self.id, self.library_name)
class Meta:
verbose_name_plural = "libraries"
adapter_type = self.library_type.id,
multiplex_id = multiplex_id)
return multiplex.sequence
- except MultiplexIndex.DoesNotExist, e:
+ except MultiplexIndex.DoesNotExist as e:
return None
def index_sequence_text(self, seperator=' '):
sequences = self.index_sequences()
if sequences is None:
return ""
- if type(sequences) in types.StringTypes:
+ if isinstance(sequences, str):
return sequences
- multiplex_ids = sequences.keys()
+ multiplex_ids = list(sequences.keys())
multiplex_ids.sort()
return seperator.join(( "%s:%s" %(i,sequences[i]) for i in multiplex_ids))
index_sequence_text.short_description = "Index"
ar = []
for t in affs:
ar.append(t.__unicode__())
- return u'%s' % ( ", ".join(ar))
+ return '%s' % ( ", ".join(ar))
def DataRun(self):
str ='<a target=_self href="/admin/experiments/datarun/?q='+self.id+'" title="Check All Data Runs for This Specific Library ..." ">Data Run</a>'
# Check data sanity
if res[2] != "OK":
- return u'<div style="border:solid red 2px">'+res[2]+'</div>'
+ return '<div style="border:solid red 2px">'+res[2]+'</div>'
rc = "%1.2f" % (res[1]/1000000.0)
# Color Scheme: green is more than 10M, blue is more than 5M, orange is more than 3M and red is less. For RNAseq, all those thresholds should be doubled
def __unicode__(self):
#return unicode(self.username) + u" (" + unicode(self.get_full_name()) + u")"
- return unicode(self.get_full_name()) + u' (' + unicode(self.username) + ')'
+ return str(self.get_full_name()) + ' (' + str(self.username) + ')'
def HTSUserInsertID(sender, instance, **kwargs):
"""
try:
import json
-except ImportError, e:
+except ImportError as e:
import simplejson as json
from django.test import TestCase
self.failUnlessEqual(d['stopping_point'], lib.stopping_point)
self.failUnlessEqual(d['successful_pM'], lib.successful_pM)
self.failUnlessEqual(d['undiluted_concentration'],
- unicode(lib.undiluted_concentration))
+ str(lib.undiluted_concentration))
# some specific tests
if lib.id == '10981':
# test a case where there is no known status
- lane_set = {u'status': u'Unknown',
- u'paired_end': True,
- u'read_length': 75,
- u'lane_number': 1,
- u'lane_id': 1193,
- u'flowcell': u'303TUAAXX',
- u'status_code': None}
+ lane_set = {'status': 'Unknown',
+ 'paired_end': True,
+ 'read_length': 75,
+ 'lane_number': 1,
+ 'lane_id': 1193,
+ 'flowcell': '303TUAAXX',
+ 'status_code': None}
self.failUnlessEqual(len(d['lane_set']), 1)
self.failUnlessEqual(d['lane_set'][0], lane_set)
elif lib.id == '11016':
# test a case where there is a status
- lane_set = {u'status': 'Good',
- u'paired_end': True,
- u'read_length': 75,
- u'lane_number': 5,
- u'lane_id': 1197,
- u'flowcell': u'303TUAAXX',
- u'status_code': 2}
+ lane_set = {'status': 'Good',
+ 'paired_end': True,
+ 'read_length': 75,
+ 'lane_number': 5,
+ 'lane_id': 1197,
+ 'flowcell': '303TUAAXX',
+ 'status_code': 2}
self.failUnlessEqual(len(d['lane_set']), 1)
self.failUnlessEqual(d['lane_set'][0], lane_set)
}"""
query = RDF.SPARQLQuery(body)
for r in query.execute(model):
- self.assertEqual(fromTypedNode(r['library_id']), u'10981')
+ self.assertEqual(fromTypedNode(r['library_id']), '10981')
self.assertEqual(fromTypedNode(r['name']),
- u'Paired End Multiplexed Sp-BAC')
+ 'Paired End Multiplexed Sp-BAC')
self.assertEqual(fromTypedNode(r['gel_cut']), 400)
- self.assertEqual(fromTypedNode(r['made_by']), u'Igor')
+ self.assertEqual(fromTypedNode(r['made_by']), 'Igor')
state = validate_xhtml(content)
if state is not None:
count = 0
for r in query.execute(model):
count += 1
- for name, value in r.items():
+ for name, value in list(r.items()):
self.assertTrue(name in bindings)
self.assertTrue(value is not None)
rdfNS = RDF.NS("http://www.w3.org/1999/02/22-rdf-syntax-ns#")
xsdNS = RDF.NS("http://www.w3.org/2001/XMLSchema#")
libNS = RDF.NS("http://jumpgate.caltech.edu/wiki/LibraryOntology#")
-except ImportError,e:
+except ImportError as e:
HAVE_RDF = False
self.check_literal_object(model, ['Drosophila melanogaster'], p=libNS['species_name'])
self.check_uri_object(model,
- [u'http://localhost/lane/1193'],
+ ['http://localhost/lane/1193'],
p=libNS['has_lane'])
fc_uri = RDF.Uri('http://localhost/flowcell/303TUAAXX/')
self.check_literal_object(model,
- [u"303TUAAXX"],
+ ["303TUAAXX"],
s=fc_uri, p=libNS['flowcell_id'])
def check_literal_object(self, model, values, s=None, p=None, o=None):
self.failUnlessEqual(len(statements), len(values),
"Couln't find %s %s %s" % (s,p,o))
for s in statements:
- self.failUnless(unicode(s.object.uri) in values)
+ self.failUnless(str(s.object.uri) in values)
teardown_test_environment()
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(LibraryTestCase))
suite.addTests(defaultTestLoader.loadTestsFromTestCase(SampleWebTestCase))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
# Create your views here.
-import StringIO
+import io
import logging
import os
import sys
try:
import json
-except ImportError, e:
+except ImportError as e:
import simplejson as json
from django.views.decorators.csrf import csrf_exempt
summary['amplified_from'] = ''
lanes_run = count_lanes(lib.lane_set)
# suppress zeros
- for row in xrange(len(lanes_run)):
- for col in xrange(len(lanes_run[row])):
+ for row in range(len(lanes_run)):
+ for col in range(len(lanes_run[row])):
if lanes_run[row][col] == 0:
lanes_run[row][col] = ''
summary['lanes_run'] = lanes_run
summary['is_archived'] = lib.is_archived()
records.append(summary)
- cl.result_count = unicode(cl.paginator._count)
+ cl.result_count = str(cl.paginator._count)
return {'library_list': records }
storage_ids = ', '.join([ '<a href="/inventory/%s/">%s</a>' % (s,s) for s in storage_id_list ])
results = []
- for cycle in cur_fc.keys():
+ for cycle in list(cur_fc.keys()):
result_path = cur_fc[cycle]['eland_results'].get(lanes[0], None)
result_link = make_result_link(fc_id, cycle, lanes[0], result_path)
results.append({'flowcell_id': fc_id,
"""
try:
lib = Library.objects.get(id = library_id)
- except Library.DoesNotExist, e:
+ except Library.DoesNotExist as e:
return None
#lane_info = lane_information(lib.lane_set)
--- /dev/null
+/usr/share/javascript/jquery/jquery.min.js
\ No newline at end of file
VERSION_RE = "([0-9\.]+)"
USER_RE = "([a-zA-Z0-9]+)"
LANES_PER_FLOWCELL = 8
-LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
+LANE_LIST = list(range(1, LANES_PER_FLOWCELL + 1))
# make epydoc happy
__docformat__ = "restructuredtext en"
-LANE_LIST = range(1,9)
+LANE_LIST = list(range(1,9))
class Phasing(object):
PHASING = 'Phasing'
for b in base_order:
for value in self.base[b]:
crosstalk_value = ElementTree.SubElement(root, CrosstalkMatrix.ELEMENT)
- crosstalk_value.text = unicode(value)
+ crosstalk_value.text = str(value)
crosstalk_value.tail = os.linesep
return root
# add phasing parameters
for lane in LANE_LIST:
- if self.phasing.has_key(lane):
+ if lane in self.phasing:
params.append(self.phasing[lane].get_elements())
# add crosstalk matrix if it exists
opts, args = parser.parse_args(cmdline)
for bustard_dir in args:
- print u'analyzing bustard directory: ' + unicode(bustard_dir)
+ print('analyzing bustard directory: ' + str(bustard_dir))
bustard_object = bustard(bustard_dir)
bustard_object.dump()
b2 = ElementTree.tostring(b2_tree).split(os.linesep)
for line1, line2 in zip(b1, b2):
if b1 != b2:
- print "b1: ", b1
- print "b2: ", b2
+ print("b1: ", b1)
+ print("b2: ", b2)
if __name__ == "__main__":
main(sys.argv[1:])
#Standard output handling
else:
- print 'Sequence line:', line
+ print('Sequence line:', line)
mo = s_seq_folder.search(line)
conf_info.bustard_path = line[mo.end():]
conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
try:
saveConfigFile(flowcell, options.url, cfg_filepath)
conf_info.config_filepath = cfg_filepath
- except FlowCellNotFound, e:
+ except FlowCellNotFound as e:
LOGGER.error(e)
return False
- except WebError404, e:
+ except WebError404 as e:
LOGGER.error(e)
return False
- except IOError, e:
+ except IOError as e:
LOGGER.error(e)
return False
- except Exception, e:
+ except Exception as e:
LOGGER.error(e)
return False
stdout=fout,
stderr=ferr)
- print "Configuring pipeline: %s" % (time.ctime())
+ print("Configuring pipeline: %s" % (time.ctime()))
error_code = pipe.wait()
# Clean up
# 1) The stdout completed without error
# 2) The program exited with status 0
# 3) No errors found in stdout
- print '#Expect: True, False, True, True'
- print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True
+ print('#Expect: True, False, True, True')
+ print(complete, bool(error_code), abort != RUN_ABORT, stderr_success is True)
status = complete is True and \
bool(error_code) is False and \
abort != RUN_ABORT and \
ferr.close()
# Finished file check!
- print 'RUN SUCCESS CHECK:'
- for key, value in event.run_status_dict.items():
- print ' %s: %s' % (key, value)
+ print('RUN SUCCESS CHECK:')
+ for key, value in list(event.run_status_dict.items()):
+ print(' %s: %s' % (key, value))
dstatus = event.run_status_dict
opts, args = parser.parse_args(cmdline)
if opts.version:
- print (version())
+ print((version()))
return 0
if opts.output is not None:
def get_elements(self):
lane = ElementTree.Element(ElandLane.LANE,
{'version':
- unicode(ElandLane.XML_VERSION)})
+ str(ElandLane.XML_VERSION)})
sample_tag = ElementTree.SubElement(lane, SAMPLE_NAME)
sample_tag.text = self.sample_name
lane_tag = ElementTree.SubElement(lane, LANE_ID)
end_tag = ElementTree.SubElement(lane, END)
end_tag.text = str(self.end)
genome_map = ElementTree.SubElement(lane, GENOME_MAP)
- for k, v in self.genome_map.items():
+ for k, v in list(self.genome_map.items()):
item = ElementTree.SubElement(
genome_map, GENOME_ITEM,
- {'name':k, 'value':unicode(v)})
+ {'name':k, 'value':str(v)})
mapped_reads = ElementTree.SubElement(lane, MAPPED_READS)
- for k, v in self.mapped_reads.items():
+ for k, v in list(self.mapped_reads.items()):
item = ElementTree.SubElement(
mapped_reads, MAPPED_ITEM,
- {'name':k, 'value':unicode(v)})
+ {'name':k, 'value':str(v)})
match_codes = ElementTree.SubElement(lane, MATCH_CODES)
- for k, v in self.match_codes.items():
+ for k, v in list(self.match_codes.items()):
item = ElementTree.SubElement(
match_codes, MATCH_ITEM,
- {'name':k, 'value':unicode(v)})
+ {'name':k, 'value':str(v)})
reads = ElementTree.SubElement(lane, READS)
- reads.text = unicode(self.reads)
+ reads.text = str(self.reads)
return lane
for key in initializer:
if key not in self.match_codes:
errmsg = "Initializer can only contain: %s"
- raise ValueError(errmsg % (",".join(self.match_codes.keys())))
+ raise ValueError(errmsg % (",".join(list(self.match_codes.keys()))))
self.match_codes[key] += initializer[key]
def __iter__(self):
def __setitem__(self, key, value):
if key not in self.match_codes:
errmsg = "Unrecognized key, allowed values are: %s"
- raise ValueError(errmsg % (",".join(self.match_codes.keys())))
+ raise ValueError(errmsg % (",".join(list(self.match_codes.keys()))))
self.match_codes[key] = value
def __len__(self):
raise ValueError("Expected a MatchCodes, got %s", str(type(other)))
newobj = MatchCodes(self)
- for key, value in other.items():
+ for key, value in list(other.items()):
newobj[key] = self.get(key, 0) + other[key]
return newobj
LOGGER.info("summarizing results for %s" % (pathname))
lines = 0
f = open(pathname)
- for l in f.xreadlines():
+ for l in f:
lines += 1
f.close()
def get_elements(self):
lane = ElementTree.Element(SequenceLane.LANE,
{'version':
- unicode(SequenceLane.XML_VERSION)})
+ str(SequenceLane.XML_VERSION)})
sample_tag = ElementTree.SubElement(lane, SAMPLE_NAME)
sample_tag.text = self.sample_name
lane_tag = ElementTree.SubElement(lane, LANE_ID)
end_tag = ElementTree.SubElement(lane, END)
end_tag.text = str(self.end)
reads = ElementTree.SubElement(lane, READS)
- reads.text = unicode(self.reads)
+ reads.text = str(self.reads)
sequence_type = ElementTree.SubElement(lane, SequenceLane.SEQUENCE_TYPE)
- sequence_type.text = unicode(SequenceLane.SEQUENCE_DESCRIPTION[self.sequence_type])
+ sequence_type.text = str(SequenceLane.SEQUENCE_DESCRIPTION[self.sequence_type])
return lane
def set_elements(self, tree):
if tree.tag != SequenceLane.LANE:
raise ValueError('Exptecting %s' % (SequenceLane.LANE,))
- lookup_sequence_type = dict([ (v,k) for k,v in SequenceLane.SEQUENCE_DESCRIPTION.items()])
+ lookup_sequence_type = dict([ (v,k) for k,v in list(SequenceLane.SEQUENCE_DESCRIPTION.items())])
for element in tree:
tag = element.tag.lower()
del self.result[key]
def __iter__(self):
- keys = self.results.iterkeys()
+ keys = iter(self.results.keys())
for k in sorted(keys):
yield k
raise ValueError("Key must be a %s" % (str(type(SampleKey))))
if not search.iswild:
yield self[search]
- for key in self.keys():
+ for key in list(self.keys()):
if key.matches(search): yield key
def get_elements(self):
root = ElementTree.Element(ELAND.ELAND,
- {'version': unicode(ELAND.XML_VERSION)})
+ {'version': str(ELAND.XML_VERSION)})
for key in self:
eland_lane = self[key].get_elements()
- eland_lane.attrib[ELAND.END] = unicode(self[key].end-1)
- eland_lane.attrib[ELAND.LANE_ID] = unicode(self[key].lane_id)
- eland_lane.attrib[ELAND.SAMPLE] = unicode(self[key].sample_name)
+ eland_lane.attrib[ELAND.END] = str(self[key].end-1)
+ eland_lane.attrib[ELAND.LANE_ID] = str(self[key].lane_id)
+ eland_lane.attrib[ELAND.SAMPLE] = str(self[key].sample_name)
root.append(eland_lane)
return root
return root
for a in args:
LOGGER.info("Starting scan of %s" % (a,))
e = eland(a)
- print ElementTree.tostring(e.get_elements())
+ print(ElementTree.tostring(e.get_elements()))
return
def dump(self):
"""Report debugginf information
"""
- print "Starting cycle:", self.start
- print "Ending cycle:", self.stop
- print "Firecrest version:", self.version
- print "Run date:", self.date
- print "user:", self.user
+ print("Starting cycle:", self.start)
+ print("Ending cycle:", self.stop)
+ print("Firecrest version:", self.version)
+ print("Run date:", self.date)
+ print("user:", self.user)
def get_elements(self):
"""Return XML serialization structure.
# Need valid directory
if not os.path.exists(genome_base_dir):
msg = "Directory does not exist: %s" % (genome_base_dir)
- raise IOError, msg
+ raise IOError(msg)
# Find all subdirectories
filepath_list = glob.glob(os.path.join(genome_base_dir, '*'))
build_dict = d.setdefault(species, {})
if build in build_dict:
msg = "Duplicate genome for %s|%s" % (species, build)
- raise DuplicateGenome, msg
+ raise DuplicateGenome(msg)
build_dict[build] = genome_dir
builds = self.genome_dict[elements[0]]
# sort build names the way humans would
- keys = builds.keys()
+ keys = list(builds.keys())
keys.sort(cmp=alphanum)
# return the path from the 'last' build name
def get(self, key, default=None):
try:
return self[key]
- except KeyError, e:
+ except KeyError as e:
return default
def keys(self):
keys = []
- for species in self.genome_dict.keys():
+ for species in list(self.genome_dict.keys()):
for build in self.genome_dict[species]:
keys.append([species+'|'+build])
return keys
def values(self):
values = []
- for species in self.genome_dict.keys():
+ for species in list(self.genome_dict.keys()):
for build in self.genome_dict[species]:
values.append(self.genome_dict[species][build])
return values
def items(self):
items = []
- for species in self.genome_dict.keys():
+ for species in list(self.genome_dict.keys()):
for build in self.genome_dict[species]:
key = [species+'|'+build]
value = self.genome_dict[species][build]
if __name__ == '__main__':
if len(sys.argv) != 2:
- print 'useage: %s <base_genome_dir>' % (sys.argv[0])
+ print('useage: %s <base_genome_dir>' % (sys.argv[0]))
sys.exit(1)
d = getAvailableGenomes(sys.argv[1])
d2 = constructMapperDict(d)
- for k,v in d2.items():
- print '%s: %s' % (k,v)
+ for k,v in list(d2.items()):
+ print('%s: %s' % (k,v))
return len(self._contigs)
def __iter__(self):
- return self._contigs.iterkeys()
+ return iter(self._contigs.keys())
def __getitem__(self, name):
return self._contigs[name]
genome = guess_genome(sizes)
- for contig, basese in sizes.items():
+ for contig, basese in list(sizes.items()):
name = filenames[contig]
self._contigs[name] = genome + '/' + name
return genomes[key][size]
if len(contig_sizes) == 1:
- return os.path.splitext(contig_sizes.keys()[0])[0]
+ return os.path.splitext(list(contig_sizes.keys())[0])[0]
raise RuntimeError("Unrecognized genome type, update detection code.")
"""
Debugging function, report current object
"""
- print 'Software:'. self.__class__.__name__
- print 'Alignment version:', self.version
- print 'Run date:', self.date
- print 'config.xml:', self.tree
+ print('Software:', self.__class__.__name__)
+ print('Alignment version:', self.version)
+ print('Run date:', self.date)
+ print('config.xml:', self.tree)
self.summary.dump()
def get_elements(self, root_tag):
return None
gerald = ElementTree.Element(root_tag,
- {'version': unicode(Gerald.XML_VERSION)})
+ {'version': str(Gerald.XML_VERSION)})
gerald.append(self.tree)
gerald.append(self.summary.get_elements())
if self.eland_results:
lanes = [x.tag.split('_')[1] for x in container.getchildren()]
try:
index = lanes.index(self._lane_id)
- except ValueError, e:
+ except ValueError as e:
return None
element = container[index]
return element.text
def __iter__(self):
if self._lanes is None:
self._initialize_lanes()
- return self._lanes.iterkeys()
+ return iter(self._lanes.keys())
def __getitem__(self, key):
if self._lanes is None:
return self._lanes[real_key]
raise KeyError("%s not found in %s" % (
repr(key),
- ",".join((repr(k) for k in self._lanes.keys()))))
+ ",".join((repr(k) for k in list(self._lanes.keys())))))
def __setitem__(self, key, value):
if len(self._lanes) > 100:
return value_list
def items(self):
- return zip(self.keys(), self.values())
+ return list(zip(list(self.keys()), list(self.values())))
def __getitem__(self, key):
# FIXME: this is inefficient. building the dictionary be rescanning the xml.
- v = dict(self.items())
+ v = dict(list(self.items()))
return v[key]
class IPAR(object):
"""
suffix_node = self.tree.find('RunParameters/CompressionSuffix')
if suffix_node is None:
- print "find compression suffix failed"
+ print("find compression suffix failed")
return None
suffix = suffix_node.text
files = []
format = "%s_%s_%04d_%s.txt%s"
- for lane, attrib in self.tiles.items():
+ for lane, attrib in list(self.tiles.items()):
for file_type in ["int","nse"]:
start, stop = attrib['TileRange']
for tile in range(start, stop+1):
return files
def dump(self):
- print "Matrix:", self.matrix
- print "Tree:", self.tree
+ print("Matrix:", self.matrix)
+ print("Tree:", self.tree)
def get_elements(self):
attribs = {'version': str(IPAR.XML_VERSION) }
tree = ElementTree.parse(paramfile).getroot()
run = tree.find('Run')
- if run.attrib.has_key('Name') and run.attrib['Name'] in SOFTWARE_NAMES:
+ if 'Name' in run.attrib and run.attrib['Name'] in SOFTWARE_NAMES:
return run
else:
LOGGER.info("No run found")
opts, args = parser.parse_args(cmdline)
if opts.version:
- print version()
+ print(version())
return 0
if opts.infile is not None:
#!/usr/bin/env python
import csv
-from ConfigParser import RawConfigParser
+from configparser import RawConfigParser
import logging
from optparse import OptionParser, IndentedHelpFormatter
import os
import sys
import types
-import urllib
-import urllib2
+import urllib.request, urllib.parse, urllib.error
+import urllib.request, urllib.error, urllib.parse
+import collections
try:
import json
-except ImportError, e:
+except ImportError as e:
import simplejson as json
from htsworkflow.frontend.auth import apidata
url = api.flowcell_url(base_host_url, flowcell)
try:
- apipayload = urllib.urlencode(apidata)
- web = urllib2.urlopen(url, apipayload)
- except urllib2.URLError, e:
+ apipayload = urllib.parse.urlencode(apidata).encode('utf-8')
+ web = urllib.request.urlopen(url, apipayload)
+ except urllib.error.URLError as e:
errmsg = 'URLError: %d %s' % (e.code, e.msg)
LOGGER.error(errmsg)
LOGGER.error('opened %s' % (url,))
(The same species, read length, and eland vs sequencing)
"""
lane_groups = {}
- for lane_number, lane_contents in flowcell_info['lane_set'].items():
+ for lane_number, lane_contents in list(flowcell_info['lane_set'].items()):
for lane_info in lane_contents:
index = (lane_info['read_length'],
lane_info['library_species'],
analysis_suffix = eland_analysis_suffix[flowcell_info['paired_end']]
sequence_suffix = sequence_analysis_suffix[flowcell_info['paired_end']]
lane_groups = group_lane_parameters(flowcell_info)
- for lane_index, lane_numbers in lane_groups.items():
+ for lane_index, lane_numbers in list(lane_groups.items()):
# lane_index is return value of group_lane_parameters
read_length, species, is_sequencing = lane_index
lane_numbers.sort()
- lane_prefix = u"".join(lane_numbers)
+ lane_prefix = "".join(lane_numbers)
species_path = genome_map.get(species, None)
LOGGER.debug("Looked for genome '%s' got location '%s'" % (species, species_path))
(i.e. http://sub.domain.edu:port)
"""
LOGGER.info('USING OPTIONS:')
- LOGGER.info(u' URL: %s' % (options.url,))
- LOGGER.info(u' OUT: %s' % (options.output_filepath,))
- LOGGER.info(u' FC: %s' % (options.flowcell,))
+ LOGGER.info(' URL: %s' % (options.url,))
+ LOGGER.info(' OUT: %s' % (options.output_filepath,))
+ LOGGER.info(' FC: %s' % (options.flowcell,))
#LOGGER.info(': %s' % (options.genome_dir,))
- LOGGER.info(u'post_run: %s' % ( unicode(options.post_run),))
+ LOGGER.info('post_run: %s' % ( str(options.post_run),))
flowcell_info = retrieve_flowcell_info(options.url, options.flowcell)
LOGGER.debug('genome_dir: %s' % ( options.genome_dir, ))
available_genomes = getAvailableGenomes(options.genome_dir)
genome_map = constructMapperDict(available_genomes)
- LOGGER.debug('available genomes: %s' % ( unicode( genome_map.keys() ),))
+ LOGGER.debug('available genomes: %s' % ( str( list(genome_map.keys()) ),))
config = format_gerald_config(options, flowcell_info, genome_map)
htsw_field = illumina_to_htsw_map.get(illumina_name, None)
if htsw_field is None:
continue
- if callable(htsw_field):
+ if callable(htsw_field):
renamed[illumina_name] = htsw_field(options,
flowcell_info,
library)
sequences = library.get('index_sequence', None)
if sequences is None:
return []
- elif (type(sequences) in types.StringTypes and
+ elif (isinstance(sequences, str) and
sequences.lower().startswith('err')):
shared['Index'] = ''
shared['SampleProject'] = library['library_id']
return [shared]
- elif (type(sequences) == types.DictType):
+ elif (type(sequences) == dict):
pooled = []
- multiplex_ids = sequences.keys()
+ multiplex_ids = list(sequences.keys())
multiplex_ids.sort(cmp=alphanum.alphanum)
for multiplex_id in multiplex_ids:
sample = {}
# lane, and cycle provided (INVALID)
if tile is None and cycle is not None:
msg = "Handling of cycle without tile is not currently implemented."
- raise ValueError, msg
+ raise ValueError(msg)
# lane, tile, cycle provided
elif cycle:
"""
firecrest = self.status['firecrest']
total = len(firecrest)
- completed = firecrest.values().count(True)
+ completed = list(firecrest.values()).count(True)
return (completed, total)
"""
bustard = self.status['bustard']
total = len(bustard)
- completed = bustard.values().count(True)
+ completed = list(bustard.values()).count(True)
return (completed, total)
"""
gerald = self.status['gerald']
total = len(gerald)
- completed = gerald.values().count(True)
+ completed = list(gerald.values()).count(True)
return (completed, total)
while 1:
if conf_info.status is None:
- print "No status object yet."
+ print("No status object yet.")
time.sleep(SLEEP_AMOUNT)
continue
report = conf_info.status.statusReport()
- print os.linesep.join(report)
- print
+ print(os.linesep.join(report))
+ print()
time.sleep(SLEEP_AMOUNT)
parser.error("need name of configuration file")
status = GARunStatus(args[0])
- print os.linesep.join(status.statusReport())
+ print(os.linesep.join(status.statusReport()))
return 0
if __name__ == "__main__":
p.bustard = b
p.gerald = g
runs.append(p)
- except IOError, e:
+ except IOError as e:
LOGGER.error("Ignoring " + str(e))
return len(runs) - start
if aligned:
p.gerald = gerald.gerald(aligned)
runs.append(p)
- except IOError, e:
+ except IOError as e:
LOGGER.error("Ignoring " + str(e))
return len(runs) - start
summarized_reads = {}
genome_reads = 0
genome = 'unknown'
- for k, v in mapped_reads.items():
+ for k, v in list(mapped_reads.items()):
path, k = os.path.split(k)
if len(path) > 0 and path not in genome_map:
genome = path
report.append("Mapped Reads")
mapped_reads = summarize_mapped_reads(eland_result.genome_map,
eland_result.mapped_reads)
- for name, counts in mapped_reads.items():
+ for name, counts in list(mapped_reads.items()):
report.append(" %s: %d" % (name, counts))
report.append('')
import types
import re
import sys
-from urlparse import urljoin, urlparse
+from urllib.parse import urljoin, urlparse
import RDF
from htsworkflow.util.rdfhelp import libraryOntology as libNS
return (self.flowcell, self.lane, self.read, self.project, self.split)
def __unicode__(self):
- return unicode(self.path)
+ return str(self.path)
def __eq__(self, other):
"""
return not self == other
def __repr__(self):
- return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
+ return "<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
def make_target_name(self, root):
"""
def get_one(s, p):
values = get(s, p)
if len(values) > 1:
- errmsg = u"To many values for %s %s"
- raise ValueError(errmsg % (unicode(s), unicode(p)))
+ errmsg = "To many values for %s %s"
+ raise ValueError(errmsg % (str(s), str(p)))
elif len(values) == 1:
return values[0]
else:
rdfNS['type'],
libNS['IlluminaResult'])
if not model.contains_statement(result_statement):
- raise KeyError(u"%s not found" % (unicode(seq_id),))
+ raise KeyError("%s not found" % (str(seq_id),))
seq_type_node = model.get_target(seq_id, libNS['file_type'])
seq_type = stripNamespace(libNS, seq_type_node)
Scan through a list of directories for sequence like files
"""
sequences = []
- if type(dirs) in types.StringTypes:
+ if isinstance(dirs, str):
raise ValueError("You probably want a list or set, not a string")
for d in dirs:
qseq_patterns = []
# grab a lane from the dictionary
# I don't think it matters which one.
- k = lanes.keys()[0]
+ k = list(lanes.keys())[0]
# build the list of patterns
for read in lanes[k]:
read = int(read)
logging.basicConfig(level=logging.WARN)
if opts.version:
- print version()
+ print(version())
return 0
if len(args) != 1:
target2_name = base + '_r2.fastq'
for target_name in [target1_name, target2_name]:
- print 'target name', target_name
+ print('target name', target_name)
if os.path.exists(target_name):
raise RuntimeError("%s exists" % (target_name,))
def get_elements(self):
summary = etree.Element(Summary.SUMMARY,
- {'version': unicode(Summary.XML_VERSION)})
+ {'version': str(Summary.XML_VERSION)})
for end in self.lane_results:
- for lane in end.values():
+ for lane in list(end.values()):
summary.append(lane.get_elements())
return summary
Debugging function, report current object
"""
tree = self.get_elements()
- print etree.tostring(tree)
+ print(etree.tostring(tree))
class SummaryGA(Summary):
def __init__(self, filename=None, xml=None):
('Lane Results Summary : Read 1', 0),
('Lane Results Summary : Read 2', 1),]
for name, end in table_names:
- if tables.has_key(name):
+ if name in tables:
self._extract_lane_results_for_end(tables, name, end)
if len(self.lane_results[0]) == 0:
def get_elements(self):
lane_result = etree.Element(
LaneResultSummary.LANE_RESULT_SUMMARY,
- {'lane': unicode(self.lane), 'end': unicode(self.end)})
- for tag, variable_name in LaneResultSummary.TAGS.items():
+ {'lane': str(self.lane), 'end': str(self.end)})
+ for tag, variable_name in list(LaneResultSummary.TAGS.items()):
value = getattr(self, variable_name)
if value is None:
continue
# it looks like a sequence
- elif type(value) in (types.TupleType, types.ListType):
+ elif type(value) in (tuple, list):
element = make_mean_range_element(
lane_result,
tag,
)
else:
element = etree.SubElement(lane_result, tag)
- element.text = unicode(value)
+ element.text = str(value)
return lane_result
def set_elements(self, tree):
variable_name = tags[element.tag]
setattr(self, variable_name,
parse_summary_element(element))
- except KeyError, e:
+ except KeyError as e:
LOGGER.warn('Unrecognized tag %s' % (element.tag,))
else:
self.lane_yield = None
- for GeraldName, LRSName in LaneResultSummary.GERALD_TAGS.items():
+ for GeraldName, LRSName in list(LaneResultSummary.GERALD_TAGS.items()):
node = element.find(GeraldName)
if node is None:
LOGGER.info("Couldn't find %s" % (GeraldName))
"""
try:
v = int(v)
- except ValueError, e:
+ except ValueError as e:
v = float(v)
return v
Make an etree subelement <Name mean='mean', deviation='deviation'/>
"""
element = etree.SubElement(parent, name,
- { 'mean': unicode(mean),
- 'deviation': unicode(deviation)})
+ { 'mean': str(mean),
+ 'deviation': str(deviation)})
return element
def parse_mean_range_element(element):
TEST_CODE_DIR = os.path.split(__file__)[0]
TESTDATA_DIR = os.path.join(TEST_CODE_DIR, 'testdata')
-LANE_LIST = range(1,9)
-TILE_LIST = range(1,101)
+LANE_LIST = list(range(1,9))
+TILE_LIST = list(range(1,101))
HISEQ_TILE_LIST = [1101, 1102, 1103, 1104, 1105, 1106, 1107, 1108,
1201, 1202, 1203, 1204, 1205, 1206, 1207, 1208,
2101, 2102, 2103, 2104, 2105, 2106, 2107, 2108,
return pathname
def dump(self):
- print ('index seq: {0}'.format(self.index_seq))
+ print(('index seq: {0}'.format(self.index_seq)))
- print ('project dir: {0}'.format(self.project_dir))
- print ('sample dir: {0}'.format(self.sample_dir))
- print ('rootname: {0}'.format(self.rootname))
- print ('path: {0}'.format(
+ print(('project dir: {0}'.format(self.project_dir)))
+ print(('sample dir: {0}'.format(self.sample_dir)))
+ print(('rootname: {0}'.format(self.rootname)))
+ print(('path: {0}'.format(
os.path.join(self.project_dir,
self.sample_dir,
- self.rootname+'R1_001.fastq.gz')))
+ self.rootname+'R1_001.fastq.gz'))))
def get_unaligned_sample_fastq_data(flowcell_id, lane, index_seq):
"""
for dirpath, dirnames, filenames in os.walk(root):
for filename in filenames:
- print os.path.join(dirpath, filename)
+ print(os.path.join(dirpath, filename))
class BaseCallInfo(object):
#!/usr/bin/env python
"""More direct synthetic test cases for the eland output file processing
"""
-from StringIO import StringIO
-from unittest2 import TestCase
+from io import StringIO
+from unittest import TestCase
from htsworkflow.pipelines.eland import ELAND, ElandLane, ElandMatches, \
SampleKey, MatchCodes, MappedReads
'U0':0, 'U1':0, 'U2':0,
'R0':0, 'R1':0, 'R2':0,
}
- self.assertEqual(mc.keys(), match_codes.keys())
- self.assertEqual(mc.items(), match_codes.items())
- self.assertEqual(mc.values(), match_codes.values())
+ self.assertEqual(list(mc.keys()), list(match_codes.keys()))
+ self.assertEqual(list(mc.items()), list(match_codes.items()))
+ self.assertEqual(list(mc.values()), list(match_codes.values()))
self.assertRaises(KeyError, mc.__getitem__, 'foo')
def test_addition(self):
mr1['chr9'] = 7
self.assertEqual(list(mr1.keys()), ['chr9'])
self.assertEqual(mr1['chr9'], 7)
- self.assertEqual(mr1.items(), [('chr9', 7)])
+ self.assertEqual(list(mr1.items()), [('chr9', 7)])
del mr1['chr9']
self.assertEqual(len(mr1), 0)
"""Test specific Eland modules
"""
def compare_match_array(self, current, expected):
- for key in expected.keys():
+ for key in list(expected.keys()):
self.assertEqual(current[key], expected[key],
"Key %s: %s != %s" % (key,current[key],expected[key]))
e.results[sl3] = 'Lane3'
e.results[sl1] = 'Lane1'
- e_list = e.values()
+ e_list = list(e.values())
self.assertEqual(e_list[0], 'Lane1')
self.assertEqual(e_list[1], 'Lane3')
self.assertEqual(e_list[2], 'Lane5')
self.assertEqual(len(em[key]), 1)
- filename = iter(em[key]).next().filename
+ filename = next(iter(em[key])).filename
self.assertEqual(filename, 's_1_sequence.txt')
- self.assertEqual(em.keys(), [key])
+ self.assertEqual(list(em.keys()), [key])
em.add('s_1_eland_result.txt')
self.assertEqual(len(em), 1)
self.assertEqual(len(em[key]), 1)
- filename = iter(em[key]).next().filename
+ filename = next(iter(em[key])).filename
self.assertEqual(filename, 's_1_eland_result.txt')
- self.assertEqual(em.keys(), [key])
+ self.assertEqual(list(em.keys()), [key])
def test_parts(self):
key11111 = SampleKey(1, 1, '11111')
self.assertEqual(len(em[key11112]), 2)
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(MatchCodeTests))
suite.addTests(defaultTestLoader.loadTestsFromTestCase(TestMappedReads))
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import tempfile
import shutil
import sys
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.pipelines import eland
from htsworkflow.pipelines import ipar
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderExtractTests))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
-from unittest2 import TestCase
+from unittest import TestCase
-from StringIO import StringIO
+from io import StringIO
from htsworkflow.pipelines import genome_mapper
class testGenomeMapper(TestCase):
self.failUnlessEqual("%(Mus musculus|mm8)s" % (genome_map), "/mm8")
self.failUnlessEqual("%(Mus musculus|mm10)s" % (genome_map), "/mm10")
- self.failUnlessEqual(len(genome_map.keys()), 6)
- self.failUnlessEqual(len(genome_map.values()), 6)
- self.failUnlessEqual(len(genome_map.items()), 6)
+ self.failUnlessEqual(len(list(genome_map.keys())), 6)
+ self.failUnlessEqual(len(list(genome_map.values())), 6)
+ self.failUnlessEqual(len(list(genome_map.items())), 6)
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(testGenomeMapper))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
"""More direct synthetic test cases for the eland output file processing
"""
import os
-from StringIO import StringIO
+from io import StringIO
import shutil
import tempfile
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.pipelines import ElementTree
from htsworkflow.pipelines import genomemap
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(TestGenomeMap))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import csv
import os
import re
-from StringIO import StringIO
+from io import StringIO
try:
import json
-except ImportError, e:
+except ImportError as e:
import simplejson as json
from django.test import TestCase
flowcell_info = json.loads(flowcell_request.content)
options = getCombinedOptions(['-f','FC12150','-g',os.getcwd()])
- genome_map = {u'Homo sapiens': '/tmp/hg18' }
+ genome_map = {'Homo sapiens': '/tmp/hg18' }
config = format_gerald_config(options, flowcell_info, genome_map)
config_lines = config.split('\n')
output = StringIO()
save_sample_sheet(output, options, flowcell_info)
- print output.buf
+ print(output.getvalue())
output.seek(0)
sheet = list(csv.DictReader(output))
]
self.failUnlessEqual(len(sheet), len(expected))
for s, e in zip(sheet, expected):
- for key in e.keys():
+ for key in list(e.keys()):
self.failUnlessEqual(s[key], e[key],
"%s != %s for key %s" % (s[key],e[key], key))
import os
-from unittest2 import TestCase
-from StringIO import StringIO
+from unittest import TestCase
+from io import StringIO
-from simulate_runfolder import TESTDATA_DIR
+from .simulate_runfolder import TESTDATA_DIR
from htsworkflow.pipelines.runfolder import load_pipeline_run_xml
from htsworkflow.pipelines.eland import SampleKey
self.failUnlessEqual(run.runfolder_name, runfolder_name)
self.failUnlessEqual(run.gerald.runfolder_name, runfolder_name)
- for (end, lane), lane_results in results['lane_results'].items():
- for name, test_value in lane_results.items():
+ for (end, lane), lane_results in list(results['lane_results'].items()):
+ for name, test_value in list(lane_results.items()):
xml_value = getattr(run.gerald.summary[end][lane], name)
self.failUnlessEqual(xml_value, test_value,
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(testLoadRunXML))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
import tempfile
import shutil
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.pipelines import firecrest
from htsworkflow.pipelines import bustard
self.failUnlessEqual(b.date, b2.date )
self.failUnlessEqual(b.user, b2.user)
self.failUnlessEqual(len(b.phasing), len(b2.phasing))
- for key in b.phasing.keys():
+ for key in list(b.phasing.keys()):
self.failUnlessEqual(b.phasing[key].lane,
b2.phasing[key].lane)
self.failUnlessEqual(b.phasing[key].phasing,
self.failUnlessEqual(g.software, 'GERALD')
self.failUnlessEqual(g.version, '1.68.2.2')
self.failUnlessEqual(g.date, datetime(2008,4,19,19,8,30))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.keys())))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.items())))
# list of genomes, matches what was defined up in
# do it all again after extracting from the xml file
self.failUnlessEqual(g.version, g2.version)
self.failUnlessEqual(g.date, g2.date)
- self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
- self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
+ self.failUnlessEqual(len(list(g.lanes.keys())), len(list(g2.lanes.keys())))
+ self.failUnlessEqual(len(list(g.lanes.items())), len(list(g2.lanes.items())))
# test lane specific parameters from gerald config file
for i in range(1,9):
g2_results.reads)
self.failUnlessEqual(len(g_results.mapped_reads),
len(g2_results.mapped_reads))
- for k in g_results.mapped_reads.keys():
+ for k in list(g_results.mapped_reads.keys()):
self.failUnlessEqual(g_results.mapped_reads[k],
g2_results.mapped_reads[k])
self.failUnlessEqual(len(g_results.match_codes),
len(g2_results.match_codes))
- for k in g_results.match_codes.keys():
+ for k in list(g_results.match_codes.keys()):
self.failUnlessEqual(g_results.match_codes[k],
g2_results.match_codes[k])
self.failUnlessEqual(l1.lane_id, l2.lane_id)
self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
self.failUnlessEqual(len(l1.mapped_reads), 3)
- for k in l1.mapped_reads.keys():
+ for k in list(l1.mapped_reads.keys()):
self.failUnlessEqual(l1.mapped_reads[k],
l2.mapped_reads[k])
self.failUnlessEqual(len(l1.match_codes), 9)
self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
- for k in l1.match_codes.keys():
+ for k in list(l1.match_codes.keys()):
self.failUnlessEqual(l1.match_codes[k],
l2.match_codes[k])
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
import tempfile
import shutil
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.pipelines import firecrest
from htsworkflow.pipelines import bustard
self.failUnlessEqual(b.date, b2.date )
self.failUnlessEqual(b.user, b2.user)
self.failUnlessEqual(len(b.phasing), len(b2.phasing))
- for key in b.phasing.keys():
+ for key in list(b.phasing.keys()):
self.failUnlessEqual(b.phasing[key].lane,
b2.phasing[key].lane)
self.failUnlessEqual(b.phasing[key].phasing,
self.failUnlessEqual(g.software, 'GERALD')
self.failUnlessEqual(g.version, '1.68.2.2')
self.failUnlessEqual(g.date, datetime(2008,4,19,19,8,30))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.keys())))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.items())))
# list of genomes, matches what was defined up in
# do it all again after extracting from the xml file
self.failUnlessEqual(g.version, g2.version)
self.failUnlessEqual(g.date, g2.date)
- self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
- self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
+ self.failUnlessEqual(len(list(g.lanes.keys())), len(list(g2.lanes.keys())))
+ self.failUnlessEqual(len(list(g.lanes.items())), len(list(g2.lanes.items())))
# test lane specific parameters from gerald config file
for i in range(1,9):
g2_results.reads)
self.failUnlessEqual(len(g_results.mapped_reads),
len(g2_results.mapped_reads))
- for k in g_results.mapped_reads.keys():
+ for k in list(g_results.mapped_reads.keys()):
self.failUnlessEqual(g_results.mapped_reads[k],
g2_results.mapped_reads[k])
self.failUnlessEqual(len(g_results.match_codes),
len(g2_results.match_codes))
- for k in g_results.match_codes.keys():
+ for k in list(g_results.match_codes.keys()):
self.failUnlessEqual(g_results.match_codes[k],
g2_results.match_codes[k])
self.failUnlessEqual(l1.lane_id, l2.lane_id)
self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
self.failUnlessEqual(len(l1.mapped_reads), 3)
- for k in l1.mapped_reads.keys():
+ for k in list(l1.mapped_reads.keys()):
self.failUnlessEqual(l1.mapped_reads[k],
l2.mapped_reads[k])
self.failUnlessEqual(len(l1.match_codes), 9)
self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
- for k in l1.match_codes.keys():
+ for k in list(l1.match_codes.keys()):
self.failUnlessEqual(l1.match_codes[k],
l2.match_codes[k])
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
import tempfile
import shutil
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.pipelines import firecrest
from htsworkflow.pipelines import bustard
self.failUnlessEqual(b.date, b2.date )
self.failUnlessEqual(b.user, b2.user)
self.failUnlessEqual(len(b.phasing), len(b2.phasing))
- for key in b.phasing.keys():
+ for key in list(b.phasing.keys()):
self.failUnlessEqual(b.phasing[key].lane,
b2.phasing[key].lane)
self.failUnlessEqual(b.phasing[key].phasing,
self.failUnlessEqual(g.software, 'GERALD')
self.failUnlessEqual(g.version, '1.171')
self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.keys())))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.items())))
# list of genomes, matches what was defined up in
self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
# I want to be able to use a simple iterator
- for l in g.lanes.values():
+ for l in list(g.lanes.values()):
self.failUnlessEqual(l.analysis, 'eland_extended')
self.failUnlessEqual(l.read_length, '37')
self.failUnlessEqual(l.use_bases, 'Y'*37)
self.failUnlessEqual(g.software, g2.software)
self.failUnlessEqual(g.version, g2.version)
self.failUnlessEqual(g.date, g2.date)
- self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
- self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
+ self.failUnlessEqual(len(list(g.lanes.keys())), len(list(g2.lanes.keys())))
+ self.failUnlessEqual(len(list(g.lanes.items())), len(list(g2.lanes.items())))
# test lane specific parameters from gerald config file
for i in range(1,9):
g2_results.reads)
self.failUnlessEqual(len(g_results.mapped_reads),
len(g2_results.mapped_reads))
- for k in g_results.mapped_reads.keys():
+ for k in list(g_results.mapped_reads.keys()):
self.failUnlessEqual(g_results.mapped_reads[k],
g2_results.mapped_reads[k])
self.failUnlessEqual(len(g_results.match_codes),
len(g2_results.match_codes))
- for k in g_results.match_codes.keys():
+ for k in list(g_results.match_codes.keys()):
self.failUnlessEqual(g_results.match_codes[k],
g2_results.match_codes[k])
self.failUnlessEqual(l1.lane_id, l2.lane_id)
self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
self.failUnlessEqual(len(l1.mapped_reads), 17)
- for k in l1.mapped_reads.keys():
+ for k in list(l1.mapped_reads.keys()):
self.failUnlessEqual(l1.mapped_reads[k],
l2.mapped_reads[k])
self.failUnlessEqual(len(l1.match_codes), 9)
self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
- for k in l1.match_codes.keys():
+ for k in list(l1.match_codes.keys()):
self.failUnlessEqual(l1.match_codes[k],
l2.match_codes[k])
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
import tempfile
import shutil
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.pipelines import bustard
from htsworkflow.pipelines import eland
self.failUnlessEqual(g.software, 'CASAVA')
self.failUnlessEqual(g.version, '1.7.0')
self.failUnlessEqual(g.date, datetime(2011,5,2,19,19,49))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.keys())))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.items())))
# list of genomes, matches what was defined up in
self.failUnlessEqual(cur_lane.use_bases, 'Y'*100+'y'*100)
# I want to be able to use a simple iterator
- for l in g.lanes.values():
+ for l in list(g.lanes.values()):
self.failUnless(l.analysis in ('sequence_pair', 'eland_pair'))
self.failUnlessEqual(l.read_length, '100')
self.failUnlessEqual(l.use_bases, 'Y'*100+'y'*100)
self.failUnlessEqual(g.software, g2.software)
self.failUnlessEqual(g.version, g2.version)
self.failUnlessEqual(g.date, g2.date)
- self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
- self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
+ self.failUnlessEqual(len(list(g.lanes.keys())), len(list(g2.lanes.keys())))
+ self.failUnlessEqual(len(list(g.lanes.items())), len(list(g2.lanes.items())))
# test lane specific parameters from gerald config file
for i in range(1,9):
if isinstance(g_results, eland.ElandLane):
self.failUnlessEqual(len(g_results.mapped_reads),
len(g2_results.mapped_reads))
- for k in g_results.mapped_reads.keys():
+ for k in list(g_results.mapped_reads.keys()):
self.failUnlessEqual(g_results.mapped_reads[k],
g2_results.mapped_reads[k])
self.failUnlessEqual(len(g_results.match_codes),
len(g2_results.match_codes))
- for k in g_results.match_codes.keys():
+ for k in list(g_results.match_codes.keys()):
self.failUnlessEqual(g_results.match_codes[k],
g2_results.match_codes[k])
if isinstance(l1, eland.ElandLane):
self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
self.failUnlessEqual(len(l1.mapped_reads), 7)
- for k in l1.mapped_reads.keys():
+ for k in list(l1.mapped_reads.keys()):
self.failUnlessEqual(l1.mapped_reads[k],
l2.mapped_reads[k])
self.failUnlessEqual(len(l1.match_codes), 9)
self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
- for k in l1.match_codes.keys():
+ for k in list(l1.match_codes.keys()):
self.failUnlessEqual(l1.match_codes[k],
l2.match_codes[k])
elif isinstance(l1, eland.SequenceLane):
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
import tempfile
import shutil
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.pipelines import ipar
from htsworkflow.pipelines import bustard
self.failUnlessEqual(b.date, b2.date )
self.failUnlessEqual(b.user, b2.user)
self.failUnlessEqual(len(b.phasing), len(b2.phasing))
- for key in b.phasing.keys():
+ for key in list(b.phasing.keys()):
self.failUnlessEqual(b.phasing[key].lane,
b2.phasing[key].lane)
self.failUnlessEqual(b.phasing[key].phasing,
self.failUnlessEqual(g.software, 'GERALD')
self.failUnlessEqual(g.version, '1.171')
self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.keys())))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.items())))
# list of genomes, matches what was defined up in
self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
# I want to be able to use a simple iterator
- for l in g.lanes.values():
+ for l in list(g.lanes.values()):
self.failUnlessEqual(l.analysis, 'eland_extended')
self.failUnlessEqual(l.read_length, '37')
self.failUnlessEqual(l.use_bases, 'Y'*37)
self.failUnlessEqual(g.software, g2.software)
self.failUnlessEqual(g.version, g2.version)
self.failUnlessEqual(g.date, g2.date)
- self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
- self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
+ self.failUnlessEqual(len(list(g.lanes.keys())), len(list(g2.lanes.keys())))
+ self.failUnlessEqual(len(list(g.lanes.items())), len(list(g2.lanes.items())))
# test lane specific parameters from gerald config file
for i in range(1,9):
g2_results.reads)
self.failUnlessEqual(len(g_results.mapped_reads),
len(g2_results.mapped_reads))
- for k in g_results.mapped_reads.keys():
+ for k in list(g_results.mapped_reads.keys()):
self.failUnlessEqual(g_results.mapped_reads[k],
g2_results.mapped_reads[k])
self.failUnlessEqual(len(g_results.match_codes),
len(g2_results.match_codes))
- for k in g_results.match_codes.keys():
+ for k in list(g_results.match_codes.keys()):
self.failUnlessEqual(g_results.match_codes[k],
g2_results.match_codes[k])
self.failUnlessEqual(l1.lane_id, l2.lane_id)
self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
self.failUnlessEqual(len(l1.mapped_reads), 17)
- for k in l1.mapped_reads.keys():
+ for k in list(l1.mapped_reads.keys()):
self.failUnlessEqual(l1.mapped_reads[k],
l2.mapped_reads[k])
self.failUnlessEqual(len(l1.match_codes), 9)
self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
- for k in l1.match_codes.keys():
+ for k in list(l1.match_codes.keys()):
self.failUnlessEqual(l1.match_codes[k],
l2.match_codes[k])
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
import tempfile
import shutil
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.pipelines import eland
from htsworkflow.pipelines import ipar
self.failUnlessEqual(b.user, 'diane')
self.failUnlessEqual(len(b.phasing), 8)
self.failUnlessAlmostEqual(b.phasing[8].phasing, 0.0099)
- self.failUnlessEqual(b.crosstalk.base.keys(), ['A','C','T','G'])
+ self.failUnlessEqual(list(b.crosstalk.base.keys()), ['A','C','T','G'])
check_crosstalk(b.crosstalk)
xml = b.get_elements()
self.failUnlessEqual(b.date, b2.date )
self.failUnlessEqual(b.user, b2.user)
self.failUnlessEqual(len(b.phasing), len(b2.phasing))
- for key in b.phasing.keys():
+ for key in list(b.phasing.keys()):
self.failUnlessEqual(b.phasing[key].lane,
b2.phasing[key].lane)
self.failUnlessEqual(b.phasing[key].phasing,
self.failUnlessEqual(g.software, 'GERALD')
self.failUnlessEqual(g.version, '1.171')
self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.keys())))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.items())))
# list of genomes, matches what was defined up in
self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
# I want to be able to use a simple iterator
- for l in g.lanes.values():
+ for l in list(g.lanes.values()):
self.failUnlessEqual(l.analysis, 'eland_extended')
self.failUnlessEqual(l.read_length, '37')
self.failUnlessEqual(l.use_bases, 'Y'*37)
self.failUnlessEqual(g.software, g2.software)
self.failUnlessEqual(g.version, g2.version)
self.failUnlessEqual(g.date, g2.date)
- self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
- self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
+ self.failUnlessEqual(len(list(g.lanes.keys())), len(list(g2.lanes.keys())))
+ self.failUnlessEqual(len(list(g.lanes.items())), len(list(g2.lanes.items())))
# test lane specific parameters from gerald config file
for i in range(1,9):
if isinstance(g_results, eland.ElandLane):
self.failUnlessEqual(len(g_results.mapped_reads),
len(g2_results.mapped_reads))
- for k in g_results.mapped_reads.keys():
+ for k in list(g_results.mapped_reads.keys()):
self.failUnlessEqual(g_results.mapped_reads[k],
g2_results.mapped_reads[k])
self.failUnlessEqual(len(g_results.match_codes),
len(g2_results.match_codes))
- for k in g_results.match_codes.keys():
+ for k in list(g_results.match_codes.keys()):
self.failUnlessEqual(g_results.match_codes[k],
g2_results.match_codes[k])
if isinstance(l1, eland.ElandLane):
self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
self.failUnlessEqual(len(l1.mapped_reads), 17)
- for k in l1.mapped_reads.keys():
+ for k in list(l1.mapped_reads.keys()):
self.failUnlessEqual(l1.mapped_reads[k],
l2.mapped_reads[k])
self.failUnlessEqual(len(l1.match_codes), 9)
self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
- for k in l1.match_codes.keys():
+ for k in list(l1.match_codes.keys()):
self.failUnlessEqual(l1.match_codes[k],
l2.match_codes[k])
elif isinstance(l1, eland.SequenceLane):
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
import tempfile
import shutil
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.pipelines import firecrest
from htsworkflow.pipelines import bustard
self.failUnlessEqual(b.date, b2.date )
self.failUnlessEqual(b.user, b2.user)
self.failUnlessEqual(len(b.phasing), len(b2.phasing))
- for key in b.phasing.keys():
+ for key in list(b.phasing.keys()):
self.failUnlessEqual(b.phasing[key].lane,
b2.phasing[key].lane)
self.failUnlessEqual(b.phasing[key].phasing,
self.failUnlessEqual(g.software, 'GERALD')
self.failUnlessEqual(g.version, '1.171')
self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.keys())))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.items())))
# list of genomes, matches what was defined up in
self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
# I want to be able to use a simple iterator
- for l in g.lanes.values():
+ for l in list(g.lanes.values()):
self.failUnlessEqual(l.analysis, 'eland_extended')
self.failUnlessEqual(l.read_length, '37')
self.failUnlessEqual(l.use_bases, 'Y'*37)
self.failUnlessEqual(g.software, g2.software)
self.failUnlessEqual(g.version, g2.version)
self.failUnlessEqual(g.date, g2.date)
- self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
- self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
+ self.failUnlessEqual(len(list(g.lanes.keys())), len(list(g2.lanes.keys())))
+ self.failUnlessEqual(len(list(g.lanes.items())), len(list(g2.lanes.items())))
# test lane specific parameters from gerald config file
for i in range(1,9):
g2_results.reads)
self.failUnlessEqual(len(g_results.mapped_reads),
len(g2_results.mapped_reads))
- for k in g_results.mapped_reads.keys():
+ for k in list(g_results.mapped_reads.keys()):
self.failUnlessEqual(g_results.mapped_reads[k],
g2_results.mapped_reads[k])
self.failUnlessEqual(len(g_results.match_codes),
len(g2_results.match_codes))
- for k in g_results.match_codes.keys():
+ for k in list(g_results.match_codes.keys()):
self.failUnlessEqual(g_results.match_codes[k],
g2_results.match_codes[k])
self.failUnlessEqual(l1.lane_id, l2.lane_id)
self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
self.failUnlessEqual(len(l1.mapped_reads), 17)
- for k in l1.mapped_reads.keys():
+ for k in list(l1.mapped_reads.keys()):
self.failUnlessEqual(l1.mapped_reads[k],
l2.mapped_reads[k])
self.failUnlessEqual(len(l1.match_codes), 9)
self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
- for k in l1.match_codes.keys():
+ for k in list(l1.match_codes.keys()):
self.failUnlessEqual(l1.match_codes[k],
l2.match_codes[k])
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
import tempfile
import shutil
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.pipelines import eland
from htsworkflow.pipelines import ipar
self.failUnlessEqual(g.version, '1.171')
self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.keys())))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.items())))
# list of genomes, matches what was defined up in
self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
# I want to be able to use a simple iterator
- for l in g.lanes.values():
+ for l in list(g.lanes.values()):
self.failUnlessEqual(l.analysis, 'eland_extended')
self.failUnlessEqual(l.read_length, '37')
self.failUnlessEqual(l.use_bases, 'Y'*37)
# do it all again after extracting from the xml file
self.failUnlessEqual(g.version, g2.version)
self.failUnlessEqual(g.date, g2.date)
- self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
- self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
+ self.failUnlessEqual(len(list(g.lanes.keys())), len(list(g2.lanes.keys())))
+ self.failUnlessEqual(len(list(g.lanes.items())), len(list(g2.lanes.items())))
# test lane specific parameters from gerald config file
for i in range(1,9):
if isinstance(g_results, eland.ElandLane):
self.failUnlessEqual(len(g_results.mapped_reads),
len(g2_results.mapped_reads))
- for k in g_results.mapped_reads.keys():
+ for k in list(g_results.mapped_reads.keys()):
self.failUnlessEqual(g_results.mapped_reads[k],
g2_results.mapped_reads[k])
self.failUnlessEqual(len(g_results.match_codes),
len(g2_results.match_codes))
- for k in g_results.match_codes.keys():
+ for k in list(g_results.match_codes.keys()):
self.failUnlessEqual(g_results.match_codes[k],
g2_results.match_codes[k])
if isinstance(l1, eland.ElandLane):
self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
self.failUnlessEqual(len(l1.mapped_reads), 17)
- for k in l1.mapped_reads.keys():
+ for k in list(l1.mapped_reads.keys()):
self.failUnlessEqual(l1.mapped_reads[k],
l2.mapped_reads[k])
self.failUnlessEqual(len(l1.match_codes), 9)
self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
- for k in l1.match_codes.keys():
+ for k in list(l1.match_codes.keys()):
self.failUnlessEqual(l1.match_codes[k],
l2.match_codes[k])
elif isinstance(l1, eland.SequenceLane):
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
import tempfile
import shutil
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.pipelines import eland
from htsworkflow.pipelines import ipar
self.failUnlessEqual(g.software, 'GERALD')
self.failUnlessEqual(g.version, '1.171')
self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.keys())))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.items())))
# list of genomes, matches what was defined up in
self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
# I want to be able to use a simple iterator
- for l in g.lanes.values():
+ for l in list(g.lanes.values()):
self.failUnlessEqual(l.analysis, 'eland_extended')
self.failUnlessEqual(l.read_length, '37')
self.failUnlessEqual(l.use_bases, 'Y'*37)
self.failUnlessEqual(g.software, g2.software)
self.failUnlessEqual(g.version, g2.version)
self.failUnlessEqual(g.date, g2.date)
- self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
- self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
+ self.failUnlessEqual(len(list(g.lanes.keys())), len(list(g2.lanes.keys())))
+ self.failUnlessEqual(len(list(g.lanes.items())), len(list(g2.lanes.items())))
# test lane specific parameters from gerald config file
for i in range(1,9):
if isinstance(g_results, eland.ElandLane):
self.failUnlessEqual(len(g_results.mapped_reads),
len(g2_results.mapped_reads))
- for k in g_results.mapped_reads.keys():
+ for k in list(g_results.mapped_reads.keys()):
self.failUnlessEqual(g_results.mapped_reads[k],
g2_results.mapped_reads[k])
self.failUnlessEqual(len(g_results.match_codes),
len(g2_results.match_codes))
- for k in g_results.match_codes.keys():
+ for k in list(g_results.match_codes.keys()):
self.failUnlessEqual(g_results.match_codes[k],
g2_results.match_codes[k])
if isinstance(l1, eland.ElandLane):
self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
self.failUnlessEqual(len(l1.mapped_reads), 17)
- for k in l1.mapped_reads.keys():
+ for k in list(l1.mapped_reads.keys()):
self.failUnlessEqual(l1.mapped_reads[k],
l2.mapped_reads[k])
self.failUnlessEqual(len(l1.match_codes), 9)
self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
- for k in l1.match_codes.keys():
+ for k in list(l1.match_codes.keys()):
self.failUnlessEqual(l1.match_codes[k],
l2.match_codes[k])
elif isinstance(l1, eland.SequenceLane):
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
import tempfile
import shutil
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.pipelines import eland
from htsworkflow.pipelines import ipar
self.failUnlessEqual(g.software, 'GERALD')
self.failUnlessEqual(g.version, '1.171')
self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.keys())))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.items())))
# list of genomes, matches what was defined up in
self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
# I want to be able to use a simple iterator
- for l in g.lanes.values():
+ for l in list(g.lanes.values()):
self.failUnlessEqual(l.analysis, 'eland_extended')
self.failUnlessEqual(l.read_length, '37')
self.failUnlessEqual(l.use_bases, 'Y'*37)
self.failUnlessEqual(g.software, g2.software)
self.failUnlessEqual(g.version, g2.version)
self.failUnlessEqual(g.date, g2.date)
- self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
- self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
+ self.failUnlessEqual(len(list(g.lanes.keys())), len(list(g2.lanes.keys())))
+ self.failUnlessEqual(len(list(g.lanes.items())), len(list(g2.lanes.items())))
# test lane specific parameters from gerald config file
for i in range(1,9):
if isinstance(g_results, eland.ElandLane):
self.failUnlessEqual(len(g_results.mapped_reads),
len(g2_results.mapped_reads))
- for k in g_results.mapped_reads.keys():
+ for k in list(g_results.mapped_reads.keys()):
self.failUnlessEqual(g_results.mapped_reads[k],
g2_results.mapped_reads[k])
self.failUnlessEqual(len(g_results.match_codes),
len(g2_results.match_codes))
- for k in g_results.match_codes.keys():
+ for k in list(g_results.match_codes.keys()):
self.failUnlessEqual(g_results.match_codes[k],
g2_results.match_codes[k])
if isinstance(l1, eland.ElandLane):
self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
self.failUnlessEqual(len(l1.mapped_reads), 7)
- for k in l1.mapped_reads.keys():
+ for k in list(l1.mapped_reads.keys()):
self.failUnlessEqual(l1.mapped_reads[k],
l2.mapped_reads[k])
self.failUnlessEqual(len(l1.match_codes), 9)
self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
- for k in l1.match_codes.keys():
+ for k in list(l1.match_codes.keys()):
self.failUnlessEqual(l1.match_codes[k],
l2.match_codes[k])
elif isinstance(l1, eland.SequenceLane):
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
import tempfile
import shutil
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.pipelines import eland
from htsworkflow.pipelines.samplekey import SampleKey
self.failUnlessEqual(g.software, 'CASAVA')
self.failUnlessEqual(g.version, '1.8.1')
- self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
- self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.keys())))
+ self.failUnlessEqual(len(g.lanes), len(list(g.lanes.items())))
# list of genomes, matches what was defined up in
# make_gerald_config.
self.failUnlessEqual(g.software, g2.software)
self.failUnlessEqual(g.version, g2.version)
self.failUnlessEqual(g.date, g2.date)
- self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
- self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
+ self.failUnlessEqual(len(list(g.lanes.keys())), len(list(g2.lanes.keys())))
+ self.failUnlessEqual(len(list(g.lanes.items())), len(list(g2.lanes.items())))
# test lane specific parameters from gerald config file
- for i in g.lanes.keys():
+ for i in list(g.lanes.keys()):
g_lane = g.lanes[i]
g2_lane = g2.lanes[i]
self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
if isinstance(g_results, eland.ElandLane):
self.failUnlessEqual(len(g_results.mapped_reads),
len(g2_results.mapped_reads))
- for k in g_results.mapped_reads.keys():
+ for k in list(g_results.mapped_reads.keys()):
self.failUnlessEqual(g_results.mapped_reads[k],
g2_results.mapped_reads[k])
self.failUnlessEqual(len(g_results.match_codes),
len(g2_results.match_codes))
- for k in g_results.match_codes.keys():
+ for k in list(g_results.match_codes.keys()):
self.failUnlessEqual(g_results.match_codes[k],
g2_results.match_codes[k])
eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
- for lane in eland_container.values():
+ for lane in list(eland_container.values()):
# I added sequence lanes to the last 2 lanes of this test case
if lane.sample_name == '11113':
self.assertEqual(lane.reads, 24)
if isinstance(l1, eland.ElandLane):
self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
self.failUnlessEqual(len(l1.mapped_reads), 1)
- for k in l1.mapped_reads.keys():
+ for k in list(l1.mapped_reads.keys()):
self.failUnlessEqual(l1.mapped_reads[k],
l2.mapped_reads[k])
self.failUnlessEqual(len(l1.match_codes), 9)
self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
- for k in l1.match_codes.keys():
+ for k in list(l1.match_codes.keys()):
self.failUnlessEqual(l1.match_codes[k],
l2.match_codes[k])
elif isinstance(l1, eland.SequenceLane):
self.failIfEqual(r2.gerald, None)
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
-from unittest2 import TestCase, TestSuite, defaultTestLoader
+from unittest import TestCase, TestSuite, defaultTestLoader
from htsworkflow.pipelines import runfolder
class TestRunfolderUtilities(TestCase):
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
#!/usr/bin/env python
"""More direct synthetic test cases for the eland output file processing
"""
-from StringIO import StringIO
-from unittest2 import TestCase
+from io import StringIO
+from unittest import TestCase
from htsworkflow.pipelines.samplekey import SampleKey
self.assertTrue(k3.matches(q3))
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(TestSampleKey))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
import shutil
import tempfile
-from unittest2 import TestCase
+from unittest import TestCase
import RDF
self.assertEqual(f0.filetype, 'srf')
self.assertEqual(f0.path, pathname)
- self.assertEqual(unicode(f0), unicode(pathname))
+ self.assertEqual(str(f0), str(pathname))
self.assertEqual(repr(f0), "<srf 42BW9AAXX 4 %s>" % (pathname,))
self.assertEqual(f0.flowcell, '42BW9AAXX')
self.assertEqual(f0.lane, '4')
self.assertEqual(f0.filetype, 'qseq')
self.assertEqual(f0.path, pathname)
- self.assertEqual(unicode(f0), unicode(pathname))
+ self.assertEqual(str(f0), str(pathname))
self.assertEqual(repr(f0), "<qseq 42BW9AAXX 4 %s>" %(pathname,))
self.assertEqual(f0.flowcell, '42BW9AAXX')
self.assertEqual(f0.lane, '4')
self.assertEqual(f0.filetype, 'qseq')
self.assertEqual(f0.path, pathname)
- self.assertEqual(unicode(f0), unicode(pathname))
+ self.assertEqual(str(f0), str(pathname))
self.assertEqual(repr(f0), "<qseq ilmn200901 1 %s>" %(pathname,))
self.assertEqual(f0.lane, '1')
self.assertEqual(f0.read, 1)
self.assertEqual(f0.filetype, 'fastq')
self.assertEqual(f0.path, pathname)
- self.assertEqual(unicode(f0), unicode(pathname))
+ self.assertEqual(str(f0), str(pathname))
self.assertEqual(repr(f0), "<fastq 42BW9AAXX 4 %s>" % (pathname,))
self.assertEqual(f0.flowcell, '42BW9AAXX')
self.assertEqual(f0.lane, '4')
self.assertEqual(f0.filetype, 'fastq')
self.assertEqual(f0.path, pathname)
- self.assertEqual(unicode(f0), unicode(pathname))
+ self.assertEqual(str(f0), str(pathname))
self.assertEqual(repr(f0), "<fastq 42BW9AAXX 4 %s>" %(pathname,))
self.assertEqual(f0.flowcell, '42BW9AAXX')
self.assertEqual(f0.lane, '4')
self.assertEqual(f0.filetype, 'split_fastq')
self.assertEqual(f0.path, pathname)
- self.assertEqual(unicode(f0), unicode(pathname))
+ self.assertEqual(str(f0), str(pathname))
self.assertEqual(repr(f0), "<split_fastq 42BW9AAXX 1 %s>" %(pathname,))
self.assertEqual(f0.flowcell, '42BW9AAXX')
self.assertEqual(f0.lane, '1')
self.assertEqual(f0.filetype, 'split_fastq')
self.assertEqual(f0.path, pathname)
- self.assertEqual(unicode(f0), unicode(pathname))
+ self.assertEqual(str(f0), str(pathname))
self.assertEqual(repr(f0), "<split_fastq 42BW9AAXX 1 %s>" % (pathname,))
self.assertEqual(f0.flowcell, '42BW9AAXX')
self.assertEqual(f0.lane, '1')
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(SequenceFileTests))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
#!/usr/bin/env python
import os
-from StringIO import StringIO
-from unittest2 import TestCase
+from io import StringIO
+from unittest import TestCase
from htsworkflow.pipelines import summary
-from simulate_runfolder import TESTDATA_DIR
+from .simulate_runfolder import TESTDATA_DIR
class SummaryTests(TestCase):
"""Test elements of the summary file parser
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(SummaryTests))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
localhost=127.0.0.1
"""
-import ConfigParser
+import configparser
import logging
import os
import shlex
dest[name] = options.get(section_name, name)
# define your defaults here
-options = ConfigParser.SafeConfigParser()
+options = configparser.SafeConfigParser()
def save_options(filename, options):
try:
ini_stream = open(filename, 'w')
options.write(ini_stream)
ini_stream.close()
- except IOError, e:
+ except IOError as e:
LOGGER.debug("Error saving setting: %s" % (str(e)))
INI_FILE = options.read([os.path.expanduser("~/.htsworkflow.ini"),
options_to_list(options, NOTIFICATION_BCC, 'frontend', 'notification_bcc')
if not options.has_option('frontend', 'database'):
- raise ConfigParser.NoSectionError(
+ raise configparser.NoSectionError(
"Please define [frontend] database=<Section>")
database_section = options.get('frontend', 'database')
if not options.has_section(database_section):
- raise ConfigParser.NoSectionError(
+ raise configparser.NoSectionError(
"No database=<database_section_name> defined")
# 'postgresql_psycopg2', 'postgresql', 'mysql', 'sqlite3' or 'ado_mssql'.
from pprint import pformat,pprint
import sys
import types
-from urlparse import urljoin, urlparse
+from urllib.parse import urljoin, urlparse
from htsworkflow.pipelines.sequences import scan_for_sequences, \
update_model_sequence_library
if pythonpath is not None:
env = "PYTHONPATH=%s" % (pythonpath,)
condor_entries = self.build_condor_arguments(result_map)
- for script_type in template_map.keys():
+ for script_type in list(template_map.keys()):
template = loader.get_template(template_map[script_type])
variables = {'python': sys.executable,
'logdir': self.log_path,
sequences = self.find_archive_sequence_files(result_map)
needed_targets = self.update_fastq_targets(result_map, sequences)
- for target_pathname, available_sources in needed_targets.items():
+ for target_pathname, available_sources in list(needed_targets.items()):
LOGGER.debug(' target : %s' % (target_pathname,))
LOGGER.debug(' candidate sources: %s' % (available_sources,))
- for condor_type in available_sources.keys():
+ for condor_type in list(available_sources.keys()):
conversion = conversion_funcs.get(condor_type, None)
if conversion is None:
errmsg = "Unrecognized type: {0} for {1}"
return results
def import_libraries(self, result_map):
- for lib_id in result_map.keys():
+ for lib_id in list(result_map.keys()):
lib_id_encoded = lib_id.encode('utf-8')
liburl = urljoin(self.host, 'library/%s/' % (lib_id_encoded,))
library = RDF.Node(RDF.Uri(liburl))
self.cycle = fromTypedNode(result['cycle'])
self.lane_number = fromTypedNode(result['lane_number'])
self.read = fromTypedNode(result['read'])
- if type(self.read) in types.StringTypes:
+        if isinstance(self.read, str):
self.read = 1
self.library = result['library']
self.library_id = fromTypedNode(result['library_id'])
if url.scheme == 'file':
return url.path
else:
- errmsg = u"Unsupported scheme {0} for {1}"
- raise ValueError(errmsg.format(url.scheme, unicode(url)))
+ errmsg = "Unsupported scheme {0} for {1}"
+ raise ValueError(errmsg.format(url.scheme, str(url)))
path = property(_get_path)
def __repr__(self):
from pprint import pformat
import re
import string
-from StringIO import StringIO
+from io import StringIO
import types
-import urlparse
+import urllib.parse
import RDF
from htsworkflow.util.rdfhelp import \
returns length of string if it can't find anything
"""
- for i in xrange(start, len(line)):
+ for i in range(start, len(line)):
if line[i] not in string.whitespace:
return i
returns length of string if nothing matches
"""
- for i in xrange(start, len(line)):
+ for i in range(start, len(line)):
if line[i] in string.whitespace:
return i
def get_view_namespace(submission_uri):
submission_uri = submission_uri_to_string(submission_uri)
- view_uri = urlparse.urljoin(submission_uri, 'view/')
+ view_uri = urllib.parse.urljoin(submission_uri, 'view/')
viewNS = RDF.NS(view_uri)
return viewNS
def scan_submission_dirs(self, result_map):
"""Examine files in our result directory
"""
- for lib_id, result_dir in result_map.items():
+ for lib_id, result_dir in list(result_map.items()):
LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
try:
self.import_submission_dir(result_dir, lib_id)
- except MetadataLookupException, e:
+ except MetadataLookupException as e:
LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
def import_submission_dir(self, submission_dir, library_id):
self.__view_map = self._get_filename_view_map()
results = []
- for pattern, view in self.__view_map.items():
+ for pattern, view in list(self.__view_map.items()):
if re.match(pattern, filename):
results.append(view)
LOGGER.debug("Found: %s" % (literal_re,))
try:
filename_re = re.compile(literal_re)
- except re.error, e:
+ except re.error as e:
LOGGER.error("Unable to compile: %s" % (literal_re,))
patterns[literal_re] = view_name
return patterns
base_daf = self.daf_name
- for result_dir in result_map.values():
+ for result_dir in list(result_map.values()):
if not os.path.exists(result_dir):
raise RuntimeError(
"Couldn't find target directory %s" %(result_dir,))
if self.is_paired and self['read'] is None:
return False
- for k in self.keys():
+ for k in list(self.keys()):
if k == 'read':
continue
if self[k] is None:
series = self.get_series_metadata()
series_attribs = dict(series)
series_id = series_attribs['^series']
- for lib_id, result_dir in result_map.items():
+ for lib_id, result_dir in list(result_map.items()):
an_analysis = self.get_submission_node(result_dir)
metadata = self.get_sample_metadata(an_analysis)
if len(metadata) == 0:
'platform_id': platform_id,
'series_id': series_id,
})
- print str(soft_template.render(context))
+ print(str(soft_template.render(context)))
def check_for_name(self, analysis_node):
name = fromTypedNode(
lanes = {}
for row in self.execute_query(query_template, context):
data = {}
- for k, v in row.items():
+ for k, v in list(row.items()):
data[k] = v
library = str(data['library'])
lanes.setdefault(library, []).append(data)
result = []
- for library, files in lanes.items():
+ for library, files in list(lanes.items()):
if len(files) > 2:
errmsg = "Don't know what to do with more than 2 raw files"
raise ValueError(errmsg)
from optparse import OptionParser
import os
import RDF
-import urllib
+import urllib.request, urllib.parse, urllib.error
from htsworkflow.util.rdfhelp import get_model, dump_model
search = {'db': database,
'term': term,
'retmax': return_max}
- tree = parse(ESEARCH_URL + urllib.urlencode(search))
+ tree = parse(ESEARCH_URL + urllib.parse.urlencode(search))
root = tree.getroot()
count = get_node_scalar(root, '/eSearchResult/Count', int)
retmax_node = get_node_scalar(root, '/eSearchResult/RetMax', int)
"""
search = {'db':DB,
'id': ncbi_id}
- url = EFETCH_URL + urllib.urlencode(search)
+ url = EFETCH_URL + urllib.parse.urlencode(search)
tree = parse(url)
context = Context()
def scan_submission_dirs(self, result_map):
"""Examine files in our result directory
"""
- for lib_id, result_dir in result_map.items():
+ for lib_id, result_dir in list(result_map.items()):
LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
try:
self.import_analysis_dir(result_dir, lib_id)
- except MetadataLookupException, e:
+ except MetadataLookupException as e:
LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
def import_analysis_dir(self, analysis_dir, library_id):
def analysis_nodes(self, result_map):
"""Return an iterable of analysis nodes
"""
- for result_dir in result_map.values():
+ for result_dir in list(result_map.values()):
an_analysis = self.get_submission_node(result_dir)
yield an_analysis
LOGGER.debug("Importing %s" % (lane.uri,))
try:
parser.parse_into_model(self.model, lane.uri)
- except RDF.RedlandError, e:
+ except RDF.RedlandError as e:
LOGGER.error("Error accessing %s" % (lane.uri,))
raise e
self.__view_map = self._get_filename_view_map()
results = []
- for pattern, view in self.__view_map.items():
+ for pattern, view in list(self.__view_map.items()):
if re.match(pattern, filename):
results.append(view)
LOGGER.debug("Found: %s" % (literal_re,))
try:
filename_re = re.compile(literal_re)
- except re.error, e:
+ except re.error as e:
LOGGER.error("Unable to compile: %s" % (literal_re,))
patterns[literal_re] = view_name
return patterns
results = []
for record in rdfstream:
d = {}
- for key, value in record.items():
+ for key, value in list(record.items()):
d[key] = fromTypedNode(value)
results.append(d)
return results
stream.write('testfile')
self.result_map = ResultMap()
- for lib_id in [u'11154', u'12345']:
+ for lib_id in ['11154', '12345']:
subname = 'sub-%s' % (lib_id,)
sub_dir = os.path.join(self.tempdir, subname)
os.mkdir(sub_dir)
seqs = self.extract.find_archive_sequence_files(self.result_map)
expected = set([
- (u'11154', u'42JUYAAXX', '5', 1, 76, True, 'qseq'),
- (u'11154', u'42JUYAAXX', '5', 2, 76, True, 'qseq'),
- (u'11154', u'61MJTAAXX', '6', 1, 76, False, 'qseq'),
- (u'11154', u'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
- (u'11154', u'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
- (u'11154', u'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
- (u'11154', u'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
- (u'12345', u'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
- (u'12345', u'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
- (u'12345', u'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
- (u'12345', u'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
- (u'12345', u'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
- (u'12345', u'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
- (u'11154', u'30221AAXX', '4', 1, 33, False, 'srf'),
- (u'11154', u'30DY0AAXX', '8', 1, 151, True, 'srf')
+ ('11154', '42JUYAAXX', '5', 1, 76, True, 'qseq'),
+ ('11154', '42JUYAAXX', '5', 2, 76, True, 'qseq'),
+ ('11154', '61MJTAAXX', '6', 1, 76, False, 'qseq'),
+ ('11154', 'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
+ ('11154', 'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
+ ('11154', 'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
+ ('11154', 'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
+ ('12345', 'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
+ ('12345', 'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
+ ('12345', 'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
+ ('12345', 'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
+ ('12345', 'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
+ ('12345', 'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
+ ('11154', '30221AAXX', '4', 1, 33, False, 'srf'),
+ ('11154', '30DY0AAXX', '8', 1, 151, True, 'srf')
])
found = set([(l.library_id, l.flowcell_id, l.lane_number, l.read, l.cycle, l.ispaired, l.filetype) for l in seqs])
self.assertEqual(expected, found)
lib_db)
self.assertEqual(len(needed_targets), 9)
srf_30221 = needed_targets[
- self.result_map['11154'] + u'/11154_30221AAXX_c33_l4.fastq']
+ self.result_map['11154'] + '/11154_30221AAXX_c33_l4.fastq']
qseq_42JUY_r1 = needed_targets[
- self.result_map['11154'] + u'/11154_42JUYAAXX_c76_l5_r1.fastq']
+ self.result_map['11154'] + '/11154_42JUYAAXX_c76_l5_r1.fastq']
qseq_42JUY_r2 = needed_targets[
- self.result_map['11154'] + u'/11154_42JUYAAXX_c76_l5_r2.fastq']
+ self.result_map['11154'] + '/11154_42JUYAAXX_c76_l5_r2.fastq']
qseq_61MJT = needed_targets[
- self.result_map['11154'] + u'/11154_61MJTAAXX_c76_l6.fastq']
+ self.result_map['11154'] + '/11154_61MJTAAXX_c76_l6.fastq']
split_C02F9_r1 = needed_targets[
- self.result_map['11154'] + u'/11154_C02F9ACXX_c202_l3_r1.fastq']
+ self.result_map['11154'] + '/11154_C02F9ACXX_c202_l3_r1.fastq']
split_C02F9_r2 = needed_targets[
- self.result_map['11154'] + u'/11154_C02F9ACXX_c202_l3_r2.fastq']
+ self.result_map['11154'] + '/11154_C02F9ACXX_c202_l3_r2.fastq']
self.assertEqual(len(srf_30221['srf']), 1)
self.assertEqual(len(qseq_42JUY_r1['qseq']), 1)
'11154_30221AAXX_c33_l4.fastq'): {
'mid': None,
'ispaired': False,
- 'sources': [u'woldlab_090425_HWI-EAS229_0110_30221AAXX_4.srf'],
- 'flowcell': u'30221AAXX',
+ 'sources': ['woldlab_090425_HWI-EAS229_0110_30221AAXX_4.srf'],
+ 'flowcell': '30221AAXX',
'target': os.path.join(self.result_map['11154'],
- u'11154_30221AAXX_c33_l4.fastq'),
+ '11154_30221AAXX_c33_l4.fastq'),
},
os.path.join(self.result_map['11154'],
'11154_30DY0AAXX_c151_l8_r1.fastq'): {
'mid': None,
'ispaired': True,
- 'flowcell': u'30DY0AAXX',
- 'sources': [u'woldlab_090725_HWI-EAS229_0110_30DY0AAXX_8.srf'],
+ 'flowcell': '30DY0AAXX',
+ 'sources': ['woldlab_090725_HWI-EAS229_0110_30DY0AAXX_8.srf'],
'mid': 76,
'target':
os.path.join(self.result_map['11154'],
- u'11154_30DY0AAXX_c151_l8_r1.fastq'),
+ '11154_30DY0AAXX_c151_l8_r1.fastq'),
'target_right':
os.path.join(self.result_map['11154'],
- u'11154_30DY0AAXX_c151_l8_r2.fastq'),
+ '11154_30DY0AAXX_c151_l8_r2.fastq'),
}
}
for args in srf:
'istar': True,
'ispaired': True,
'sources': [
- u'woldlab_100826_HSI-123_0001_42JUYAAXX_l5_r1.tar.bz2']
+ 'woldlab_100826_HSI-123_0001_42JUYAAXX_l5_r1.tar.bz2']
},
os.path.join(self.result_map['11154'],
'11154_42JUYAAXX_c76_l5_r2.fastq'): {
'istar': True,
'ispaired': True,
'sources': [
- u'woldlab_100826_HSI-123_0001_42JUYAAXX_l5_r2.tar.bz2']
+ 'woldlab_100826_HSI-123_0001_42JUYAAXX_l5_r2.tar.bz2']
},
os.path.join(self.result_map['11154'],
'11154_61MJTAAXX_c76_l6.fastq'): {
'istar': True,
'ispaired': False,
'sources': [
- u'woldlab_100826_HSI-123_0001_61MJTAAXX_l6_r1.tar.bz2'],
+ 'woldlab_100826_HSI-123_0001_61MJTAAXX_l6_r1.tar.bz2'],
},
}
for args in qseq:
split_test = dict((( x['target'], x) for x in
- [{'sources': [u'11154_NoIndex_L003_R1_001.fastq.gz',
- u'11154_NoIndex_L003_R1_002.fastq.gz'],
+ [{'sources': ['11154_NoIndex_L003_R1_001.fastq.gz',
+ '11154_NoIndex_L003_R1_002.fastq.gz'],
'pyscript': 'desplit_fastq.pyc',
- 'target': u'11154_C02F9ACXX_c202_l3_r1.fastq'},
- {'sources': [u'11154_NoIndex_L003_R2_001.fastq.gz',
- u'11154_NoIndex_L003_R2_002.fastq.gz'],
+ 'target': '11154_C02F9ACXX_c202_l3_r1.fastq'},
+ {'sources': ['11154_NoIndex_L003_R2_001.fastq.gz',
+ '11154_NoIndex_L003_R2_002.fastq.gz'],
'pyscript': 'desplit_fastq.pyc',
- 'target': u'11154_C02F9ACXX_c202_l3_r2.fastq'},
- {'sources': [u'12345_CGATGT_L003_R1_001.fastq.gz',
- u'12345_CGATGT_L003_R1_002.fastq.gz',
- u'12345_CGATGT_L003_R1_003.fastq.gz',
+ 'target': '11154_C02F9ACXX_c202_l3_r2.fastq'},
+ {'sources': ['12345_CGATGT_L003_R1_001.fastq.gz',
+ '12345_CGATGT_L003_R1_002.fastq.gz',
+ '12345_CGATGT_L003_R1_003.fastq.gz',
],
'pyscript': 'desplit_fastq.pyc',
- 'target': u'12345_C02F9ACXX_c202_l3_r1.fastq'},
- {'sources': [u'12345_CGATGT_L003_R2_001.fastq.gz',
- u'12345_CGATGT_L003_R2_002.fastq.gz',
- u'12345_CGATGT_L003_R2_003.fastq.gz',
+ 'target': '12345_C02F9ACXX_c202_l3_r1.fastq'},
+ {'sources': ['12345_CGATGT_L003_R2_001.fastq.gz',
+ '12345_CGATGT_L003_R2_002.fastq.gz',
+ '12345_CGATGT_L003_R2_003.fastq.gz',
],
'pyscript': 'desplit_fastq.pyc',
- 'target': u'12345_C02F9ACXX_c202_l3_r2.fastq'}
+ 'target': '12345_C02F9ACXX_c202_l3_r2.fastq'}
]
))
for arg in split:
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(TestCondorFastq))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest='suite')
from contextlib import contextmanager
import os
-from StringIO import StringIO
+from io import StringIO
import shutil
import tempfile
-from unittest2 import TestCase, TestSuite, defaultTestLoader
+from unittest import TestCase, TestSuite, defaultTestLoader
from htsworkflow.submission import daf, results
from htsworkflow.util.rdfhelp import \
signal_view_node, None, None)))
self.failUnlessEqual(len(statements), 6)
name = model.get_target(signal_view_node, dafTermOntology['name'])
- self.failUnlessEqual(fromTypedNode(name), u'Signal')
+ self.failUnlessEqual(fromTypedNode(name), 'Signal')
def test_get_view_namespace_from_string(self):
url = "http://jumpgate.caltech.edu/wiki/SubmissionLog/cursub/"
def dump_model(model):
writer = get_serializer()
turtle = writer.serialize_model_to_string(model)
- print turtle
+ print(turtle)
class TestUCSCSubmission(TestCase):
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest='suite')
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.submission.fastqname import FastqName
class TestFastqName(TestCase):
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(TestFastqName))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest='suite')
from pprint import pprint
import shutil
-from unittest2 import TestCase, defaultTestLoader
+from unittest import TestCase, defaultTestLoader
from htsworkflow.submission.results import ResultMap
-from submission_test_common import *
+from .submission_test_common import *
def generate_sample_results_tree(obj):
obj.tempdir = tempfile.mkdtemp(prefix="results_test")
results['2000'] = 'dir2000'
results['1500'] = 'dir1500'
- self.failUnlessEqual(results.keys(), ['1000', '2000', '1500'])
+ self.failUnlessEqual(list(results.keys()), ['1000', '2000', '1500'])
self.failUnlessEqual(list(results.values()),
['dir1000', 'dir2000', 'dir1500'])
self.failUnlessEqual(list(results.items()),
self.failUnlessEqual(results['1500'], 'dir1500')
self.failUnlessEqual(results['2000'], 'dir2000')
- self.assertTrue(u'2000' in results)
self.assertTrue('2000' in results)
- self.assertFalse(u'77777' in results)
+ self.assertTrue('2000' in results)
+ self.assertFalse('77777' in results)
self.assertFalse('77777' in results)
def test_make_from_absolute(self):
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.DEBUG)
- from unittest2 import main
+ from unittest import main
main(defaultTest='suite')
import os
-from StringIO import StringIO
+from io import StringIO
import shutil
import tempfile
-from unittest2 import TestCase, TestSuite, defaultTestLoader
+from unittest import TestCase, TestSuite, defaultTestLoader
from htsworkflow.submission import daf, results
from htsworkflow.util.rdfhelp import \
get_serializer
from htsworkflow.submission.submission import list_submissions, Submission
from htsworkflow.submission.results import ResultMap
-from submission_test_common import *
+from .submission_test_common import *
import RDF
#import logging
"""
map = ResultMap()
- print self.tempdir
- print os.listdir(self.tempdir)
+ print(self.tempdir)
+ print(os.listdir(self.tempdir))
map['1000'] = os.path.join(self.tempdir, S1_NAME)
map['2000'] = os.path.join(self.tempdir, S2_NAME)
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest='suite')
-from unittest2 import TestCase, TestSuite, defaultTestLoader
-from StringIO import StringIO
+from unittest import TestCase, TestSuite, defaultTestLoader
+from io import StringIO
from htsworkflow.submission import ucsc
file_index = ucsc.parse_ucsc_file_index(stream, 'http://example.com/files')
self.assertEquals(len(file_index), 2)
- for attributes in file_index.values():
+ for attributes in list(file_index.values()):
self.failUnless('subId' in attributes)
self.failUnless('project' in attributes)
self.assertEquals(attributes['project'], 'wgEncode')
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest='suite')
self.baseurl = os.path.join(baseurl, self.name)
if baseupload:
sshurl = parse_ssh_url(baseupload)
- print sshurl
+ print(sshurl)
self.user = sshurl.user
self.host = sshurl.host
self.uploadpath = sshurl.path
subgroups.append(definitions)
names.append(name)
sortorder.append("{}=+".format(name))
- d = dimnames.next()
+ d = next(dimnames)
dimensions.append("{}={}".format(d, name))
filtercomposite.append("{}=multi".format(d))
"""Utilities for extracting information from the ENCODE DCC
"""
import logging
-import urlparse
-import urllib2
+import urllib.parse
+import urllib.request, urllib.error, urllib.parse
LOGGER = logging.getLogger(__name__)
'http://encodesubmit.ucsc.edu/pipeline/download_ddf/1234'
"""
fragment = 'download_ddf/%s' % (submission_id,)
- return urlparse.urljoin(UCSCEncodePipeline, fragment)
+ return urllib.parse.urljoin(UCSCEncodePipeline, fragment)
def daf_download_url(submission_id):
'http://encodesubmit.ucsc.edu/pipeline/download_daf/1234'
"""
fragment = 'download_daf/%s' % (submission_id,)
- return urlparse.urljoin(UCSCEncodePipeline, fragment)
+ return urllib.parse.urljoin(UCSCEncodePipeline, fragment)
def submission_view_url(submission_id):
'http://encodesubmit.ucsc.edu/pipeline/show/1234'
"""
fragment = 'show/%s' % (submission_id,)
- return urlparse.urljoin(UCSCEncodePipeline, fragment)
+ return urllib.parse.urljoin(UCSCEncodePipeline, fragment)
def get_encodedcc_file_index(genome, composite):
request_url = base_url + 'files.txt'
try:
- request = urllib2.urlopen(request_url)
+ request = urllib.request.urlopen(request_url)
file_index = parse_ucsc_file_index(request, base_url)
return file_index
- except urllib2.HTTPError, e:
+ except urllib.error.HTTPError as e:
err = e
pass
return a list of numbers and non-numeric substrings of +str+
the numeric substrings are converted to integer, non-numeric are left as is
"""
- if type(str) in types.StringTypes:
+ if isinstance(str, type('')):  # NOTE: parameter shadows builtin str; `type(x) in str` raises TypeError
chunks = re.findall("(\d+|\D+)",str)
#convert numeric strings to numbers
chunks = [re.match('\d',x) and int(x) or x for x in chunks]
return chunks
- elif type(str) in [types.IntType, types.LongType, types.FloatType]:
+ elif type(str) in (int, float):
return [str]
else:
raise ValueError("Unsupported type %s for input %s" % (type(str), str))
"""Common functions for accessing the HTS Workflow REST API
"""
import base64
-from ConfigParser import SafeConfigParser
+from configparser import ConfigParser as SafeConfigParser  # SafeConfigParser is a deprecated alias in Python 3
import random
import logging
import os
from optparse import OptionGroup
-import urllib
-import urllib2
-import urlparse
+import urllib.request, urllib.parse, urllib.error
+import urllib.request, urllib.error, urllib.parse
+import urllib.parse
LOGGER = logging.getLogger(__name__)
"""
url_fragment = '/samples/library/%s/json' % (library_id,)
- url = urlparse.urljoin(root_url, url_fragment)
+ url = urllib.parse.urljoin(root_url, url_fragment)
return url
http://localhost/experiments/config/1234AAXX/json
"""
url_fragment = '/experiments/config/%s/json' % (flowcell_id,)
- url = urlparse.urljoin(root_url, url_fragment)
+ url = urllib.parse.urljoin(root_url, url_fragment)
return url
"""
url_fragment = '/lanes_for/%s/json' % (username,)
- url = urlparse.urljoin(root_url, url_fragment)
+ url = urllib.parse.urljoin(root_url, url_fragment)
return url
Return a dictionary from the HTSworkflow API
"""
try:
- apipayload = urllib.urlencode(apidata)
- web = urllib2.urlopen(url, apipayload)
- except urllib2.URLError, e:
+ apipayload = urllib.parse.urlencode(apidata).encode('utf-8')  # urlopen data must be bytes in Python 3
+ web = urllib.request.urlopen(url, apipayload)
+ except urllib.error.URLError as e:
if hasattr(e, 'code') and e.code == 404:
LOGGER.info("%s was not found" % (url,))
return None
"""return key suitable for use as secret key"""
try:
source = random.SystemRandom()
- except AttributeError, e:
+ except AttributeError as e:
source = random.random()
bits = source.getrandbits(size)
chars = []
if value is None:
return None
else:
- return unicode(value)
+ return str(value)
def parse_flowcell_id(flowcell_id):
"""
# extract just the field name
description = [ f[0] for f in c.description]
for row in c:
- row_dict = dict(zip(description, row))
+ row_dict = dict(list(zip(description, row)))
table[row_dict[pkey_name]] = row_dict
c.close()
return table
"""
library_id_re = re.compile('lane_\d_library_id')
- for fc_id, fc in self.flowcells.items():
- lane_library = [ (x[0][5], x[1]) for x in fc.items()
+ for fc_id, fc in list(self.flowcells.items()):
+ lane_library = [ (x[0][5], x[1]) for x in list(fc.items())
if library_id_re.match(x[0]) ]
for lane, library_id in lane_library:
- if not self.library[library_id].has_key('lanes'):
+ if 'lanes' not in self.library[library_id]:
self.library[library_id]['lanes'] = []
self.library[library_id]['lanes'].append((fc_id, lane))
# extract just the field name
description = [ f[0] for f in c.description ]
for row in c:
- row_dict = dict(zip(description, row))
+ row_dict = dict(list(zip(description, row)))
fcid, status = self._parse_flowcell_id(row_dict)
row_dict['flowcell_id'] = fcid
row_dict['flowcell_status'] = status
# sort flowcells by run date
flowcell_list = []
- for key, cell in flowcells.items():
+ for key, cell in list(flowcells.items()):
flowcell_list.append( (cell['run_date'], key) )
flowcell_list.sort()
# the 2nd of which is the serial number
return data.strip('\x00').split()[1]
-except ImportError, e:
- print >>sys.stderr, "hdquery requires py_sg"
+except ImportError as e:
+ print("hdquery requires py_sg", file=sys.stderr)
def get_hd_serial_num(device):
raise NotImplemented('get_hd_serial_num is not available for anything other than linux')
import gzip
import bz2
import types
-import urllib2
+import urllib.request, urllib.error, urllib.parse
def isfilelike(file_ref, mode):
"""Does file_ref have the core file operations?
elif isfilelike(file_ref, mode):
return file_ref
elif isurllike(file_ref, mode):
- return urllib2.urlopen(file_ref)
+ return urllib.request.urlopen(file_ref)
elif os.path.splitext(file_ref)[1] == ".gz":
return gzip.open(file_ref, mode)
elif os.path.splitext(file_ref)[1] == '.bz2':
# build a list of file descriptors
# fds=file desciptors
- fds = [ x.stdout for x in self.running.values()]
+ fds = [ x.stdout for x in list(self.running.values())]
# wait for something to finish
# wl= write list, xl=exception list (not used so get bad names)
import collections
from datetime import datetime
from glob import glob
-from urlparse import urlparse, urlunparse
-from urllib2 import urlopen
+from urllib.parse import urlparse, urlunparse
+from urllib.request import urlopen
import logging
import os
import sys
"""A very simple display of sparql query results showing name value pairs
"""
for row in results:
- for k, v in row.items()[::-1]:
- print "{0}: {1}".format(k, v)
- print
+ for k, v in list(row.items())[::-1]:
+ print("{0}: {1}".format(k, v))
+ print()
def html_query_results(result_stream):
from django.conf import settings
for row in result_stream:
new_row = collections.OrderedDict()
row_urls = []
- for k,v in row.items():
+ for k,v in list(row.items()):
new_row[k] = Simplified(v)
results.append(new_row)
context = Context({'results': results,})
- print template.render(context)
+ print(template.render(context))
def blankOrUri(value=None):
"""Return a blank node for None or a resource node for strings.
node = None
if value is None:
node = RDF.Node()
- elif type(value) in types.StringTypes:
+ elif isinstance(value, str):
node = RDF.Node(uri_string=value)
elif isinstance(value, RDF.Node):
node = value
def toTypedNode(value, language="en"):
"""Convert a python variable to a RDF Node with its closest xsd type
"""
- if type(value) == types.BooleanType:
+ if type(value) == bool:
value_type = xsdNS['boolean'].uri
if value:
- value = u'1'
+ value = '1'
else:
- value = u'0'
- elif type(value) in (types.IntType, types.LongType):
+ value = '0'
+ elif type(value) in (int,):
value_type = xsdNS['decimal'].uri
- value = unicode(value)
- elif type(value) == types.FloatType:
+ value = str(value)
+ elif type(value) == float:
value_type = xsdNS['float'].uri
- value = unicode(value)
+ value = str(value)
elif isinstance(value, datetime):
value_type = xsdNS['dateTime'].uri
if value.microsecond == 0:
value = value.strftime(ISOFORMAT_MS)
else:
value_type = None
- value = unicode(value)
+ value = str(value)
if value_type is not None:
node = RDF.Node(literal=value, datatype=value_type)
else:
+ node = RDF.Node(literal=str(value), language=language)  # no .encode(): Python 3 literals are text, not bytes
+ node = RDF.Node(literal=str(value).encode('utf-8'), language=language)
return node
elif value_type in ('dateTime'):
try:
return datetime.strptime(literal, ISOFORMAT_MS)
- except ValueError, _:
+ except ValueError as _:
return datetime.strptime(literal, ISOFORMAT_SHORT)
return literal
def load_into_model(model, parser_name, path, ns=None):
- if type(ns) in types.StringTypes:
+ if isinstance(ns, str):
ns = RDF.Uri(ns)
if isinstance(path, RDF.Node):
retries -= 1
statements = rdf_parser.parse_as_stream(url, ns)
retries = 0
- except RDF.RedlandError, e:
+ except RDF.RedlandError as e:
errmsg = "RDF.RedlandError: {0} {1} tries remaining"
logger.error(errmsg.format(str(e), retries))
def fixup_namespace(ns):
if ns is None:
ns = RDF.Uri("http://localhost/")
- elif type(ns) in types.StringTypes:
+ elif isinstance(ns, str):
ns = RDF.Uri(ns)
elif not(isinstance(ns, RDF.Uri)):
errmsg = "Namespace should be string or uri not {0}"
add_schema(model, schema, namespace)
if schema_path:
- if type(schema_path) in types.StringTypes:
+ if isinstance(schema_path, str):
schema_path = [schema_path]
for path in schema_path:
import copy
import os
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.util.alphanum import alphanum
scratch = copy.copy(unsorted)
scratch.sort(alphanum)
- for i in xrange(len(scratch)):
+ for i in range(len(scratch)):
self.failIfEqual(scratch[i], unsorted[i])
- for i in xrange(len(scratch)):
+ for i in range(len(scratch)):
self.failUnlessEqual(scratch[i], sorted[i])
def test_numbers(self):
scratch = copy.copy(unsorted)
scratch.sort(alphanum)
- for i in xrange(len(scratch)):
+ for i in range(len(scratch)):
self.failIfEqual(scratch[i], unsorted[i])
- for i in xrange(len(scratch)):
+ for i in range(len(scratch)):
self.failUnlessEqual(scratch[i], sorted[i])
def test_long_names(self):
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(testAlphanum))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import copy
import os
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.util import api
self.failUnless(k1 != k2)
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(TestApi))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
#!/usr/bin/env python
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.util import conversion
self.failUnlessEqual(s.stop, 2)
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(TestConversion))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
-from unittest2 import TestCase
+from unittest import TestCase
try:
from xml.etree import ElementTree
-except ImportError, e:
+except ImportError as e:
from elementtree import ElementTree
from htsworkflow.util.ethelp import indent, flatten
self.failUnless(flatten(self.foo_tree), 'asdf')
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(testETHelper))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
-from StringIO import StringIO
-from unittest2 import TestCase
+from io import StringIO
+from unittest import TestCase
from htsworkflow.util import makebed
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(testMakeBed))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
import logging
import time
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.util.queuecommands import QueueCommands
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(testQueueCommands))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
import types
-from unittest2 import TestCase
+from unittest import TestCase
from datetime import datetime
def test_typed_node_boolean(self):
node = toTypedNode(True)
- self.assertIn(node.literal_value['string'], (u'1', u'true'))
+ self.assertIn(node.literal_value['string'], ('1', 'true'))
self.assertEqual(str(node.literal_value['datatype']),
'http://www.w3.org/2001/XMLSchema#boolean')
def test_typed_node_string(self):
node = toTypedNode('hello')
- self.assertEqual(node.literal_value['string'], u'hello')
+ self.assertEqual(node.literal_value['string'], 'hello')
self.assertTrue(node.literal_value['datatype'] is None)
def test_typed_real_like(self):
s = "Argh matey"
node = toTypedNode(s)
self.assertEqual(fromTypedNode(node), s)
- self.assertEqual(type(fromTypedNode(node)), types.UnicodeType)
+ self.assertEqual(type(fromTypedNode(node)), str)
def test_blank_or_uri_blank(self):
node = blankOrUri()
self.assertEqual(node, s)
def test_unicode_node_roundtrip(self):
- literal = u'\u5927'
+ literal = '\u5927'
roundtrip = fromTypedNode(toTypedNode(literal))
self.assertEqual(roundtrip, literal)
- self.assertEqual(type(roundtrip), types.UnicodeType)
+ self.assertEqual(type(roundtrip), str)
def test_datetime_no_microsecond(self):
dateTimeType = xsdNS['dateTime'].uri
self.assertTrue(model.contains_statement(s))
-except ImportError, e:
- print "Unable to test rdfhelp"
+except ImportError as e:
+ print("Unable to test rdfhelp")
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(TestRDFHelp))
suite.addTests(defaultTestLoader.loadTestsFromTestCase(TestRDFSchemas))
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
-from unittest2 import TestCase
+from unittest import TestCase
import RDF
inference = Infer(self.model)
errmsg = list(inference._validate_property_types())
- print errmsg
+ print(errmsg)
self.failUnlessEqual(len(errmsg), 0)
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(TestInfer))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.util.url import normalize_url, parse_ssh_url
self.assertRaises(ValueError, parse_ssh_url, 'hello')
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(TestURLUtilities))
return suite
if __name__ == '__main__':
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
-from StringIO import StringIO
-from unittest2 import TestCase
+from io import StringIO
+from unittest import TestCase
from htsworkflow.util import validate
class TestValidate(TestCase):
def test_phred33_works(self):
- q = StringIO(u"@ abc\nAGCT\n+\nBBBB\n")
+ q = StringIO("@ abc\nAGCT\n+\nBBBB\n")
errors = validate.validate_fastq(q)
self.failUnlessEqual(0, errors)
def test_phred64_works(self):
- q = StringIO(u"@ abc\nAGCT\n+\nfgh]\n")
+ q = StringIO("@ abc\nAGCT\n+\nfgh]\n")
errors = validate.validate_fastq(q, 'phred64')
self.failUnlessEqual(0, errors)
def test_fasta_fails(self):
- q = StringIO(u">abc\nAGCT\n>foo\nCGAT\n")
+ q = StringIO(">abc\nAGCT\n>foo\nCGAT\n")
errors = validate.validate_fastq(q)
self.failUnlessEqual(3, errors)
def test_fastq_diff_length_uniform(self):
- q = StringIO(u"@ abc\nAGCT\n+\nBBBB\n@ abcd\nAGCTT\n+\nJJJJJ\n")
+ q = StringIO("@ abc\nAGCT\n+\nBBBB\n@ abcd\nAGCTT\n+\nJJJJJ\n")
errors = validate.validate_fastq(q, 'phred33', True)
self.failUnlessEqual(2, errors)
def test_fastq_diff_length_variable(self):
- q = StringIO(u"@ abc\nAGCT\n+\n@@@@\n@ abcd\nAGCTT\n+\nJJJJJ\n")
+ q = StringIO("@ abc\nAGCT\n+\n@@@@\n@ abcd\nAGCTT\n+\nJJJJJ\n")
errors = validate.validate_fastq(q, 'phred33', False)
self.failUnlessEqual(0, errors)
def test_fastq_qual_short(self):
- q = StringIO(u"@ abc\nAGCT\n+\nJJ\n")
+ q = StringIO("@ abc\nAGCT\n+\nJJ\n")
errors = validate.validate_fastq(q)
self.failUnlessEqual(1, errors)
def test_fastq_seq_invalid_char(self):
- q = StringIO(u"@ abc\nAGC\u1310\n+\nEFGH\n")
+ q = StringIO("@ abc\nAGC\u1310\n+\nEFGH\n")
errors = validate.validate_fastq(q)
self.failUnlessEqual(1, errors)
def test_fastq_qual_invalid_char(self):
- q = StringIO(u"+ abc\nAGC.\n+\n!@#J\n")
+ q = StringIO("+ abc\nAGC.\n+\n!@#J\n")
errors = validate.validate_fastq(q)
self.failUnlessEqual(1, errors)
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(testValidate))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
-from unittest2 import TestCase
+from unittest import TestCase
from htsworkflow.util import version
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTest(defaultTestLoader.loadTestsFromTestCase(TestVersion))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
opts.uniform_lengths,
opts.max_errors)
if errors > 0:
- print "%s failed validation" % (filename,)
+ print("%s failed validation" % (filename,))
error_happened = True
stream.close()
def validate_re(pattern, line, line_number, errmsg):
if pattern.match(line) is None:
- print errmsg, "[%d]: %s" % (line_number, line)
+ print(errmsg, "[%d]: %s" % (line_number, line))
return 1
else:
return 0
if line_length is None:
line_length = len(line)
elif len(line) != line_length:
- print errmsg, "%d: %s" %(line_number, line)
+ print(errmsg, "%d: %s" %(line_number, line))
error_count = 1
return line_length, error_count
version = None
try:
import pkg_resources
- except ImportError, e:
+ except ImportError as e:
LOGGER.error("Can't find version number, please install setuptools")
raise e
try:
version = pkg_resources.get_distribution("htsworkflow")
- except pkg_resources.DistributionNotFound, e:
+ except pkg_resources.DistributionNotFound as e:
LOGGER.error("Package not installed")
return version
if not dry_run: os.mkdir(output_dir)
processes = []
- for lane_id, lane_param in g.lanes.items():
+ for lane_id, lane_param in list(g.lanes.items()):
eland = g.eland_results[lane_id]
inpathname = eland.pathname
-from unittest2 import TestCase
+from unittest import TestCase
-from StringIO import StringIO
+from io import StringIO
from htsworkflow.automation import copier
from htsworkflow.automation.solexa import is_runfolder
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(testCopier))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
import os
-from StringIO import StringIO
+from io import StringIO
import sys
-from unittest2 import TestCase
+from unittest import TestCase
_module_path, _module_name = os.path.split(__file__)
sys.path.append(os.path.join(_module_path, '..', 'scripts'))
def suite():
- from unittest2 import TestSuite, defaultTestLoader
+ from unittest import TestSuite, defaultTestLoader
suite = TestSuite()
suite.addTests(defaultTestLoader.loadTestsFromTestCase(testSrf2Fastq))
return suite
if __name__ == "__main__":
- from unittest2 import main
+ from unittest import main
main(defaultTest="suite")
"""
symbols = "abcdefhijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
name = []
- for i in xrange(length):
+ for i in range(length):
name.append(random.choice(symbols))
return "".join(name)
difference = experimental_set - theoretical_set
issame = (len(difference) == 0)
if verbose and not issame:
- print difference
+ print(difference)
return issame