+from __future__ import unicode_literals
+
from django.contrib import admin
from .models import KeywordMap, Printer
+from __future__ import unicode_literals
+
from django import forms
class BarcodeMagicForm(forms.Form):
+from __future__ import unicode_literals
+
from django.db import models
#FIXME: Should be made more generic and probably pre-populated supported list
label_height = models.FloatField(help_text='height in inches')
notes = models.TextField()
- def __unicode__(self):
- return u'%s, %s, %s, %s, %sx%s' % (self.name, self.model, self.ip_address, self.label_shape, self.label_width, self.label_width)
\ No newline at end of file
+ def __str__(self):
+ return '%s, %s, %s, %s, %sx%s' % (self.name, self.model, self.ip_address, self.label_shape, self.label_width, self.label_height)
+from __future__ import unicode_literals
+
BCM_PLUGINS = {
#'cmd_move_sample': bcm_cmds.cmd_move_sample
}
+from __future__ import unicode_literals
+
from django.conf.urls import patterns
urlpatterns = patterns('',
+from __future__ import unicode_literals
+
from django.conf import settings
import ftplib
import socket
-import StringIO
+from six.moves import StringIO
def print_zpl(zpl_text, host=None):
+from __future__ import unicode_literals
+
from django.http import HttpResponse
from django.template import RequestContext, Template, Context
from django.shortcuts import render_to_response
try:
import json
-except ImportError, e:
+except ImportError as e:
import simplejson as json
import re
+from __future__ import unicode_literals
+
from django.contrib import admin
from django.utils.translation import ugettext_lazy as _
+from __future__ import unicode_literals
+
from django import forms
from django.forms.util import ErrorList
+from __future__ import unicode_literals
+
from django.db import models
# Create your models here.
+from __future__ import unicode_literals
+
from django.conf.urls import patterns, url
urlpatterns = patterns('',
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
from django.conf import settings
from django.http import HttpResponse
+#!/usr/bin/env python
"""Create a track hub
"""
+from __future__ import print_function, unicode_literals
-#!/usr/bin/env python
-from ConfigParser import SafeConfigParser
+from six.moves.configparser import SafeConfigParser
import fnmatch
from glob import glob
import os
from pprint import pprint, pformat
import shlex
-from StringIO import StringIO
+from six.moves import StringIO
import stat
import sys
import time
import types
-import urllib
-import urllib2
-import urlparse
from zipfile import ZipFile
import RDF
if opts.print_rdf:
writer = get_serializer()
- print writer.serialize_model_to_string(model)
+ print(writer.serialize_model_to_string(model))
def make_parser():
"""
Gather information about our submissions into a single RDF store
"""
+from __future__ import print_function
from datetime import datetime
import hashlib
# redland rdf lib
import RDF
import sys
-import urllib
-import urlparse
+from six.moves import urllib
if not 'DJANGO_SETTINGS_MODULE' in os.environ:
os.environ['DJANGO_SETTINGS_MODULE'] = 'htsworkflow.settings'
if opts.print_rdf:
serializer = get_serializer(name=opts.rdf_parser_name)
- print serializer.serialize_model_to_string(model)
+ print(serializer.serialize_model_to_string(model))
def make_parser():
for row in results:
subid = row['subid']
name = row['name']
- print "# {0}".format(name)
- print "<{0}>".format(subid.uri)
- print " encodeSubmit:library_urn "\
- "<http://jumpgate.caltech.edu/library/> ."
- print ""
+ print("# {0}".format(name))
+ print("<{0}>".format(subid.uri))
+ print(" encodeSubmit:library_urn "\
+ "<http://jumpgate.caltech.edu/library/> .")
+ print("")
def find_submissions_with_no_library(model):
missing_lib_query_text = """
load_library_detail(model, library_urn)
def user_library_id_to_library_urn(library_id):
- split_url = urlparse.urlsplit(library_id)
+ split_url = urllib.parse.urlsplit(library_id)
if len(split_url.scheme) == 0:
return LIBRARY_NS[library_id]
else:
try:
body = get_url_as_text(str(libraryUrn.uri), 'GET')
rdfaParser.parse_string_into_model(model, body, libraryUrn.uri)
- except httplib2.HttpLib2ErrorWithResponse, e:
+ except httplib2.HttpLib2ErrorWithResponse as e:
LOGGER.error(str(e))
elif len(results) == 1:
pass # Assuming that a loaded dataset has one record
response, content = http.request(LOGIN_URL,
'POST',
headers=headers,
- body=urllib.urlencode(credentials))
+ body=urllib.parse.urlencode(credentials))
LOGGER.debug("Login to {0}, status {1}".format(LOGIN_URL,
response['status']))
#!/usr/bin/env python
+from __future__ import print_function, unicode_literals
+
-from ConfigParser import SafeConfigParser
+from six.moves.configparser import SafeConfigParser
import fnmatch
from glob import glob
import os
from pprint import pprint, pformat
import shlex
-from StringIO import StringIO
+from six.moves import StringIO
import stat
import sys
import time
import types
-import urllib
-import urllib2
-import urlparse
from zipfile import ZipFile
import RDF
if opts.print_rdf:
writer = get_serializer()
- print writer.serialize_model_to_string(model)
+ print(writer.serialize_model_to_string(model))
def make_parser():
+from __future__ import print_function, unicode_literals
+
from optparse import OptionParser
import os
import sys
if opts.rdf:
print_rdf(common_extensions)
else:
- print common_extensions
+ print(common_extensions)
def make_parser():
parser = OptionParser("%prog: directory [directory...]")
writer = rdfhelp.get_serializer()
writer.set_namespace('thisSubmissionView', subView._prefix)
- print writer.serialize_model_to_string(model)
+ print(writer.serialize_model_to_string(model))
if __name__ == "__main__":
main()
#!/usr/bin/env python
+from __future__ import absolute_import
+
from datetime import datetime
import os
from unittest import TestCase
import keyring
keyring.set_keyring(MockKeyring())
-import encode_find
+from . import encode_find
from htsworkflow.submission.ucsc import submission_view_url
from htsworkflow.util.rdfhelp import add_default_schemas, \
dump_model, get_model, fromTypedNode
+from __future__ import absolute_import
+
from unittest import TestCase, TestSuite, defaultTestLoader
-import ucsc_gather
+from . import ucsc_gather
class testUCSCGather(TestCase):
pass
#!/usr/bin/env python
-from ConfigParser import SafeConfigParser
+from __future__ import print_function, unicode_literals
+
+from six.moves.configparser import SafeConfigParser
import fnmatch
from glob import glob
import json
import os
from pprint import pprint, pformat
import shlex
-from StringIO import StringIO
+from six.moves import StringIO
import stat
import sys
import time
import types
-import urllib
-import urllib2
-import urlparse
from zipfile import ZipFile
import RDF
if opts.print_rdf:
writer = get_serializer()
- print writer.serialize_model_to_string(model)
+ print(writer.serialize_model_to_string(model))
def make_parser():
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
from itertools import chain
from django.forms import ModelForm
from django.forms.fields import Field, CharField
from django.forms.widgets import TextInput, Select
-from django.utils.encoding import force_unicode
+from django.utils.encoding import force_text
from django.utils.html import escape, conditional_escape
from django.utils.translation import ugettext_lazy as _
def render_options(self, choices, selected_choices):
# Normalize to strings.
- selected_choices = set([force_unicode(v) for v in selected_choices])
+ selected_choices = set([force_text(v) for v in selected_choices])
output = []
for option_value, option_label in chain(self.choices, choices):
if isinstance(option_label, (list, tuple)):
- output.append(u'<optgroup label="%s">' % escape(force_unicode(option_value)))
+ output.append(u'<optgroup label="%s">' % escape(force_text(option_value)))
for option in option_label:
output.append(self.render_option(selected_choices, *option))
output.append(u'</optgroup>')
# nested function in render_options
def render_options(self, choices, selected_choices):
# Normalize to strings.
- selected_choices = set([force_unicode(v) for v in selected_choices])
+ selected_choices = set([force_text(v) for v in selected_choices])
output = []
for option_value, option_label in chain(self.choices, choices):
if isinstance(option_label, (list, tuple)):
- output.append(u'<optgroup label="%s">' % escape(force_unicode(option_value)))
+ output.append(u'<optgroup label="%s">' % escape(force_text(option_value)))
for option in option_label:
output.append(self.render_option(selected_choices, *option))
output.append(u'</optgroup>')
def render_option(self, selected_choices, option_value, option_label):
- disabled_sequencers = [ unicode(s.id) for s in self.queryset.filter(active=False) ]
- option_value = unicode(option_value)
+ disabled_sequencers = [ str(s.id) for s in self.queryset.filter(active=False) ]
+ option_value = str(option_value)
selected_html = (option_value in selected_choices) and u' selected="selected"' or ''
cssclass = "strikeout" if option_value in disabled_sequencers else ''
return u'<option class="%s" value="%s"%s>%s</option>' % (
cssclass, escape(option_value), selected_html,
- conditional_escape(force_unicode(option_label)))
+ conditional_escape(force_text(option_label)))
class SequencerOptions(admin.ModelAdmin):
list_display = ('name', 'active', 'isdefault', 'instrument_name', 'model')
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
# some core functions of the exp tracker module
from datetime import datetime, timedelta
-try:
- import json
-except ImportError, e:
- import simplejson as json
-
import os
import re
from django.views.decorators.csrf import csrf_exempt
from django.core.exceptions import ObjectDoesNotExist
from django.core.mail import send_mail, mail_admins
-from django.http import HttpResponse, Http404
+from django.http import HttpResponse, Http404, JsonResponse
from django.conf import settings
from django.utils import timezone
"""
try:
fc = FlowCell.objects.get(flowcell_id__startswith=flowcell_id)
- except FlowCell.DoesNotExist, e:
+ except FlowCell.DoesNotExist as e:
return None
lane_set = {}
'library_name': lane.library.library_name,
'library_id': lane.library.id,
'library_species': lane.library.library_species.scientific_name,
- 'pM': unicode(lane.pM),
+ 'pM': str(lane.pM),
'read_length': lane.flowcell.read_length,
'status_code': lane.status,
'status': LANE_STATUS_MAP[lane.status]
if fc_dict is None:
raise Http404
- fc_json = json.dumps({'result': fc_dict})
- return HttpResponse(fc_json, content_type = 'application/json')
+ return JsonResponse({'result': fc_dict})
def lanes_for(username=None):
"""
try:
result = lanes_for(username)
- except ObjectDoesNotExist, e:
+ except ObjectDoesNotExist as e:
raise Http404
#convert query set to python structure
- result_json = json.dumps({'result': result})
- return HttpResponse(result_json, content_type='application/json')
+ return JsonResponse({'result': result})
def updStatus(request):
user = request.user
#Check access permission
- if not (user.is_superuser and settings.ALLOWED_IPS.has_key(ClIP)):
+ if not (user.is_superuser and ClIP in settings.ALLOWED_IPS):
return HttpResponse("%s access denied from %s." % (user, ClIP))
# ~~~~~~Parameters for the job ~~~~
- if request.REQUEST.has_key('fcid'):
+ if 'fcid' in request.REQUEST:
fcid = request.REQUEST['fcid']
else:
return HttpResponse('missing fcid')
- if request.REQUEST.has_key('runf'):
+ if 'runf' in request.REQUEST:
runfolder = request.REQUEST['runf']
else:
return HttpResponse('missing runf')
- if request.REQUEST.has_key('updst'):
+ if 'updst' in request.REQUEST:
UpdatedStatus = request.REQUEST['updst']
else:
return HttpResponse('missing status')
#if there's a message update that too
mytimestamp = timezone.now().__str__()
mytimestamp = re.sub(pattern=":[^:]*$",repl="",string=mytimestamp)
- if request.REQUEST.has_key('msg'):
+ if 'msg' in request.REQUEST:
rec.run_note += ", "+request.REQUEST['msg']+" ("+mytimestamp+")"
else :
if UpdatedStatus == '1':
def generateConfile(request,fcid):
#granted = False
#ClIP = request.META['REMOTE_ADDR']
- #if (settings.ALLOWED_IPS.has_key(ClIP)): granted = True
+ #if (ClIP in settings.ALLOWED_IPS): granted = True
#if not granted: return HttpResponse("access denied.")
def getConfile(req):
granted = False
ClIP = req.META['REMOTE_ADDR']
- if (settings.ALLOWED_IPS.has_key(ClIP)): granted = True
+ if (ClIP in settings.ALLOWED_IPS): granted = True
if not granted: return HttpResponse("access denied. IP: "+ClIP)
cnfgfile = 'Nothing found'
runfolder = 'unknown'
request = req.REQUEST
- if request.has_key('fcid'):
+ if 'fcid' in request:
fcid = request['fcid']
- if request.has_key('runf'):
+ if 'runf' in request:
runfolder = request['runf']
try:
rec = DataRun.objects.get(run_folder=runfolder) #,flowcell_id=fcid)
def getLaneLibs(req):
granted = False
ClIP = req.META['REMOTE_ADDR']
- if (settings.ALLOWED_IPS.has_key(ClIP)): granted = True
+ if (ClIP in settings.ALLOWED_IPS): granted = True
if not granted: return HttpResponse("access denied.")
request = req.REQUEST
fcid = 'none'
outputfile = ''
- if request.has_key('fcid'):
+ if 'fcid' in request:
fcid = request['fcid']
try:
rec = FlowCell.objects.get(flowcell_id=fcid)
+from __future__ import unicode_literals
+
import datetime
import factory
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
import datetime
import glob
default_pM = 5
try:
default_pM = int(settings.DEFAULT_PM)
-except AttributeError, e:
+except AttributeError as e:
LOGGER.error("invalid value for default_pm")
# how many days to wait before trying to re-import a runfolder
class Meta:
ordering = ["-isdefault", "name"]
- def __unicode__(self):
- return unicode(self.name)
+ def __str__(self):
+ return str(self.name)
@staticmethod
def update_isdefault(sender, instance, **kwargs):
class Meta:
ordering = ["-isdefault", "-active", "name"]
- def __unicode__(self):
- name = [unicode(self.name)]
+ def __str__(self):
+ name = [str(self.name)]
if self.instrument_name is not None:
- name.append("(%s)" % (unicode(self.instrument_name),))
+ name.append("(%s)" % (str(self.instrument_name),))
return " ".join(name)
@models.permalink
notes = models.TextField(blank=True)
- def __unicode__(self):
- return unicode(self.flowcell_id)
+ def __str__(self):
+ return str(self.flowcell_id)
def Lanes(self):
html = ['<table>']
"""Convert our boolean 'is paired' flag to a name
"""
if self.paired_end:
- return u"Paired"
+ return "Paired"
else:
- return u"Single"
+ return "Single"
@models.permalink
def get_absolute_url(self):
return ('experiments.views.flowcell_lane_detail',
[str(self.id)])
- def __unicode__(self):
- return self.flowcell.flowcell_id + ':' + unicode(self.lane_number)
+ def __str__(self):
+ return self.flowcell.flowcell_id + ':' + str(self.lane_number)
class DataRun(models.Model):
return self.name.replace(' ', '_').lower()
normalized_name = property(_get_normalized_name)
- def __unicode__(self):
- #return u"<FileType: %s>" % (self.name,)
+ def __str__(self):
+ #return "<FileType: %s>" % (self.name,)
return self.name
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
from django.test import TestCase
from ..models import ClusterStation, cluster_station_default
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
from django.test import TestCase
file_type_objects = FileType.objects
name = 'QSEQ tarfile'
file_type_object = file_type_objects.get(name=name)
- self.assertEqual(u"QSEQ tarfile",
- unicode(file_type_object))
+ self.assertEqual("QSEQ tarfile",
+ str(file_type_object))
def test_find_file_type(self):
file_type_objects = FileType.objects
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
from django.test import TestCase
from ..models import Sequencer, sequencer_default
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
import re
from lxml.html import fromstring
try:
import json
-except ImportError, e:
+except ImportError as e:
import simplejson as json
import os
import shutil
import sys
import tempfile
-from urlparse import urljoin
+from six.moves.urllib.parse import urljoin
from django.conf import settings
from django.core import mail
from django.test.utils import setup_test_environment, teardown_test_environment
from django.db import connection
from django.conf import settings
+from django.utils.encoding import smart_text
from .models import ClusterStation, cluster_station_default, \
DataRun, Sequencer, FlowCell, FileType
fc42jtn = self.fc42jtn
fc42ju1 = FlowCellFactory(flowcell_id='42JU1AAXX')
- for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]:
+ for fc_id in ['FC12150', '42JTNAAXX', '42JU1AAXX']:
fc_dict = flowcell_information(fc_id)
fc_django = FlowCell.objects.get(flowcell_id=fc_id)
self.assertEqual(fc_dict['flowcell_id'], fc_id)
response = self.client.get('/experiments/config/%s/json' % (fc_id,), apidata)
# strptime isoformat string = '%Y-%m-%dT%H:%M:%S'
- fc_json = json.loads(response.content)['result']
+ fc_json = json.loads(smart_text(response.content))['result']
self.assertEqual(fc_json['flowcell_id'], fc_id)
self.assertEqual(fc_json['sequencer'], fc_django.sequencer.name)
self.assertEqual(fc_json['read_length'], fc_django.read_length)
for lane in fc_django.lane_set.all():
- lane_contents = fc_json['lane_set'][unicode(lane.lane_number)]
+ lane_contents = fc_json['lane_set'][str(lane.lane_number)]
lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
"""
Require logging in to retrieve meta data
"""
- response = self.client.get(u'/experiments/config/FC12150/json')
+ response = self.client.get('/experiments/config/FC12150/json')
self.assertEqual(response.status_code, 403)
def test_library_id(self):
"""
response = self.client.get('/experiments/config/FC12150/json', apidata)
self.assertEqual(response.status_code, 200)
- flowcell = json.loads(response.content)['result']
+ flowcell = json.loads(smart_text(response.content))['result']
# library id is 12150 + lane number (1-8), so 12153
lane_contents = flowcell['lane_set']['3']
response = self.client.get('/samples/library/12153/json', apidata)
self.assertEqual(response.status_code, 200)
- library_12153 = json.loads(response.content)['result']
+ library_12153 = json.loads(smart_text(response.content))['result']
self.assertEqual(library_12153['library_id'], '12153')
This tests to make sure that the value entered in the raw library id field matches
the library id looked up.
"""
- expected_ids = [ u'1215{}'.format(i) for i in range(1,9) ]
+ expected_ids = [ '1215{}'.format(i) for i in range(1,9) ]
self.assertTrue(self.client.login(username=self.admin.username, password=self.password))
response = self.client.get('/admin/experiments/flowcell/{}/'.format(self.fc12150.id))
self.assertEqual(len(lanes), 8)
response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
- lanes_json = json.loads(response.content)['result']
+ lanes_json = json.loads(smart_text(response.content))['result']
self.assertEqual(len(lanes), len(lanes_json))
for i in range(len(lanes)):
self.assertEqual(lanes[i]['comment'], lanes_json[i]['comment'])
response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
self.assertEqual(response.status_code, 404)
-
def test_raw_data_dir(self):
"""Raw data path generator check"""
flowcell_id = self.fc1_id
if status is not None: self.assertTrue(status)
ns = urljoin('http://localhost', url)
- load_string_into_model(model, 'rdfa', response.content, ns=ns)
+ load_string_into_model(model, 'rdfa', smart_text(response.content), ns=ns)
body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
count = 0
for r in query.execute(model):
count += 1
- self.assertEqual(fromTypedNode(r['flowcell_id']), u'FC12150')
+ self.assertEqual(fromTypedNode(r['flowcell_id']), 'FC12150')
lane_id = fromTypedNode(r['lane_id'])
library_id = fromTypedNode(r['library_id'])
self.assertTrue(library_id in expected[lane_id])
response = self.client.get(self.url)
self.assertEqual(response.status_code, 200)
- self.assertTrue(self.affiliation.email in response.content)
- self.assertTrue(self.library.library_name in response.content)
+ self.assertTrue(self.affiliation.email in smart_text(response.content))
+ self.assertTrue(self.library.library_name in smart_text(response.content))
response = self.client.get(self.url, {'send':'1','bcc':'on'})
self.assertEqual(response.status_code, 200)
response = self.client.get(self.url)
self.assertEqual(response.status_code, 200)
#print("email navigation content:", response.content)
- self.assertTrue(re.search(self.fc.flowcell_id, response.content))
+ self.assertTrue(re.search(self.fc.flowcell_id, smart_text(response.content)))
# require that navigation back to the admin page exists
- self.assertTrue(re.search('<a href="{}">[^<]+</a>'.format(admin_url), response.content))
+ self.assertTrue(re.search('<a href="{}">[^<]+</a>'.format(admin_url),
+ smart_text(response.content)))
def multi_lane_to_dict(lane):
"""Convert a list of lane entries into a dictionary indexed by library ID
seq.instrument_name = "HWI-SEQ1"
seq.model = "Imaginary 5000"
- self.assertEqual(unicode(seq), "Seq1 (HWI-SEQ1)")
+ self.assertEqual(str(seq), "Seq1 (HWI-SEQ1)")
def test_lookup(self):
fc = self.fc12150
status = validate_xhtml(response.content)
if status is not None: self.assertTrue(status)
- load_string_into_model(model, 'rdfa', response.content)
+ load_string_into_model(model, 'rdfa', smart_text(response.content))
errmsgs = list(inference.run_validation())
self.assertEqual(len(errmsgs), 0)
url = '/lane/{}'.format(self.lane.id)
response = self.client.get(url)
+ rdfbody = smart_text(response.content)
self.assertEqual(response.status_code, 200)
- status = validate_xhtml(response.content)
+ status = validate_xhtml(rdfbody)
if status is not None: self.assertTrue(status)
- load_string_into_model(model, 'rdfa', response.content)
+ load_string_into_model(model, 'rdfa', rdfbody)
errmsgs = list(inference.run_validation())
self.assertEqual(len(errmsgs), 0)
+from __future__ import unicode_literals
+
from django.conf.urls import patterns
urlpatterns = patterns('',
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
# Create your views here.
from datetime import datetime
content_type = data_file.file_type.mimetype
if os.path.exists(data_file.pathname):
- return HttpResponse(open(data_file.pathname,'r'),
+ return HttpResponse(open(data_file.pathname,'rb'),
content_type=content_type)
raise Http404
def require_api_key(request):
# make sure we have the api component
- if not (request.REQUEST.has_key('apiid') or request.REQUEST.has_key('apikey')):
+ if not ('apiid' in request.REQUEST or 'apikey' in request.REQUEST):
raise PermissionDenied
# make sure the id and key are right
import sys
import time
import traceback
-import urlparse
+from six.moves import urllib
from benderjab import rpc
return reply
def validate_url(self, url):
- user_url = urlparse.urlsplit(url)
+ user_url = urllib.parse.urlsplit(url)
user_scheme = user_url[0]
user_netloc = user_url[1]
user_path = user_url[2]
for source in self.sources:
- source_url = urlparse.urlsplit(source)
+ source_url = urllib.parse.urlsplit(source)
source_scheme = source_url[0]
source_netloc = source_url[1]
source_path = source_url[2]
# if we've already seen an event in this directory (AKA runfolder)
# keep track if its already hit the "completed" flag
- if watch_path_events.has_key(target):
+ if target in watch_path_events:
run_already_complete = watch_path_events[target].complete
watch_path_events[target] = WatcherEvent(target)
ClIP = request.META['REMOTE_ADDR']
#Check client access permission
granted = False
- if (settings.ALLOWED_ANALYS_IPS.has_key(ClIP)): granted = True
+ if (ClIP in settings.ALLOWED_ANALYS_IPS): granted = True
if not granted: return HttpResponse("access denied.")
output=''
taskid=-1;
# Check required param
- if request.has_key('taskid'): taskid = request['taskid']
+ if 'taskid' in request: taskid = request['taskid']
else: return HttpResponse('missing param task id')
try:
rec = Task.objects.get(id=taskid)
mytimestamp = datetime.now().__str__()
mytimestamp = re.sub(pattern=":[^:]*$",repl="",string=mytimestamp)
- if request.has_key('msg'):
+ if 'msg' in request:
rec.task_status += ", "+request['msg']+" ("+mytimestamp+")"
else :
rec.task_status = "Registered ("+mytimestamp+")"
ClIP = request.META['REMOTE_ADDR']
#Check client access permission
granted = False
- if (settings.ALLOWED_ANALYS_IPS.has_key(ClIP)): granted = True
+ if (ClIP in settings.ALLOWED_ANALYS_IPS): granted = True
if not granted: return HttpResponse("access denied.")
outputfile = ''
All=False
- if (request.has_key('mode')):
+ if ('mode' in request):
if request['mode']=='all':
All=True
def report1(request):
EXP = 'ChIP-seq'
- if request.GET.has_key('aflid'):
+ if 'aflid' in request.GET:
AFL_Id = request.GET['aflid']
try:
AFL = Affiliation.objects.get(id=AFL_Id).name
def report_RM(request): #for RNA-Seq and Methyl-Seq
EXP = 'RNA-seq'
- if request.GET.has_key('exp'):
+ if 'exp' in request.GET:
EXP = request.GET['exp'] # Methyl-seq
- if request.GET.has_key('aflid'):
+ if 'aflid' in request.GET:
AFL_Id = request.GET['aflid']
try:
AFL = Affiliation.objects.get(id=AFL_Id).name
This includes the version number, run date, bustard executable parameters, and
phasing estimates.
"""
+from __future__ import print_function, unicode_literals
+
from copy import copy
from datetime import date
from glob import glob
for b in base_order:
for value in self.base[b]:
crosstalk_value = ElementTree.SubElement(root, CrosstalkMatrix.ELEMENT)
- crosstalk_value.text = unicode(value)
+ crosstalk_value.text = str(value)
crosstalk_value.tail = os.linesep
return root
time = property(_get_time, doc='return run time as seconds since epoch')
def dump(self):
- #print ElementTree.tostring(self.get_elements())
+ #print(ElementTree.tostring(self.get_elements()))
ElementTree.dump(self.get_elements())
def get_elements(self):
# add phasing parameters
for lane in LANE_LIST:
- if self.phasing.has_key(lane):
+ if lane in self.phasing:
params.append(self.phasing[lane].get_elements())
# add crosstalk matrix if it exists
opts, args = parser.parse_args(cmdline)
for bustard_dir in args:
- print u'analyzing bustard directory: ' + unicode(bustard_dir)
+ print(u'analyzing bustard directory: ' + str(bustard_dir))
bustard_object = bustard(bustard_dir)
bustard_object.dump()
bustard_object2 = Bustard(xml=bustard_object.get_elements())
- print ('-------------------------------------')
+ print('-------------------------------------')
bustard_object2.dump()
- print ('=====================================')
+ print('=====================================')
b1_tree = bustard_object.get_elements()
b1 = ElementTree.tostring(b1_tree).split(os.linesep)
b2_tree = bustard_object2.get_elements()
b2 = ElementTree.tostring(b2_tree).split(os.linesep)
for line1, line2 in zip(b1, b2):
if b1 != b2:
- print "b1: ", b1
- print "b2: ", b2
+ print("b1: ", b1)
+ print("b2: ", b2)
if __name__ == "__main__":
main(sys.argv[1:])
try:
saveConfigFile(flowcell, options.url, cfg_filepath)
conf_info.config_filepath = cfg_filepath
- except FlowCellNotFound, e:
+ except FlowCellNotFound as e:
LOGGER.error(e)
return False
- except WebError404, e:
+ except WebError404 as e:
LOGGER.error(e)
return False
- except IOError, e:
+ except IOError as e:
LOGGER.error(e)
return False
- except Exception, e:
+ except Exception as e:
LOGGER.error(e)
return False
if opts.output is not None:
if opts.bzip:
- output = bz2.BZ2File(opts.output,'w')
+ output = bz2.open(opts.output, 'wt')
elif opts.gzip:
- output = gzip.GzipFile(opts.output, 'w')
+ output = gzip.open(opts.output, 'wt')
else:
output = open(opts.output, 'w')
else:
"""
Analyze ELAND files
"""
+from __future__ import print_function
+
import collections
from glob import glob
import logging
self._reads = 0
for pathname in self.pathnames:
- stream = autoopen(pathname, 'r')
+ stream = autoopen(pathname, 'rt')
if self.eland_type == ELAND_SINGLE:
result = self._update_eland_result(stream)
elif self.eland_type == ELAND_MULTI or \
def get_elements(self):
lane = ElementTree.Element(ElandLane.LANE,
{'version':
- unicode(ElandLane.XML_VERSION)})
+ str(ElandLane.XML_VERSION)})
sample_tag = ElementTree.SubElement(lane, SAMPLE_NAME)
sample_tag.text = self.sample_name
lane_tag = ElementTree.SubElement(lane, LANE_ID)
for k, v in self.genome_map.items():
item = ElementTree.SubElement(
genome_map, GENOME_ITEM,
- {'name':k, 'value':unicode(v)})
+ {'name':k, 'value':str(v)})
mapped_reads = ElementTree.SubElement(lane, MAPPED_READS)
for k, v in self.mapped_reads.items():
item = ElementTree.SubElement(
mapped_reads, MAPPED_ITEM,
- {'name':k, 'value':unicode(v)})
+ {'name':k, 'value':str(v)})
match_codes = ElementTree.SubElement(lane, MATCH_CODES)
for k, v in self.match_codes.items():
item = ElementTree.SubElement(
match_codes, MATCH_ITEM,
- {'name':k, 'value':unicode(v)})
+ {'name':k, 'value':str(v)})
reads = ElementTree.SubElement(lane, READS)
- reads.text = unicode(self.reads)
+ reads.text = str(self.reads)
return lane
"""
Determine if we have a scarf or fastq sequence file
"""
- f = open(pathname,'r')
+ f = open(pathname,'rt')
l = f.readline()
f.close()
LOGGER.info("summarizing results for %s" % (pathname))
lines = 0
- f = open(pathname)
- for l in f.xreadlines():
+ f = open(pathname, 'rt')
+ for l in f:
lines += 1
f.close()
def get_elements(self):
lane = ElementTree.Element(SequenceLane.LANE,
{'version':
- unicode(SequenceLane.XML_VERSION)})
+ str(SequenceLane.XML_VERSION)})
sample_tag = ElementTree.SubElement(lane, SAMPLE_NAME)
sample_tag.text = self.sample_name
lane_tag = ElementTree.SubElement(lane, LANE_ID)
end_tag = ElementTree.SubElement(lane, END)
end_tag.text = str(self.end)
reads = ElementTree.SubElement(lane, READS)
- reads.text = unicode(self.reads)
+ reads.text = str(self.reads)
sequence_type = ElementTree.SubElement(lane, SequenceLane.SEQUENCE_TYPE)
- sequence_type.text = unicode(SequenceLane.SEQUENCE_DESCRIPTION[self.sequence_type])
+ sequence_type.text = str(SequenceLane.SEQUENCE_DESCRIPTION[self.sequence_type])
return lane
elif tag == END.lower():
self.end = int(element.text)
elif tag == READS.lower():
- self._reads = int(element.text)
+ self._reads = int(float(element.text))
elif tag == SequenceLane.SEQUENCE_TYPE.lower():
self.sequence_type = lookup_sequence_type.get(element.text, None)
else:
del self.result[key]
def __iter__(self):
- keys = self.results.iterkeys()
- for k in sorted(keys):
+ for k in sorted(self.results):
yield k
def __len__(self):
def get_elements(self):
root = ElementTree.Element(ELAND.ELAND,
- {'version': unicode(ELAND.XML_VERSION)})
+ {'version': str(ELAND.XML_VERSION)})
for key in self:
eland_lane = self[key].get_elements()
- eland_lane.attrib[ELAND.END] = unicode(self[key].end-1)
- eland_lane.attrib[ELAND.LANE_ID] = unicode(self[key].lane_id)
- eland_lane.attrib[ELAND.SAMPLE] = unicode(self[key].sample_name)
+ eland_lane.attrib[ELAND.END] = str(self[key].end-1)
+ eland_lane.attrib[ELAND.LANE_ID] = str(self[key].lane_id)
+ eland_lane.attrib[ELAND.SAMPLE] = str(self[key].sample_name)
root.append(eland_lane)
return root
return root
for a in args:
LOGGER.info("Starting scan of %s" % (a,))
e = eland(a)
- print ElementTree.tostring(e.get_elements())
+ print(ElementTree.tostring(e.get_elements()))
return
Firecrest factory function initalized from an xml dump from
the Firecrest object.
"""
+from __future__ import print_function
from datetime import date
from glob import glob
def dump(self):
"""Report debugginf information
"""
- print "Starting cycle:", self.start
- print "Ending cycle:", self.stop
- print "Firecrest version:", self.version
- print "Run date:", self.date
- print "user:", self.user
+ print("Starting cycle:", self.start)
+ print("Ending cycle:", self.stop)
+ print("Firecrest version:", self.version)
+ print("Run date:", self.date)
+ print("user:", self.user)
def get_elements(self):
"""Return XML serialization structure.
if os.path.exists(matrix_pathname):
# this is for firecrest < 1.3.2
f.matrix = open(matrix_pathname, 'r').read()
- elif glob(bustard_pattern) > 0:
+ elif len(glob(bustard_pattern)) > 0:
f.matrix = None
# there are runs here. Bustard should save the matrix.
else:
import logging
-from htsworkflow.util.alphanum import alphanum
+from htsworkflow.util.alphanum import natural_sort_key
LOGGER = logging.getLogger(__name__)
class DuplicateGenome(Exception): pass
# Need valid directory
if not os.path.exists(genome_base_dir):
msg = "Directory does not exist: %s" % (genome_base_dir)
- raise IOError, msg
+ raise IOError(msg)
# Find all subdirectories
filepath_list = glob.glob(os.path.join(genome_base_dir, '*'))
build_dict = d.setdefault(species, {})
if build in build_dict:
msg = "Duplicate genome for %s|%s" % (species, build)
- raise DuplicateGenome, msg
+ raise DuplicateGenome(msg)
build_dict[build] = genome_dir
builds = self.genome_dict[elements[0]]
# sort build names the way humans would
- keys = builds.keys()
- keys.sort(cmp=alphanum)
+ last_key = max(builds, key=natural_sort_key)
# return the path from the 'last' build name
- return builds[keys[-1]]
+ return builds[last_key]
elif len(elements) == 2:
# we have species, and build name
def get(self, key, default=None):
try:
return self[key]
- except KeyError, e:
+ except KeyError as e:
return default
def keys(self):
if __name__ == '__main__':
if len(sys.argv) != 2:
- print 'useage: %s <base_genome_dir>' % (sys.argv[0])
+ print('useage: %s <base_genome_dir>' % (sys.argv[0]))
sys.exit(1)
d = getAvailableGenomes(sys.argv[1])
d2 = constructMapperDict(d)
for k,v in d2.items():
- print '%s: %s' % (k,v)
+ print('%s: %s' % (k,v))
return len(self._contigs)
def __iter__(self):
- return self._contigs.iterkeys()
+ return iter(self._contigs.keys())
def __getitem__(self, name):
return self._contigs[name]
"""Provide access to information stored in the GERALD directory.
"""
+from __future__ import print_function, unicode_literals
+
import collections
from datetime import datetime, date
import logging
"""
Debugging function, report current object
"""
- print 'Software:'. self.__class__.__name__
- print 'Alignment version:', self.version
- print 'Run date:', self.date
- print 'config.xml:', self.tree
+ print('Software:', self.__class__.__name__)
+ print('Alignment version:', self.version)
+ print('Run date:', self.date)
+ print('config.xml:', self.tree)
self.summary.dump()
def get_elements(self, root_tag):
return None
gerald = ElementTree.Element(root_tag,
- {'version': unicode(Gerald.XML_VERSION)})
+ {'version': str(Gerald.XML_VERSION)})
gerald.append(self.tree)
gerald.append(self.summary.get_elements())
if self.eland_results:
return None
time_element = self.tree.xpath('TIME_STAMP')
if len(time_element) == 1:
- timetuple = time.strptime(
- time_element[0].text.strip(),
- "%a %b %d %H:%M:%S %Y")
- return datetime(*timetuple[:6])
+ timetuple = time.strptime(
+ time_element[0].text.strip(),
+ "%a %b %d %H:%M:%S %Y")
+ return datetime(*timetuple[:6])
return super(CASAVA, self)._get_date()
date = property(_get_date)
lanes = [x.tag.split('_')[1] for x in container.getchildren()]
try:
index = lanes.index(self._lane_id)
- except ValueError, e:
+ except ValueError as e:
return None
element = container[index]
return element.text
def __iter__(self):
if self._lanes is None:
self._initialize_lanes()
- return self._lanes.iterkeys()
+ return iter(self._lanes.keys())
def __getitem__(self, key):
if self._lanes is None:
IPAR factory function initalized from an xml dump from
the IPAR object.
"""
+from __future__ import print_function
+
__docformat__ = "restructuredtext en"
import datetime
else:
return runfolder[0].text
runfolder_name = property(_get_runfolder_name)
-
+
def _get_software(self):
"""Return software name"""
if self.tree is None:
"""
suffix_node = self.tree.find('RunParameters/CompressionSuffix')
if suffix_node is None:
- print "find compression suffix failed"
+ print("find compression suffix failed")
return None
suffix = suffix_node.text
files = []
return files
def dump(self):
- print "Matrix:", self.matrix
- print "Tree:", self.tree
+ print("Matrix:", self.matrix)
+ print("Tree:", self.tree)
def get_elements(self):
attribs = {'version': str(IPAR.XML_VERSION) }
if element.tag == IPAR.RUN:
self.tree = element
elif element.tag == IPAR.TIMESTAMP:
- self.time = int(element.text)
+ self.time = int(element.text)
elif element.tag == IPAR.MATRIX:
self.matrix = element.text
else:
tree = ElementTree.parse(paramfile).getroot()
run = tree.find('Run')
- if run.attrib.has_key('Name') and run.attrib['Name'] in SOFTWARE_NAMES:
+ if run.attrib.get('Name', None) in SOFTWARE_NAMES:
return run
else:
LOGGER.info("No run found")
if not (groups[0] == 'IPAR' or groups[0] == 'Intensities'):
raise ValueError('ipar can only process IPAR directories')
- bustard_pattern = os.path.join(pathname, 'Bustard*')
# contents of the matrix file?
matrix_pathname = os.path.join(pathname, 'Matrix', 's_matrix.txt')
if os.path.exists(matrix_pathname):
# this is IPAR_1.01
i.matrix = open(matrix_pathname, 'r').read()
- elif glob(bustard_pattern) > 0:
+ else:
i.matrix = None
# its still live.
f = IPAR()
f.set_elements(tree)
return f
-
-#if __name__ == "__main__":
- #i = ipar(os.path.expanduser('~/gec/081021_HWI-EAS229_0063_30HKUAAXX/Data/IPAR_1.01'))
- #x = i.get_elements()
- #j = fromxml(x)
- #ElementTree.dump(x)
- #print j.date
- #print j.start
- #print j.stop
- #print i.tiles.keys()
- #print j.tiles.keys()
- #print j.tiles.items()
- #print j.file_list()
#!/usr/bin/env python
"""Convert a collection of qseq or a tar file of qseq files to a fastq file
"""
+from __future__ import print_function, unicode_literals
from glob import glob
import os
from optparse import OptionParser
opts, args = parser.parse_args(cmdline)
if opts.version:
- print version()
+ print(version())
return 0
if opts.infile is not None:
#!/usr/bin/env python
import csv
-from ConfigParser import RawConfigParser
+from six.moves.configparser import RawConfigParser
import logging
from optparse import OptionParser, IndentedHelpFormatter
import os
import sys
import types
-import urllib
-import urllib2
+import six
+from six.moves import urllib
try:
import json
-except ImportError, e:
+except ImportError as e:
import simplejson as json
from htsworkflow.auth import apidata
from htsworkflow.util import api
-from htsworkflow.util import alphanum
+from htsworkflow.util.alphanum import natural_sort_key
from htsworkflow.util.url import normalize_url
from htsworkflow.pipelines.genome_mapper import \
getAvailableGenomes, \
url = api.flowcell_url(base_host_url, flowcell)
try:
- apipayload = urllib.urlencode(apidata)
- web = urllib2.urlopen(url, apipayload)
- except urllib2.URLError, e:
+ apipayload = urllib.parse.urlencode(apidata).encode('utf-8')
+ web = urllib.request.urlopen(url, apipayload)
+ except urllib.error.HTTPError as e:
errmsg = 'URLError: %d %s' % (e.code, e.msg)
LOGGER.error(errmsg)
LOGGER.error('opened %s' % (url,))
sequences = library.get('index_sequence', None)
if sequences is None:
return []
- elif (type(sequences) in types.StringTypes and
- sequences.lower().startswith('err')):
+ elif (isinstance(sequences, six.string_types) and
+ sequences.lower().startswith('err')):
shared['Index'] = ''
shared['SampleProject'] = library['library_id']
return [shared]
- elif (type(sequences) == types.DictType):
+ elif isinstance(sequences, dict):
pooled = []
- multiplex_ids = sequences.keys()
- multiplex_ids.sort(cmp=alphanum.alphanum)
+ multiplex_ids = sorted(sequences.keys(), key=natural_sort_key)
for multiplex_id in multiplex_ids:
sample = {}
sample.update(shared)
VERSION_RE, USER_RE, \
LANES_PER_FLOWCELL, LANE_LIST
from htsworkflow.pipelines.samplekey import LANE_SAMPLE_KEYS
-from htsworkflow.util.alphanum import alphanum
from htsworkflow.util.ethelp import indent, flatten
from htsworkflow.util.queuecommands import QueueCommands
class PipelineRun(object):
"""Capture "interesting" information about a pipeline run
-
+
:Variables:
- `pathname` location of the root of this runfolder
- `serialization_filename` read only property containing name of run xml file
def __init__(self, pathname=None, flowcell_id=None, xml=None):
"""Initialize a PipelineRun object
-
+
:Parameters:
- `pathname` the root directory of this run folder.
- `flowcell_id` the flowcell ID in case it can't be determined
- `xml` Allows initializing an object from a serialized xml file.
-
+
:Types:
- `pathname` str
- `flowcell_id` str
def _get_flowcell_id(self):
"""Return the flowcell ID
-
+
Attempts to find the flowcell ID through several mechanisms.
"""
# extract flowcell ID
def _get_flowcell_id_from_flowcellid(self):
"""Extract flowcell id from a Config/FlowcellId.xml file
-
+
:return: flowcell_id or None if not found
"""
config_dir = os.path.join(self.pathname, 'Config')
def _get_run_dirname(self):
"""Return name of directory to hold result files from one analysis
-
+
For pre-multiplexing runs this is just the cycle range C1-123
- For post-multiplexing runs the "suffix" that we add to
+ For post-multiplexing runs the "suffix" that we add to
differentiate runs will be added to the range.
E.g. Unaligned_6mm may produce C1-200_6mm
"""
def get_elements(self):
"""make one master xml file from all of our sub-components.
-
+
:return: an ElementTree containing all available pipeline
run xml compoents.
"""
def _get_serialization_filename(self):
"""Compute the filename for the run xml file
-
- Attempts to find the latest date from all of the run
+
+ Attempts to find the latest date from all of the run
components.
-
+
:return: filename run_{flowcell id}_{timestamp}.xml
:rtype: str
"""
if self._name is None:
components = [self.image_analysis, self.bustard, self.gerald]
- tmax = max([ c.time for c in components if c ])
+ tmax = max([ c.time for c in components if c and c.time ])
timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
return self._name
def save(self, destdir=None):
"""Save a run xml file.
-
+
:param destdir: Directory name to save too, uses current directory
if not specified.
:type destdir: str
def load(self, filename):
"""Load a run xml into this object.
-
+
:Parameters:
- `filename` location of a run xml file
-
+
:Types:
- `filename` str
"""
def get_runs(runfolder, flowcell_id=None):
"""Find all runs associated with a runfolder.
-
+
We end up with multiple analysis runs as we sometimes
need to try with different parameters. This attempts
to return a list of all the various runs.
p.bustard = b
p.gerald = g
runs.append(p)
- except IOError, e:
+ except IOError as e:
LOGGER.error("Ignoring " + str(e))
return len(runs) - start
p.gerald = gerald.gerald(aligned)
runs.append(p)
except (IOError, RuntimeError) as e:
- LOGGER.error("Exception %s", str(e))
+ LOGGER.error("Exception %s", str(e))
LOGGER.error("Skipping run in %s", flowcell_id)
return len(runs) - start
for run in runs:
# print a run name?
report.append('Summary for %s' % (run.serialization_filename,))
- # sort the report
- if run.gerald:
+ # sort the report
+ if run.gerald:
eland_keys = sorted(run.gerald.eland_results.keys())
else:
report.append("Alignment not done, no report possible")
clean_process = subprocess.Popen(['make', 'clean_intermediate'],
cwd=run.image_analysis.pathname,)
clean_process.wait()
-
-
-
import types
import re
import sys
-from urlparse import urljoin, urlparse
+import six
+from six.moves.urllib.parse import urljoin, urlparse
import RDF
from htsworkflow.util.rdfhelp import libraryOntology as libNS
def key(self):
return (self.flowcell, self.lane, self.read, self.project, self.split)
- def __unicode__(self):
- return unicode(self.path)
+ def __str__(self):
+ return str(self.path)
def __eq__(self, other):
"""
def add(model, s, p, o):
model.add_statement(RDF.Statement(s,p,o))
# a bit unreliable... assumes filesystem is encoded in utf-8
- path = os.path.abspath(self.path.encode('utf-8'))
+ path = os.path.abspath(self.path)
fileNode = RDF.Node(RDF.Uri('file://' + path))
add(model, fileNode, rdfNS['type'], libNS['IlluminaResult'])
add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell)
Scan through a list of directories for sequence like files
"""
sequences = []
- if type(dirs) in types.StringTypes:
+ if isinstance(dirs, six.string_types):
raise ValueError("You probably want a list or set, not a string")
for d in dirs:
qseq_patterns = []
# grab a lane from the dictionary
# I don't think it matters which one.
- k = lanes.keys()[0]
+ k = next(iter(lanes.keys()))
# build the list of patterns
for read in lanes[k]:
read = int(read)
seq_cmds = make_srf_commands(opts.name, source, opts.lanes, opts.site_name, opts.destination, 0)
else:
raise ValueError('Unknown --format=%s' % (opts.format))
- print seq_cmds
srf.run_commands(args.source, seq_cmds, num_jobs)
def make_parser():
#!/usr/bin/env python
+from __future__ import print_function, unicode_literals
+
import logging
import mmap
from optparse import OptionParser
logging.basicConfig(level=logging.WARN)
if opts.version:
- print version()
+ print(version())
return 0
if len(args) != 1:
# sequence
elif state == FASTQ_SEQUENCE:
if mid is None:
- mid = len(line)/2
+ mid = len(line) // 2
write_split_sequence(target1, target2, line, mid)
state = FASTQ_SEQUENCE_HEADER
# quality header
"""
Check filename to see if it is likely to be a SRF file
"""
- f = open(filename, 'r')
+ f = open(filename, 'rb')
header = f.read(4)
f.close()
- return header == "SSRF"
+ return header == b"SSRF"
def is_cnf1(filename):
"""
Brute force detection if a SRF file is using CNF1/CNF4 records
"""
max_header = 1024 ** 2
- PROGRAM_ID = 'PROGRAM_ID\000'
- cnf4_apps = set(("solexa2srf v1.4",
- "illumina2srf v1.11.5.Illumina.1.3"))
+ PROGRAM_ID = b'PROGRAM_ID\000'
+ cnf4_apps = set((b"solexa2srf v1.4",
+ b"illumina2srf v1.11.5.Illumina.1.3"))
if not is_srf(filename):
raise ValueError("%s must be a srf file" % (filename,))
# alas the max search length requires python 2.6+
program_id_location = f.find(PROGRAM_ID, 0) #, max_header)
program_header_start = program_id_location+len(PROGRAM_ID)
- next_null = f.find('\000', program_header_start) #, max_header)
+ next_null = f.find(b'\000', program_header_start) #, max_header)
program_id_header = f[program_header_start:next_null]
f.close()
os.close(fd)
target2_name = base + '_r2.fastq'
for target_name in [target1_name, target2_name]:
- print 'target name', target_name
+ print('target name', target_name)
if os.path.exists(target_name):
raise RuntimeError("%s exists" % (target_name,))
"""
Analyze the Summary.htm file produced by GERALD
"""
+from __future__ import print_function, unicode_literals
+
import os
import logging
import re
-import types
from pprint import pprint
from lxml import html
def get_elements(self):
summary = etree.Element(Summary.SUMMARY,
- {'version': unicode(Summary.XML_VERSION)})
+ {'version': str(Summary.XML_VERSION)})
for end in self.lane_results:
for lane in end.values():
summary.append(lane.get_elements())
Debugging function, report current object
"""
tree = self.get_elements()
- print etree.tostring(tree)
+ print(etree.tostring(tree))
class SummaryGA(Summary):
def __init__(self, filename=None, xml=None):
('Lane Results Summary : Read 1', 0),
('Lane Results Summary : Read 2', 1),]
for name, end in table_names:
- if tables.has_key(name):
+ if name in tables:
self._extract_lane_results_for_end(tables, name, end)
if len(self.lane_results[0]) == 0:
def get_elements(self):
lane_result = etree.Element(
LaneResultSummary.LANE_RESULT_SUMMARY,
- {'lane': unicode(self.lane), 'end': unicode(self.end)})
+ {'lane': str(self.lane), 'end': str(self.end)})
for tag, variable_name in LaneResultSummary.TAGS.items():
value = getattr(self, variable_name)
if value is None:
continue
- # it looks like a sequence
- elif type(value) in (types.TupleType, types.ListType):
+ elif isinstance(value, (tuple, list)):
element = make_mean_range_element(
lane_result,
tag,
)
else:
element = etree.SubElement(lane_result, tag)
- element.text = unicode(value)
+ element.text = str(value)
return lane_result
def set_elements(self, tree):
variable_name = tags[element.tag]
setattr(self, variable_name,
parse_summary_element(element))
- except KeyError, e:
+ except KeyError as e:
LOGGER.warn('Unrecognized tag %s' % (element.tag,))
"""
try:
v = int(v)
- except ValueError, e:
+ except ValueError as e:
v = float(v)
return v
Make an etree subelement <Name mean='mean', deviation='deviation'/>
"""
element = etree.SubElement(parent, name,
- { 'mean': unicode(mean),
- 'deviation': unicode(deviation)})
+ { 'mean': str(mean),
+ 'deviation': str(deviation)})
return element
def parse_mean_range_element(element):
"""
Create simulated solexa/illumina runfolders for testing
"""
+from __future__ import print_function, unicode_literals
+
import gzip
import os
import shutil
for read in UNALIGNED_READS:
suffix = 'R{0}_{1}_export.txt.gz'.format(read, split)
pathname = paths.make_test_filename(suffix)
- stream = gzip.open(pathname, 'w')
+ stream = gzip.open(pathname, 'wt')
stream.write(body)
stream.close()
for read in reads:
suffix = 'R{0}_{1}.fastq.gz'.format(read, split)
pathname = paths.make_test_filename(suffix)
- stream = gzip.open(pathname, 'w')
+ stream = gzip.open(pathname, 'wt')
stream.write(sample_seq)
stream.close()
return pathname
def dump(self):
- print ('index seq: {0}'.format(self.index_seq))
+ print('index seq: {0}'.format(self.index_seq))
- print ('project dir: {0}'.format(self.project_dir))
- print ('sample dir: {0}'.format(self.sample_dir))
- print ('rootname: {0}'.format(self.rootname))
- print ('path: {0}'.format(
+ print('project dir: {0}'.format(self.project_dir))
+ print('sample dir: {0}'.format(self.sample_dir))
+ print('rootname: {0}'.format(self.rootname))
+ print('path: {0}'.format(
os.path.join(self.project_dir,
self.sample_dir,
self.rootname+'R1_001.fastq.gz')))
"""
for dirpath, dirnames, filenames in os.walk(root):
for filename in filenames:
- print os.path.join(dirpath, filename)
+ print(os.path.join(dirpath, filename))
class BaseCallInfo(object):
#!/usr/bin/env python
+from __future__ import absolute_import
from datetime import datetime, date
import os
#!/usr/bin/env python
"""More direct synthetic test cases for the eland output file processing
"""
-from StringIO import StringIO
+from six.moves import StringIO
from unittest import TestCase
from htsworkflow.pipelines.eland import ELAND, ElandLane, ElandMatches, \
'R0':0, 'R1':0, 'R2':0,
}
self.assertEqual(mc.keys(), match_codes.keys())
- self.assertEqual(mc.items(), match_codes.items())
- self.assertEqual(mc.values(), match_codes.values())
+ self.assertEqual(list(mc.items()), list(match_codes.items()))
+ self.assertEqual(list(mc.values()), list(match_codes.values()))
self.assertRaises(KeyError, mc.__getitem__, 'foo')
def test_addition(self):
mr1['chr9'] = 7
self.assertEqual(list(mr1.keys()), ['chr9'])
self.assertEqual(mr1['chr9'], 7)
- self.assertEqual(mr1.items(), [('chr9', 7)])
+ self.assertEqual(list(mr1.items()), [('chr9', 7)])
del mr1['chr9']
self.assertEqual(len(mr1), 0)
e.results[sl3] = 'Lane3'
e.results[sl1] = 'Lane1'
- e_list = e.values()
+ e_list = list(e.values())
self.assertEqual(e_list[0], 'Lane1')
self.assertEqual(e_list[1], 'Lane3')
self.assertEqual(e_list[2], 'Lane5')
em.add('s_1_sequence.txt')
self.assertEqual(len(em), 1)
self.assertEqual(len(em[key]), 1)
- filename = iter(em[key]).next().filename
+ filename = next(iter(em[key])).filename
self.assertEqual(filename, 's_1_sequence.txt')
- self.assertEqual(em.keys(), [key])
+ self.assertEqual(list(em.keys()), [key])
em.add('s_1_eland_result.txt')
self.assertEqual(len(em), 1)
self.assertEqual(len(em[key]), 1)
- filename = iter(em[key]).next().filename
+ filename = next(iter(em[key])).filename
self.assertEqual(filename, 's_1_eland_result.txt')
- self.assertEqual(em.keys(), [key])
+ self.assertEqual(list(em.keys()), [key])
def test_parts(self):
key11111 = SampleKey(1, 1, '11111')
#!/usr/bin/env python
+from __future__ import absolute_import
from datetime import datetime, date
import logging
from htsworkflow.pipelines import runfolder
from htsworkflow.pipelines import ElementTree
-from htsworkflow.pipelines.test.simulate_runfolder import *
+from .simulate_runfolder import *
logging.basicConfig(level=logging.ERROR)
from unittest import TestCase
-from StringIO import StringIO
+from six.moves import StringIO
from htsworkflow.pipelines import genome_mapper
class testGenomeMapper(TestCase):
"""More direct synthetic test cases for the eland output file processing
"""
import os
-from StringIO import StringIO
+from six.moves import StringIO
import shutil
import tempfile
from unittest import TestCase
import csv
import os
import re
-from StringIO import StringIO
+from six.moves import StringIO
try:
import json
-except ImportError, e:
+except ImportError as e:
import simplejson as json
from django.test import TestCase
+from django.utils.encoding import smart_text
from samples.samples_factory import LibraryFactory, LibraryTypeFactory, \
MultiplexIndexFactory
flowcell_request = self.client.get('/experiments/config/FC12150/json',
apidata)
self.failUnlessEqual(flowcell_request.status_code, 200)
- flowcell_info = json.loads(flowcell_request.content)['result']
+ flowcell_info = json.loads(smart_text(flowcell_request.content))['result']
options = getCombinedOptions(['-f','FC12150','-g',os.getcwd()])
genome_map = {library.library_species.scientific_name: '/tmp/build' }
url = '/experiments/config/%s/json' % (fcid,)
flowcell_request = self.client.get(url, apidata)
self.failUnlessEqual(flowcell_request.status_code, 200)
- flowcell_info = json.loads(flowcell_request.content)['result']
+ flowcell_info = json.loads(smart_text(flowcell_request.content))['result']
options = getCombinedOptions(['-f',fcid,'-g',os.getcwd(),])
output.seek(0)
sheet = list(csv.DictReader(output))
- name1 = library1.id + '_index' + library1.index_sequences().keys()[0]
- name2 = library2.id + '_index' + library2.index_sequences().keys()[0]
+ name1 = library1.id + '_index' + next(iter(library1.index_sequences()))
+ name2 = library2.id + '_index' + next(iter(library2.index_sequences()))
expected = [{'SampleProject': name1,
- 'Index': library1.index_sequences().values()[0],
+ 'Index': next(iter(library1.index_sequences().values())),
'Lane': '1',
},
{'SampleProject': name2,
- 'Index': library2.index_sequences().values()[0],
+ 'Index': next(iter(library2.index_sequences().values())),
'Lane': '1',
},
{'SampleProject': name1,
- 'Index': library1.index_sequences().values()[0],
+ 'Index': next(iter(library1.index_sequences().values())),
'Lane': '2',
},
{'SampleProject': name2,
- 'Index': library2.index_sequences().values()[0],
+ 'Index': next(iter(library2.index_sequences().values())),
'Lane': '2',
},
]
+from __future__ import absolute_import
+
import os
from unittest import TestCase
-from StringIO import StringIO
+from six.moves import StringIO
-from simulate_runfolder import TESTDATA_DIR
+from .simulate_runfolder import TESTDATA_DIR
from htsworkflow.pipelines.runfolder import load_pipeline_run_xml
from htsworkflow.pipelines.eland import SampleKey
#!/usr/bin/env python
+from __future__ import absolute_import
from datetime import datetime, date
import os
from htsworkflow.pipelines import runfolder
from htsworkflow.pipelines import ElementTree
-from htsworkflow.pipelines.test.simulate_runfolder import *
+from .simulate_runfolder import *
def make_summary_htm(gerald_dir):
#!/usr/bin/env python
+from __future__ import absolute_import
from datetime import datetime, date
import os
from htsworkflow.pipelines import runfolder
from htsworkflow.pipelines import ElementTree
-from htsworkflow.pipelines.test.simulate_runfolder import *
+from .simulate_runfolder import *
def make_runfolder(obj=None):
#!/usr/bin/env python
+from __future__ import absolute_import
from datetime import datetime, date
import glob
from htsworkflow.pipelines import srf
from htsworkflow.pipelines import ElementTree
-from htsworkflow.pipelines.test.simulate_runfolder import *
+from .simulate_runfolder import *
FCID = 'AA01AAABXX'
RUN_NAME = '110420_SN787_0069_%s' %( FCID,)
#!/usr/bin/env python
+from __future__ import absolute_import
from datetime import datetime, date
import os
from htsworkflow.pipelines import runfolder
from htsworkflow.pipelines import ElementTree
-from htsworkflow.pipelines.test.simulate_runfolder import *
+from .simulate_runfolder import *
def make_runfolder(obj=None):
#!/usr/bin/env python
+from __future__ import absolute_import
from datetime import datetime, date
import os
from htsworkflow.pipelines import runfolder
from htsworkflow.pipelines import ElementTree
-from htsworkflow.pipelines.test.simulate_runfolder import *
+from .simulate_runfolder import *
def make_runfolder(obj=None):
#!/usr/bin/env python
+from __future__ import absolute_import
from datetime import datetime, date
import os
from htsworkflow.pipelines import runfolder
from htsworkflow.pipelines import ElementTree
-from htsworkflow.pipelines.test.simulate_runfolder import *
+from .simulate_runfolder import *
def make_runfolder(obj=None):
#!/usr/bin/env python
+from __future__ import absolute_import
from datetime import datetime, date
import os
from htsworkflow.pipelines.samplekey import SampleKey
from htsworkflow.pipelines import ElementTree
-from htsworkflow.pipelines.test.simulate_runfolder import *
+from .simulate_runfolder import *
def make_runfolder(obj=None):
#!/usr/bin/env python
+from __future__ import absolute_import
from datetime import datetime, date
import os
from htsworkflow.pipelines.samplekey import SampleKey
from htsworkflow.pipelines import ElementTree
-from htsworkflow.pipelines.test.simulate_runfolder import *
+from .simulate_runfolder import *
def make_runfolder(obj=None):
#!/usr/bin/env python
+from __future__ import absolute_import
from datetime import datetime, date
import os
from htsworkflow.pipelines.samplekey import SampleKey
from htsworkflow.pipelines import ElementTree
-from htsworkflow.pipelines.test.simulate_runfolder import *
+from .simulate_runfolder import *
def make_runfolder(obj=None):
#!/usr/bin/env python
+from __future__ import absolute_import
from datetime import datetime, date
import logging
from htsworkflow.pipelines import runfolder
from htsworkflow.pipelines import ElementTree
-from htsworkflow.pipelines.test.simulate_runfolder import *
+from .simulate_runfolder import *
def make_runfolder(obj=None):
#!/usr/bin/env python
"""More direct synthetic test cases for the eland output file processing
"""
-from StringIO import StringIO
+from six.moves import StringIO
from unittest import TestCase
from htsworkflow.pipelines.samplekey import SampleKey
self.assertEqual(f0.filetype, 'srf')
self.assertEqual(f0.path, pathname)
- self.assertEqual(unicode(f0), unicode(pathname))
+ self.assertEqual(str(f0), str(pathname))
self.assertEqual(repr(f0), "<srf 42BW9AAXX 4 %s>" % (pathname,))
self.assertEqual(f0.flowcell, '42BW9AAXX')
self.assertEqual(f0.lane, '4')
self.assertEqual(f0.filetype, 'qseq')
self.assertEqual(f0.path, pathname)
- self.assertEqual(unicode(f0), unicode(pathname))
+ self.assertEqual(str(f0), str(pathname))
self.assertEqual(repr(f0), "<qseq 42BW9AAXX 4 %s>" %(pathname,))
self.assertEqual(f0.flowcell, '42BW9AAXX')
self.assertEqual(f0.lane, '4')
self.assertEqual(f0.filetype, 'qseq')
self.assertEqual(f0.path, pathname)
- self.assertEqual(unicode(f0), unicode(pathname))
+ self.assertEqual(str(f0), str(pathname))
self.assertEqual(repr(f0), "<qseq ilmn200901 1 %s>" %(pathname,))
self.assertEqual(f0.lane, '1')
self.assertEqual(f0.read, 1)
self.assertEqual(f0.filetype, 'fastq')
self.assertEqual(f0.path, pathname)
- self.assertEqual(unicode(f0), unicode(pathname))
+ self.assertEqual(str(f0), str(pathname))
self.assertEqual(repr(f0), "<fastq 42BW9AAXX 4 %s>" % (pathname,))
self.assertEqual(f0.flowcell, '42BW9AAXX')
self.assertEqual(f0.lane, '4')
self.assertEqual(f0.filetype, 'fastq')
self.assertEqual(f0.path, pathname)
- self.assertEqual(unicode(f0), unicode(pathname))
+ self.assertEqual(str(f0), str(pathname))
self.assertEqual(repr(f0), "<fastq 42BW9AAXX 4 %s>" %(pathname,))
self.assertEqual(f0.flowcell, '42BW9AAXX')
self.assertEqual(f0.lane, '4')
self.assertEqual(f0.filetype, 'split_fastq')
self.assertEqual(f0.path, pathname)
- self.assertEqual(unicode(f0), unicode(pathname))
+ self.assertEqual(str(f0), str(pathname))
self.assertEqual(repr(f0), "<split_fastq 42BW9AAXX 1 %s>" %(pathname,))
self.assertEqual(f0.flowcell, '42BW9AAXX')
self.assertEqual(f0.lane, '1')
self.assertEqual(f0.filetype, 'split_fastq')
self.assertEqual(f0.path, pathname)
- self.assertEqual(unicode(f0), unicode(pathname))
+ self.assertEqual(str(f0), str(pathname))
self.assertEqual(repr(f0), "<split_fastq 42BW9AAXX 1 %s>" % (pathname,))
self.assertEqual(f0.flowcell, '42BW9AAXX')
self.assertEqual(f0.lane, '1')
#!/usr/bin/env python
+from __future__ import absolute_import
+
import os
-from StringIO import StringIO
+from six.moves import StringIO
from unittest import TestCase
from htsworkflow.pipelines import summary
-from simulate_runfolder import TESTDATA_DIR
+from .simulate_runfolder import TESTDATA_DIR
class SummaryTests(TestCase):
"""Test elements of the summary file parser
'handlers': ['console'],
'level': 'WARNING',
}
+ },
+}
+DATABASES = {
+ 'default': {
+ 'ENGINE': 'django.db.backends.postgresql_psycopg2',
+ 'USER': 'diane',
+ 'NAME': 'htsworkflow',
}
}
from pprint import pformat,pprint
import sys
import types
-from urlparse import urljoin, urlparse
+import six
+from six.moves.urllib.parse import urljoin, urlparse
from htsworkflow.pipelines.sequences import scan_for_sequences, \
update_model_sequence_library
from django.conf import settings
from django.template import Context, loader
+from django.utils.encoding import smart_str
import RDF
def import_libraries(self, result_map):
for lib_id in result_map.keys():
- lib_id_encoded = lib_id.encode('utf-8')
- liburl = urljoin(self.host, 'library/%s/' % (lib_id_encoded,))
+ liburl = urljoin(self.host, 'library/%s/' % (lib_id,))
library = RDF.Node(RDF.Uri(liburl))
self.import_library(library)
"""Add link between target pathname and the 'lane' that produced it
(note lane objects are now post demultiplexing.)
"""
- target_uri = 'file://' + target.encode('utf-8')
+ target_uri = 'file://' + smart_str(target)
target_node = RDF.Node(RDF.Uri(target_uri))
source_stmt = RDF.Statement(target_node, dcNS['source'], seq.filenode)
self.model.add_statement(source_stmt)
self.cycle = fromTypedNode(result['cycle'])
self.lane_number = fromTypedNode(result['lane_number'])
self.read = fromTypedNode(result['read'])
- if type(self.read) in types.StringTypes:
+ if isinstance(self.read, six.string_types):
self.read = 1
self.library = result['library']
self.library_id = fromTypedNode(result['library_id'])
"""Parse UCSC DAF File
"""
+import collections
import logging
import os
from pprint import pformat
import re
import string
-from StringIO import StringIO
+from six.moves import StringIO
import types
-import urlparse
+from six.moves import urllib
import RDF
from htsworkflow.util.rdfhelp import \
returns length of string if it can't find anything
"""
- for i in xrange(start, len(line)):
- if line[i] not in string.whitespace:
- return i
+ for i, c in enumerate(line[start:]):
+ if c not in string.whitespace:
+ return i+start
return len(line)
returns length of string if nothing matches
"""
- for i in xrange(start, len(line)):
- if line[i] in string.whitespace:
- return i
+ for i, c in enumerate(line[start:]):
+ if c in string.whitespace:
+ return i+start
return len(line)
def get_view_namespace(submission_uri):
submission_uri = submission_uri_to_string(submission_uri)
- view_uri = urlparse.urljoin(submission_uri, 'view/')
+ view_uri = urllib.parse.urljoin(submission_uri, 'view/')
viewNS = RDF.NS(view_uri)
return viewNS
else:
self.model = get_model()
- if hasattr(daf_file, 'next'):
+ if hasattr(daf_file, 'read'):
# its some kind of stream
self.daf = daf_file.read()
else:
# file
- stream = open(daf_file, 'r')
+ stream = open(daf_file, 'rt')
self.daf = stream.read()
stream.close()
LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
try:
self.import_submission_dir(result_dir, lib_id)
- except MetadataLookupException, e:
+ except MetadataLookupException as e:
LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
def import_submission_dir(self, submission_dir, library_id):
LOGGER.debug("Found: %s" % (literal_re,))
try:
filename_re = re.compile(literal_re)
- except re.error, e:
+ except re.error as e:
LOGGER.error("Unable to compile: %s" % (literal_re,))
patterns[literal_re] = view_name
return patterns
import jsonschema
import os
import requests
-import types
-from urlparse import urljoin, urlparse, urlunparse
+import six
+from six.moves.urllib.parse import urljoin, urlparse, urlunparse
LOGGER = logging.getLogger(__name__)
and I needed a way to properly compute a the correct base URL.
'''
# pretend strings aren't iterable
- if type(obj) in types.StringTypes:
+ if isinstance(obj, six.string_types):
return
# recurse on container types
obj_type = obj.get('@type')
if not obj_type:
raise ValueError('None type')
- if type(obj_type) in types.StringTypes:
+ if isinstance(obj_type, six.string_types):
raise ValueError('@type should be a list, not a string')
if not isinstance(obj_type, collections.Sequence):
raise ValueError('@type is not a sequence')
url = urlunparse(url.values())
return url
- def search_jsonld(self, term, **kwargs):
+ def search_jsonld(self, **kwargs):
'''Send search request to ENCODED
+
+ to do a general search do
+ searchTerm=term
'''
url = self.prepare_url('/search/')
- result = self.get_json(url, searchTerm=term, **kwargs)
+ result = self.get_json(url, **kwargs)
self.convert_search_to_jsonld(result)
return result
LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
try:
self.import_analysis_dir(result_dir, lib_id)
- except MetadataLookupException, e:
+ except MetadataLookupException as e:
LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
def import_analysis_dir(self, analysis_dir, library_id):
LOGGER.debug("Importing %s" % (lane.uri,))
try:
parser.parse_into_model(self.model, lane.uri)
- except RDF.RedlandError, e:
+ except RDF.RedlandError as e:
LOGGER.error("Error accessing %s" % (lane.uri,))
raise e
LOGGER.debug("Found: %s" % (literal_re,))
try:
filename_re = re.compile(literal_re)
- except re.error, e:
+ except re.error as e:
LOGGER.error("Unable to compile: %s" % (literal_re,))
patterns[literal_re] = view_name
return patterns
import copy
import os
+import re
from pprint import pprint
import shutil
import tempfile
split_test = dict((( x['target'], x) for x in
[{'sources': [u'11154_NoIndex_L003_R1_001.fastq.gz',
u'11154_NoIndex_L003_R1_002.fastq.gz'],
- 'pyscript': 'desplit_fastq.pyc',
+ 'pyscript': 'desplit_fastq.pyc?$',
'target': u'11154_C02F9ACXX_c202_l3_r1.fastq'},
{'sources': [u'11154_NoIndex_L003_R2_001.fastq.gz',
u'11154_NoIndex_L003_R2_002.fastq.gz'],
- 'pyscript': 'desplit_fastq.pyc',
+ 'pyscript': 'desplit_fastq.pyc?$',
'target': u'11154_C02F9ACXX_c202_l3_r2.fastq'},
{'sources': [u'12345_CGATGT_L003_R1_001.fastq.gz',
u'12345_CGATGT_L003_R1_002.fastq.gz',
u'12345_CGATGT_L003_R1_003.fastq.gz',
],
- 'pyscript': 'desplit_fastq.pyc',
+ 'pyscript': 'desplit_fastq.pyc?$',
'target': u'12345_C02F9ACXX_c202_l3_r1.fastq'},
{'sources': [u'12345_CGATGT_L003_R2_001.fastq.gz',
u'12345_CGATGT_L003_R2_002.fastq.gz',
u'12345_CGATGT_L003_R2_003.fastq.gz',
],
- 'pyscript': 'desplit_fastq.pyc',
+ 'pyscript': 'desplit_fastq.pyc?$',
'target': u'12345_C02F9ACXX_c202_l3_r2.fastq'}
]
))
for arg in split:
_, target = os.path.split(arg['target'])
pyscript = split_test[target]['pyscript']
- self.assertTrue(arg['pyscript'].endswith(pyscript))
+ self.assertTrue(re.search(pyscript, arg['pyscript']))
filename = split_test[target]['target']
self.assertTrue(arg['target'].endswith(filename))
for s_index in range(len(arg['sources'])):
from contextlib import contextmanager
import logging
import os
-from StringIO import StringIO
+from six.moves import StringIO
import shutil
import tempfile
from unittest import TestCase, TestSuite, defaultTestLoader
mapper = daf.UCSCSubmission(name, daf_file = test_daf_stream, model=model)
return mapper
-def dump_model(model):
- writer = get_serializer()
- turtle = writer.serialize_model_to_string(model)
- print turtle
-
class TestUCSCSubmission(TestCase):
def setUp(self):
view = daf_mapper.find_view('filename_r1.fastq')
- # dump_model(daf_mapper.model)
view_root = 'http://jumpgate.caltech.edu/wiki/SubmissionsLog/{0}/view/'
view_root = view_root.format(name)
self.failUnlessEqual(str(view.uri),
libNode,
filename)
- #dump_model(daf_mapper.model)
-
sub_root = "http://jumpgate.caltech.edu/wiki/SubmissionsLog/testfind/"
submission_name = sub_root + analysis_name
source = daf_mapper.model.get_source(rdfNS['type'], submissionOntology['submission'])
#!/usr/bin/env python
+from __future__ import absolute_import
from pprint import pprint
import shutil
from unittest import TestCase, defaultTestLoader
from htsworkflow.submission.results import ResultMap
-from submission_test_common import *
+from .submission_test_common import *
class TestResultMap(TestCase):
def setUp(self):
results['2000'] = 'dir2000'
results['1500'] = 'dir1500'
- self.failUnlessEqual(results.keys(), ['1000', '2000', '1500'])
+ self.failUnlessEqual(list(results.keys()), ['1000', '2000', '1500'])
self.failUnlessEqual(list(results.values()),
['dir1000', 'dir2000', 'dir1500'])
self.failUnlessEqual(list(results.items()),
import os
-from StringIO import StringIO
+from six.moves import StringIO
import shutil
import tempfile
from unittest import TestCase, TestSuite, defaultTestLoader
get_serializer
from htsworkflow.submission.submission import list_submissions, Submission
from htsworkflow.submission.results import ResultMap
-from submission_test_common import *
+from .submission_test_common import *
import RDF
#import logging
from unittest import TestCase, TestSuite, defaultTestLoader
-from StringIO import StringIO
+from six.moves import StringIO
from htsworkflow.submission import ucsc
"""Utilities for extracting information from the ENCODE DCC
"""
import logging
-import urlparse
-import urllib2
+from six.moves import urllib
LOGGER = logging.getLogger(__name__)
'http://encodesubmit.ucsc.edu/pipeline/download_ddf/1234'
"""
fragment = 'download_ddf/%s' % (submission_id,)
- return urlparse.urljoin(UCSCEncodePipeline, fragment)
+ return urllib.parse.urljoin(UCSCEncodePipeline, fragment)
def daf_download_url(submission_id):
'http://encodesubmit.ucsc.edu/pipeline/download_daf/1234'
"""
fragment = 'download_daf/%s' % (submission_id,)
- return urlparse.urljoin(UCSCEncodePipeline, fragment)
+ return urllib.parse.urljoin(UCSCEncodePipeline, fragment)
def submission_view_url(submission_id):
'http://encodesubmit.ucsc.edu/pipeline/show/1234'
"""
fragment = 'show/%s' % (submission_id,)
- return urlparse.urljoin(UCSCEncodePipeline, fragment)
+ return urllib.parse.urljoin(UCSCEncodePipeline, fragment)
def get_encodedcc_file_index(genome, composite):
request_url = base_url + 'files.txt'
try:
- request = urllib2.urlopen(request_url)
+ request = urllib.request.urlopen(request_url)
file_index = parse_ucsc_file_index(request, base_url)
return file_index
- except urllib2.HTTPError, e:
+ except urllib.error.HTTPError as e:
err = e
pass
-#
-# The Alphanum Algorithm is an improved sorting algorithm for strings
-# containing numbers. Instead of sorting numbers in ASCII order like
-# a standard sort, this algorithm sorts numbers in numeric order.
-#
-# The Alphanum Algorithm is discussed at http://www.DaveKoelle.com
-#
-#* Python implementation provided by Chris Hulan (chris.hulan@gmail.com)
-#* Distributed under same license as original
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
+# from http://stackoverflow.com/questions/4836710/does-python-have-a-built-in-function-for-string-natural-sort
+# modified by Diane Trout
import re
-import types
-#
-# TODO: Make decimal points be considered in the same class as digits
-#
-
-def chunkify(str):
- """
- return a list of numbers and non-numeric substrings of +str+
- the numeric substrings are converted to integer, non-numeric are left as is
- """
- if type(str) in types.StringTypes:
- chunks = re.findall("(\d+|\D+)",str)
- #convert numeric strings to numbers
- chunks = [re.match('\d',x) and int(x) or x for x in chunks]
- return chunks
- elif type(str) in [types.IntType, types.LongType, types.FloatType]:
- return [str]
+def natural_sort_key(s, _nsre=re.compile('([0-9]+)')):
+ if isinstance(s, type("")) or isinstance(s, type(u"")):
+ return [int(text) if text.isdigit() else text.lower()
+ for text in re.split(_nsre, s)]
+ elif isinstance(s, (int, float)):
+ return [s]
else:
- raise ValueError("Unsupported type %s for input %s" % (type(str), str))
-
-def alphanum(a,b):
- """
- breaks +a+ and +b+ into pieces and returns left-to-right comparison of the pieces
-
- +a+ and +b+ are expected to be strings (for example file names) with numbers and non-numeric characters
- Split the values into list of numbers and non numeric sub-strings and so comparison of numbers gives
- Numeric sorting, comparison of non-numeric gives Lexicographic order
- """
- # split strings into chunks
- aChunks = chunkify(a)
- bChunks = chunkify(b)
-
- return cmp(aChunks,bChunks) #built in comparison works once data is prepared
+ raise ValueError("Unsupported type %s for input %s" % (type(s), s))
"""Common functions for accessing the HTS Workflow REST API
"""
+from __future__ import unicode_literals
+
import base64
-from ConfigParser import SafeConfigParser
+from six.moves import configparser
+from six import int2byte
import random
import logging
import os
from optparse import OptionGroup
-import urllib
-import urllib2
-import urlparse
+from six.moves import urllib
LOGGER = logging.getLogger(__name__)
"""Add options OptParser configure authentication options
"""
# Load defaults from the config files
- config = SafeConfigParser()
+ config = configparser.SafeConfigParser()
config.read([os.path.expanduser('~/.htsworkflow.ini'),
'/etc/htsworkflow.ini'
])
"""
url_fragment = '/samples/library/%s/json' % (library_id,)
- url = urlparse.urljoin(root_url, url_fragment)
+ url = urllib.parse.urljoin(root_url, url_fragment)
return url
http://localhost/experiments/config/1234AAXX/json
"""
url_fragment = '/experiments/config/%s/json' % (flowcell_id,)
- url = urlparse.urljoin(root_url, url_fragment)
+ url = urllib.parse.urljoin(root_url, url_fragment)
return url
"""
url_fragment = '/lanes_for/%s/json' % (username,)
- url = urlparse.urljoin(root_url, url_fragment)
+ url = urllib.parse.urljoin(root_url, url_fragment)
return url
Return a dictionary from the HTSworkflow API
"""
try:
- apipayload = urllib.urlencode(apidata)
- web = urllib2.urlopen(url, apipayload)
- except urllib2.URLError, e:
+ apipayload = urllib.parse.urlencode(apidata)
+ web = urllib.request.urlopen(url, apipayload.encode('utf-8'))
+ except urllib.error.URLError as e:
if hasattr(e, 'code') and e.code == 404:
LOGGER.info("%s was not found" % (url,))
return None
"""return key suitable for use as secret key"""
try:
source = random.SystemRandom()
- except AttributeError, e:
+ except AttributeError as e:
source = random.random()
bits = source.getrandbits(size)
chars = []
while bits > 0:
byte = bits & 0xff
- chars.append(chr(byte))
+ chars.append(int2byte(byte))
bits >>= 8
- return base64.encodestring("".join(chars)).strip()
+ return base64.encodestring(b"".join(chars)).strip()
if __name__ == "__main__":
from optparse import OptionParser
"""
import logging
import os
-import ConfigParser
+from six.moves import configparser
from htsworkflow.util.api import make_django_secret_key
LOGGER = logging.getLogger(__name__)
-class HTSWConfig(ConfigParser.SafeConfigParser):
+class HTSWConfig(configparser.SafeConfigParser):
'''Customization of SafeConfigParser that can open and save itself.
'''
def __init__(self, path=[os.path.expanduser("~/.htsworkflow.ini"),
'/etc/htsworkflow.ini',]):
# ConfigParser isn't a new-style class? lame
- # super(ConfigParser.SafeConfigParser, self).__init__()
- ConfigParser.SafeConfigParser.__init__(self)
+ # super(configparser.SafeConfigParser, self).__init__()
+ configparser.SafeConfigParser.__init__(self)
read_path = self.read(path)
if len(read_path) > 0:
self.filename = read_path[0]
ini_stream = open(self.filename, 'w')
self.write(ini_stream)
ini_stream.close()
- except IOError, e:
+ except IOError as e:
LOGGER.info("Error saving setting: %s" % (str(e)))
"""
Miscellaneous, more refined type casting functions
"""
+from __future__ import unicode_literals
-def unicode_or_none(value):
+def str_or_none(value):
"""
- Convert value to unicode if its not none.
+ Convert value to unicode string if its not none.
"""
if value is None:
return None
else:
- return unicode(value)
+ return str(value)
def parse_flowcell_id(flowcell_id):
"""
and False if it fails.
"""
try:
- XHTML_RDF_DTD = lxml.etree.DTD(external_id='-//W3C//DTD XHTML+RDFa 1.0//EN')
+ XHTML_RDF_DTD = lxml.etree.DTD(external_id=b'-//W3C//DTD XHTML+RDFa 1.0//EN')
except lxml.etree.DTDParseError as e:
LOGGER.warn("Unable to load XHTML DTD %s" % (str(e),))
return
lane_library = [ (x[0][5], x[1]) for x in fc.items()
if library_id_re.match(x[0]) ]
for lane, library_id in lane_library:
- if not self.library[library_id].has_key('lanes'):
+ if 'lanes' not in self.library[library_id]:
self.library[library_id]['lanes'] = []
self.library[library_id]['lanes'].append((fc_id, lane))
import os
from subprocess import Popen, PIPE
+from django.utils.encoding import smart_text
+
logger = logging.getLogger(__name__)
def make_md5sum(filename):
md5_cache = os.path.join(filename+".md5")
if os.path.exists(md5_cache):
logger.debug("Found md5sum in {0}".format(md5_cache))
- stream = open(md5_cache,'r')
+ stream = open(md5_cache,'rt')
lines = stream.readlines()
md5sum = parse_md5sum_line(lines, filename)
else:
if retcode != 0:
logger.error("Trouble with md5sum for {0}".format(filename))
return None
- lines = stdin.split(os.linesep)
+ lines = stdin.splitlines()
md5sum = parse_md5sum_line(lines, filename)
if md5sum is not None:
logger.debug("Caching sum in {0}".format(md5_cache))
- stream = open(md5_cache, "w")
- stream.write(stdin)
+ stream = open(md5_cache, "wt")
+ stream.write(smart_text(stdin))
stream.close()
return md5sum
def parse_md5sum_line(lines, filename):
- md5sum, md5sum_filename = lines[0].split()
+ md5sum, md5sum_filename = smart_text(lines[0]).split()
md5sum_filename = os.path.basename(md5sum_filename)
filename = os.path.basename(filename)
if md5sum_filename != filename:
import os
import sys
+import logging
+
+LOGGER = logging.getLogger(__name__)
try:
import py_sg
# the 2nd of which is the serial number
return data.strip('\x00').split()[1]
-except ImportError, e:
- print >>sys.stderr, "hdquery requires py_sg"
+except ImportError as e:
+ LOGGER.error("hdquery requires py_sg")
def get_hd_serial_num(device):
raise NotImplemented('get_hd_serial_num is not available for anything other than linux')
import os
import gzip
import bz2
-import types
-import urllib2
+import six
+from six.moves import urllib
+
+if six.PY2:
+ import types
+ FILE_CLASS = types.FileType
+else:
+ import io
+ FILE_CLASS = io.IOBase
def isfilelike(file_ref, mode):
"""Does file_ref have the core file operations?
(AKA does it start with protocol:// ?)
"""
#what if mode is 'w'?
- parsed = urllib2.urlparse.urlparse(file_ref)
+ parsed = urllib.parse.urlparse(file_ref)
schema, netloc, path, params, query, fragment = parsed
return len(schema) > 0
Attempt to intelligently turn file_ref into a readable stream
"""
# catch being passed a file
- if type(file_ref) is types.FileType:
+ if isinstance(file_ref, FILE_CLASS):
return file_ref
# does it look like a file?
elif isfilelike(file_ref, mode):
return file_ref
elif isurllike(file_ref, mode):
- return urllib2.urlopen(file_ref)
+ return urllib.request.urlopen(file_ref)
elif os.path.splitext(file_ref)[1] == ".gz":
return gzip.open(file_ref, mode)
elif os.path.splitext(file_ref)[1] == '.bz2':
"""Helper features for working with librdf
"""
+from __future__ import print_function
+
import collections
from datetime import datetime
from glob import glob
-from urlparse import urlparse, urlunparse
-from urllib2 import urlopen
+import six
+from six.moves import urllib
import logging
import os
import sys
"""
for row in results:
for k, v in row.items()[::-1]:
- print "{0}: {1}".format(k, v)
- print
+ print("{0}: {1}".format(k, v))
+ print()
def html_query_results(result_stream):
from django.conf import settings
new_row[k] = Simplified(v)
results.append(new_row)
context = Context({'results': results,})
- print template.render(context)
+ print(template.render(context))
def blankOrUri(value=None):
"""Return a blank node for None or a resource node for strings.
node = None
if value is None:
node = RDF.Node()
- elif type(value) in types.StringTypes:
+ elif isinstance(value, six.string_types):
node = RDF.Node(uri_string=value)
elif isinstance(value, RDF.Node):
node = value
def toTypedNode(value, language="en"):
"""Convert a python variable to a RDF Node with its closest xsd type
"""
- if type(value) == types.BooleanType:
+ if isinstance(value, bool):
value_type = xsdNS['boolean'].uri
if value:
value = u'1'
else:
value = u'0'
- elif type(value) in (types.IntType, types.LongType):
+ elif isinstance(value, int):
value_type = xsdNS['decimal'].uri
- value = unicode(value)
- elif type(value) == types.FloatType:
+ value = str(value)
+ elif isinstance(value, float):
value_type = xsdNS['float'].uri
- value = unicode(value)
+ value = str(value)
elif isinstance(value, datetime):
value_type = xsdNS['dateTime'].uri
if value.microsecond == 0:
value = value.strftime(ISOFORMAT_MS)
else:
value_type = None
- value = unicode(value)
+ if six.PY3:
+ value = str(value)
+ else:
+ value = unicode(value).encode('utf-8')
if value_type is not None:
node = RDF.Node(literal=value, datatype=value_type)
else:
- node = RDF.Node(literal=unicode(value).encode('utf-8'), language=language)
+ node = RDF.Node(literal=value, language=language)
return node
elif value_type in ('dateTime'):
try:
return datetime.strptime(literal, ISOFORMAT_MS)
- except ValueError, _:
+ except ValueError:
return datetime.strptime(literal, ISOFORMAT_SHORT)
return literal
if isinstance(uri, RDF.Uri):
uri = str(uri)
- parsed = urlparse(uri)
+ parsed = urllib.parse.urlparse(uri)
if len(parsed.query) > 0:
return parsed.query
elif len(parsed.fragment) > 0:
def load_into_model(model, parser_name, path, ns=None):
- if type(ns) in types.StringTypes:
+ if isinstance(ns, six.string_types):
ns = RDF.Uri(ns)
if isinstance(path, RDF.Node):
else:
raise ValueError("url to load can't be a RDF literal")
- url_parts = list(urlparse(path))
+ url_parts = list(urllib.parse.urlparse(path))
if len(url_parts[0]) == 0 or url_parts[0] == 'file':
url_parts[0] = 'file'
url_parts[2] = os.path.abspath(url_parts[2])
if parser_name is None or parser_name == 'guess':
parser_name = guess_parser_by_extension(path)
- url = urlunparse(url_parts)
+ url = urllib.parse.urlunparse(url_parts)
logger.info("Opening {0} with parser {1}".format(url, parser_name))
rdf_parser = RDF.Parser(name=parser_name)
statements = rdf_parser.parse_as_stream(url, ns)
retries = 0
succeeded = True
- except RDF.RedlandError, e:
+ except RDF.RedlandError as e:
errmsg = "RDF.RedlandError: {0} {1} tries remaining"
logger.error(errmsg.format(str(e), retries))
ns = fixup_namespace(ns)
logger.debug("load_string_into_model parser={0}, len={1}".format(
parser_name, len(data)))
- rdf_parser = RDF.Parser(name=parser_name)
+ rdf_parser = RDF.Parser(name=str(parser_name))
for s in rdf_parser.parse_string_as_stream(data, ns):
conditionally_add_statement(model, s, ns)
def fixup_namespace(ns):
if ns is None:
ns = RDF.Uri("http://localhost/")
- elif type(ns) in types.StringTypes:
+ elif isinstance(ns, six.string_types):
ns = RDF.Uri(ns)
elif not(isinstance(ns, RDF.Uri)):
errmsg = "Namespace should be string or uri not {0}"
schemas = resource_listdir(__name__, 'schemas')
for s in schemas:
schema = resource_string(__name__, 'schemas/' + s)
+ if six.PY3:
+ # files must be encoded utf-8
+ schema = schema.decode('utf-8')
namespace = 'file://localhost/htsworkflow/schemas/'+s
add_schema(model, schema, namespace)
for path in schema_path:
for pathname in glob(os.path.join(path, '*.turtle')):
url = 'file://' + os.path.splitext(pathname)[0]
- stream = open(pathname, 'r')
+ stream = open(pathname, 'rt')
add_schema(model, stream, url)
stream.close()
element = lxml.html.fromstring(s)
cleaner = lxml.html.clean.Cleaner(page_structure=False)
element = cleaner.clean_html(element)
- text = lxml.html.tostring(element)
+ if six.PY3:
+ text = lxml.html.tostring(element, encoding=str)
+ else:
+ text = lxml.html.tostring(element)
p_len = 3
slash_p_len = 4
import RDF
from pyld import jsonld
+import six
def load_into_model(model, json_data):
'''Given a PyLD dictionary, load its statements into our Redland model
elif nodetype == 'IRI':
return RDF.Node(uri_string=str(value))
else:
- return RDF.Node(literal=unicode(value).encode('utf-8'),
+ if six.PY2:
+ literal = unicode(value).encode('utf-8')
+ else:
+ literal = value
+ return RDF.Node(literal=literal,
datatype=RDF.Uri(datatype))
import os
from unittest import TestCase
-from htsworkflow.util.alphanum import alphanum
+from htsworkflow.util.alphanum import natural_sort_key
class testAlphanum(TestCase):
def test_string(self):
unsorted = ['z5', 'b3', 'b10', 'a001', 'a2']
sorted = [ 'a001', 'a2', 'b3', 'b10', 'z5']
scratch = copy.copy(unsorted)
- scratch.sort(alphanum)
+ scratch.sort(key=natural_sort_key)
- for i in xrange(len(scratch)):
- self.failIfEqual(scratch[i], unsorted[i])
- for i in xrange(len(scratch)):
- self.failUnlessEqual(scratch[i], sorted[i])
+ for i, s in enumerate(scratch):
+ self.failIfEqual(s, unsorted[i])
+ for i, s in enumerate(scratch):
+ self.failUnlessEqual(s, sorted[i])
def test_numbers(self):
unsorted = [5,7,10,18,-1,3]
sorted = [-1,3,5,7,10,18]
scratch = copy.copy(unsorted)
- scratch.sort(alphanum)
+ scratch.sort(key=natural_sort_key)
- for i in xrange(len(scratch)):
- self.failIfEqual(scratch[i], unsorted[i])
- for i in xrange(len(scratch)):
- self.failUnlessEqual(scratch[i], sorted[i])
+ for i, s in enumerate(scratch):
+ self.failIfEqual(s, unsorted[i])
+ for i, s in enumerate(scratch):
+ self.failUnlessEqual(s, sorted[i])
def test_long_names(self):
unsorted = ["1000X Radonius Maximus","10X Radonius","200X Radonius","20X Radonius","20X Radonius Prime","30X Radonius","40X Radonius","Allegia 50 Clasteron","Allegia 500 Clasteron","Allegia 51 Clasteron","Allegia 51B Clasteron","Allegia 52 Clasteron","Allegia 60 Clasteron","Alpha 100","Alpha 2","Alpha 200","Alpha 2A","Alpha 2A-8000","Alpha 2A-900","Callisto Morphamax","Callisto Morphamax 500","Callisto Morphamax 5000","Callisto Morphamax 600","Callisto Morphamax 700","Callisto Morphamax 7000","Callisto Morphamax 7000 SE","Callisto Morphamax 7000 SE2","QRS-60 Intrinsia Machine","QRS-60F Intrinsia Machine","QRS-62 Intrinsia Machine","QRS-62F Intrinsia Machine","Xiph Xlater 10000","Xiph Xlater 2000","Xiph Xlater 300","Xiph Xlater 40","Xiph Xlater 5","Xiph Xlater 50","Xiph Xlater 500","Xiph Xlater 5000","Xiph Xlater 58"]
expected = ['10X Radonius', '20X Radonius', '20X Radonius Prime', '30X Radonius', '40X Radonius', '200X Radonius', '1000X Radonius Maximus', 'Allegia 50 Clasteron', 'Allegia 51 Clasteron', 'Allegia 51B Clasteron', 'Allegia 52 Clasteron', 'Allegia 60 Clasteron', 'Allegia 500 Clasteron', 'Alpha 2', 'Alpha 2A', 'Alpha 2A-900', 'Alpha 2A-8000', 'Alpha 100', 'Alpha 200', 'Callisto Morphamax', 'Callisto Morphamax 500', 'Callisto Morphamax 600', 'Callisto Morphamax 700', 'Callisto Morphamax 5000', 'Callisto Morphamax 7000', 'Callisto Morphamax 7000 SE', 'Callisto Morphamax 7000 SE2', 'QRS-60 Intrinsia Machine', 'QRS-60F Intrinsia Machine', 'QRS-62 Intrinsia Machine', 'QRS-62F Intrinsia Machine', 'Xiph Xlater 5', 'Xiph Xlater 40', 'Xiph Xlater 50', 'Xiph Xlater 58', 'Xiph Xlater 300', 'Xiph Xlater 500', 'Xiph Xlater 2000', 'Xiph Xlater 5000', 'Xiph Xlater 10000']
s = unsorted[:]
- s.sort(alphanum)
+ s.sort(key=natural_sort_key)
self.failUnlessEqual(s, expected)
def test_bad_input(self):
unsorted = [object(), (1,3j)]
s = unsorted[:]
- self.failUnlessRaises(ValueError, s.sort, alphanum)
+ self.failUnlessRaises(ValueError, s.sort, key=natural_sort_key)
def suite():
+from __future__ import print_function, unicode_literals
+
import os
from unittest import TestCase
try:
from xml.etree import ElementTree
-except ImportError, e:
+except ImportError as e:
from elementtree import ElementTree
from htsworkflow.util.ethelp import indent, flatten
def test_indent(self):
flat_foo = ElementTree.tostring(self.foo_tree)
- self.failUnlessEqual(len(flat_foo.split('\n')), 1)
+ self.failUnlessEqual(len(flat_foo.splitlines()), 1)
indent(self.foo_tree)
pretty_foo = ElementTree.tostring(self.foo_tree)
- self.failUnlessEqual(len(pretty_foo.split('\n')), 5)
+ self.failUnlessEqual(len(pretty_foo.splitlines()), 4)
def test_flatten(self):
self.failUnless(flatten(self.foo_tree), 'asdf')
import os
-from StringIO import StringIO
+from six.moves import StringIO
from unittest import TestCase
from htsworkflow.util import makebed
+from __future__ import print_function
+
import os
import types
from unittest import TestCase
-
from datetime import datetime
+import six
from htsworkflow.util.rdfhelp import \
add_default_schemas, \
s = "Argh matey"
node = toTypedNode(s)
self.assertEqual(fromTypedNode(node), s)
- self.assertEqual(type(fromTypedNode(node)), types.UnicodeType)
+ self.assertTrue(isinstance(fromTypedNode(node), six.text_type))
def test_blank_or_uri_blank(self):
node = blankOrUri()
def test_unicode_node_roundtrip(self):
literal = u'\u5927'
roundtrip = fromTypedNode(toTypedNode(literal))
- self.assertEqual(roundtrip, literal)
- self.assertEqual(type(roundtrip), types.UnicodeType)
+ self.assertTrue(isinstance(roundtrip, six.text_type))
def test_datetime_no_microsecond(self):
dateTimeType = xsdNS['dateTime'].uri
self.assertTrue(model.contains_statement(s))
-except ImportError, e:
- print "Unable to test rdfhelp"
+except ImportError as e:
+ print("Unable to test rdfhelp")
def suite():
from unittest import TestSuite, defaultTestLoader
import os
-from StringIO import StringIO
+from six.moves import StringIO
from unittest import TestCase
from htsworkflow.util import validate
#!/usr/bin/env python
-
from optparse import OptionParser
import os
import re
import sys
+import logging
+
+LOGGER = logging.getLogger(__name__)
def main(cmdline=None):
parser = make_parser()
opts.uniform_lengths,
opts.max_errors)
if errors > 0:
- print "%s failed validation" % (filename,)
+ LOGGER.error("%s failed validation", filename)
error_happened = True
stream.close()
def validate_re(pattern, line, line_number, errmsg):
if pattern.match(line) is None:
- print errmsg, "[%d]: %s" % (line_number, line)
+ LOGGER.error("%s [%d]: %s", errmsg, line_number, line)
return 1
else:
return 0
if line_length is None:
line_length = len(line)
elif len(line) != line_length:
- print errmsg, "%d: %s" %(line_number, line)
+ LOGGER.error("%s %d: %s", errmsg, line_number, line)
error_count = 1
return line_length, error_count
version = None
try:
import pkg_resources
- except ImportError, e:
+ except ImportError as e:
LOGGER.error("Can't find version number, please install setuptools")
raise e
try:
version = pkg_resources.get_distribution("htsworkflow")
- except pkg_resources.DistributionNotFound, e:
+ except pkg_resources.DistributionNotFound as e:
LOGGER.error("Package not installed")
return version
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
from django.contrib import admin
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
from .models import Item
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
import datetime
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
import logging
try:
import uuid
-except ImportError, e:
+except ImportError as e:
# Some systems are using python 2.4, which doesn't have uuid
# this is a stub
LOGGER.warning('Real uuid is not available, initializing fake uuid module')
"""
Assigns a UUID to model on save
"""
- #print 'Entered _assign_uuid'
if instance.uuid is None or len(instance.uuid) != 32:
instance.uuid = uuid.uuid1().hex
name = models.CharField(max_length=256)
url = models.URLField(blank=True, null=True)
- def __unicode__(self):
- return u"%s" % (self.name)
+ def __str__(self):
+ return "%s" % (self.name)
class Location(models.Model):
notes = models.TextField(blank=True, null=True)
- def __unicode__(self):
+ def __str__(self):
if len(self.location_description) > 16:
- return u"%s: %s" % (self.name, self.location_description[0:16]+u"...")
+ return "%s: %s" % (self.name, self.location_description[0:16]+"...")
else:
- return u"%s: %s" % (self.name, self.location_description)
+ return "%s: %s" % (self.name, self.location_description)
post_init.connect(_assign_uuid, sender=Location)
notes = models.TextField(blank=True, null=True)
- def __unicode__(self):
+ def __str__(self):
name = u''
if self.model_id:
- name += u"model:%s " % (self.model_id)
+ name += "model:%s " % (self.model_id)
if self.part_number:
- name += u"part:%s " % (self.part_number)
+ name += "part:%s " % (self.part_number)
if self.lot_number:
- name += u"lot:%s " % (self.lot_number)
+ name += "lot:%s " % (self.lot_number)
- return u"%s: %s" % (name, self.purchase_date)
+ return "%s: %s" % (name, self.purchase_date)
class Meta:
verbose_name_plural = "Item Info"
name = models.CharField(max_length=64, unique=True)
description = models.TextField(blank=True, null=True)
- def __unicode__(self):
- return u"%s" % (self.name)
+ def __str__(self):
+ return "%s" % (self.name)
class ItemStatus(models.Model):
name = models.CharField(max_length=64, unique=True)
notes = models.TextField(blank=True, null=True)
- def __unicode__(self):
+ def __str__(self):
return self.name
class Meta:
notes = models.TextField(blank=True, null=True)
- def __unicode__(self):
+ def __str__(self):
if self.barcode_id is None or len(self.barcode_id) == 0:
- return u"invu|%s" % (self.uuid)
+ return "invu|%s" % (self.uuid)
else:
- return u"invb|%s" % (self.barcode_id)
+ return "invb|%s" % (self.barcode_id)
def get_absolute_url(self):
return '/inventory/%s/' % (self.uuid)
template = models.TextField()
- def __unicode__(self):
+ def __str__(self):
if self.default:
return u'%s %s' % (self.item_type.name, self.printer.name)
else:
creation_date = models.DateTimeField(auto_now_add=True)
modified_date = models.DateTimeField(auto_now=True)
- def __unicode__(self):
- return u"%s: %s" % (str(self.flowcell), ', '.join([str(s) for s in self.storage_devices.iterator()]))
+ def __str__(self):
+ return "%s: %s" % (str(self.flowcell), ', '.join([str(s) for s in self.storage_devices.iterator()]))
class Meta:
verbose_name_plural = "Long Term Storage"
"""
flowcell = models.ForeignKey(FlowCell)
- def __unicode__(self):
- return u"%s: %s" % (str(self.flowcell), ', '.join([str(s) for s in self.reagent.iterator()]))
+ def __str__(self):
+ return "%s: %s" % (str(self.flowcell), ', '.join([str(s) for s in self.reagent.iterator()]))
class ReagentLibrary(ReagentBase):
"""
library = models.ForeignKey(Library)
- def __unicode__(self):
- return u"%s: %s" % (str(self.library), ', '.join([str(s) for s in self.reagent.iterator()]))
+ def __str__(self):
+ return "%s: %s" % (str(self.library), ', '.join([str(s) for s in self.reagent.iterator()]))
from django.contrib.auth.models import User
from django.core.urlresolvers import reverse
+from django.utils.encoding import smart_text
from .models import Item, Vendor
from .inventory_factory import ItemFactory, LongTermStorageFactory
self.failUnlessEqual(response.status_code, 200)
model = get_model()
- load_string_into_model(model, 'rdfa', response.content, url)
+ load_string_into_model(model, 'rdfa', smart_text(response.content), url)
itemNode = RDF.Node(RDF.Uri(url))
item_type = fromTypedNode(
+from __future__ import unicode_literals
+
from django.conf.urls import patterns
urlpatterns = patterns('',
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
from django.conf import settings
from django.contrib.auth.decorators import login_required
try:
import json
-except ImportError, e:
+except ImportError as e:
import simplejson as json
INVENTORY_CONTEXT_DEFAULTS = {
printer_template = PrinterTemplate.objects.get(default=True)
except ObjectDoesNotExist:
msg = "No template for item type (%s) and no default template found" % (item_type.name)
- raise ValueError, msg
+ raise ValueError(msg)
return printer_template
"""
try:
item = Item.objects.get(barcode_id=barcode_id)
- except ObjectDoesNotExist, e:
+ except ObjectDoesNotExist as e:
item = None
return item_summary_by_uuid(request, None, msg, item)
if item is None:
try:
item = Item.objects.get(uuid=uuid)
- except ObjectDoesNotExist, e:
+ except ObjectDoesNotExist as e:
item = None
context_dict = {
"""
try:
item = Item.objects.get(uuid=uuid)
- except ObjectDoesNotExist, e:
+ except ObjectDoesNotExist as e:
item = None
msg = "Item with UUID %s does not exist" % (uuid)
# Retrieve Storage Device
try:
sd = Item.objects.get(barcode_id=serial)
- except ObjectDoesNotExist, e:
+ except ObjectDoesNotExist as e:
msg = "Item with barcode_id of %s not found." % (serial)
raise ObjectDoesNotExist(msg)
# Retrieve FlowCell
try:
fc = FlowCell.objects.get(flowcell_id__startswith=flowcell)
- except ObjectDoesNotExist, e:
+ except ObjectDoesNotExist as e:
msg = "FlowCell with flowcell_id of %s not found." % (flowcell)
raise ObjectDoesNotExist(msg)
lts = None
if count > 1:
msg = "There really should only be one longtermstorage object per flowcell"
- raise ValueError, msg
+ raise ValueError(msg)
elif count == 1:
# lts already attached to flowcell
lts = fc.longtermstorage_set.all()[0]
+from __future__ import unicode_literals
+
from django.template import Context, Template
from django.contrib import admin
+from __future__ import unicode_literals
+
from django.db import models
class LabelPrinter(models.Model):
labels = models.CharField(max_length=200)
notes = models.TextField(null=True, blank=True)
- def __unicode__(self):
- return u'%s: %s' % (self.name, self.labels)
+ def __str__(self):
+ return '%s: %s' % (self.name, self.labels)
class LabelTemplate(models.Model):
"""
ZPL_code = models.TextField('template')
- def __unicode__(self):
+ def __str__(self):
return '%s %s' % (self.name, self.printer.name)
class LabelContent(models.Model):
Replace these with more appropriate tests for your application.
"""
+from __future__ import unicode_literals
from django.test import TestCase
+from __future__ import unicode_literals
# Create your views here.
requests>=2.0
wsgiref==0.1.2
factory_boy>=2.4
+six>=1.8
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
from django.contrib import admin
from django.contrib.admin import widgets
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
from django.conf import settings
from django.contrib.auth.backends import ModelBackend
if user.check_password(password):
return user
#except self.user_class.DoesNotExist:
- except Exception, e:
+ except Exception as e:
logger.error(str(e))
return None
try:
return self.user_class.objects.get(pk=user_id)
#except self.user_class.DoesNotExist:
- except Exception, e:
+ except Exception as e:
logger.error(str(e))
return None
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
import django
from django.contrib.admin.views.main import ChangeList
+from __future__ import unicode_literals
+
import types
import logging
-import urlparse
from django.db import models
from django.contrib.auth.models import User, UserManager
from django.core import urlresolvers
from django.db.models.signals import pre_save, post_save
from django.db import connection
+import six
logger = logging.getLogger(__name__)
source = models.CharField(max_length=500, blank=True, null=True, db_index=True)
biology = models.TextField(blank=True, null=True)
notes = models.TextField(blank=True, null=True)
- def __unicode__(self):
- return u'%s - %s' % (self.antigene, self.antibodies)
+ def __str__(self):
+ return '%s - %s' % (self.antigene, self.antibodies)
class Meta:
verbose_name_plural = "antibodies"
ordering = ["antigene"]
db_index=True)
notes = models.TextField(blank=True)
- def __unicode__(self):
- return unicode(self.cellline_name)
+ def __str__(self):
+ return str(self.cellline_name)
class Meta:
ordering = ["cellline_name"]
verbose_name = 'Short Name')
notes = models.TextField(blank=True)
- def __unicode__(self):
- return unicode(self.condition_name)
+ def __str__(self):
+ return str(self.condition_name)
class Meta:
ordering = ["condition_name"]
class ExperimentType(models.Model):
name = models.CharField(max_length=50, unique=True)
- def __unicode__(self):
- return unicode(self.name)
+ def __str__(self):
+ return str(self.name)
class Tag(models.Model):
tag_name = models.CharField(max_length=100, db_index=True,blank=False,null=False)
context = models.CharField(max_length=50,
choices=TAG_CONTEXT, default='Library')
- def __unicode__(self):
- return u'%s' % (self.tag_name)
+ def __str__(self):
+ return '%s' % (self.tag_name)
class Meta:
ordering = ["context","tag_name"]
common_name = models.CharField(max_length=256, blank=True)
#use_genome_build = models.CharField(max_length=100, blank=False, null=False)
- def __unicode__(self):
- return u'%s (%s)' % (self.scientific_name, self.common_name)
+ def __str__(self):
+ return '%s (%s)' % (self.scientific_name, self.common_name)
class Meta:
verbose_name_plural = "species"
users = models.ManyToManyField('HTSUser', null=True, blank=True)
users.admin_order_field = "username"
- def __unicode__(self):
- str = unicode(self.name)
+ def __str__(self):
+ name = str(self.name)
if self.contact is not None and len(self.contact) > 0:
- str += u' ('+self.contact+u')'
- return str
+ name += ' ('+self.contact+')'
+ return name
def Users(self):
users = self.users.all().order_by('username')
- return ", ".join([unicode(a) for a in users ])
+ return ", ".join([str(a) for a in users ])
class Meta:
ordering = ["name","contact"]
can_multiplex = models.BooleanField(default=True,
help_text="Does this adapter provide multiplexing?")
- def __unicode__(self):
- return unicode(self.name)
+ def __str__(self):
+ return str(self.name)
class Meta:
ordering = ["-id"]
undiluted_concentration = models.DecimalField("Concentration",
max_digits=5, decimal_places=2, blank=True, null=True,
- help_text=u"Undiluted concentration (ng/\u00b5l)")
+ help_text="Undiluted concentration (ng/\u00b5l)")
# note \u00b5 is the micro symbol in unicode
successful_pM = models.DecimalField(max_digits=9,
decimal_places=1, blank=True, null=True)
bioanalyzer_summary = models.TextField(blank=True,default="")
bioanalyzer_concentration = models.DecimalField(max_digits=5,
decimal_places=2, blank=True, null=True,
- help_text=u"(ng/\u00b5l)")
+ help_text="(ng/\u00b5l)")
bioanalyzer_image_url = models.URLField(blank=True,default="")
- def __unicode__(self):
- return u'#%s: %s' % (self.id, self.library_name)
+ def __str__(self):
+ return '#%s: %s' % (self.id, self.library_name)
class Meta:
verbose_name_plural = "libraries"
adapter_type = self.library_type.id,
multiplex_id = multiplex_id)
return multiplex.sequence
- except MultiplexIndex.DoesNotExist, e:
+ except MultiplexIndex.DoesNotExist as e:
return None
def index_sequence_text(self, seperator=' '):
sequences = self.index_sequences()
if sequences is None:
return ""
- if type(sequences) in types.StringTypes:
+ if isinstance(sequences, six.string_types):
return sequences
- multiplex_ids = sequences.keys()
- multiplex_ids.sort()
+ multiplex_ids = sorted(sequences.keys())
tstr = ''
ar = []
for t in affs:
- ar.append(t.__unicode__())
+ ar.append(str(t))
return '%s' % (", ".join(ar))
def is_archived(self):
affs = self.tags.all().order_by('tag_name')
ar = []
for t in affs:
- ar.append(t.__unicode__())
- return u'%s' % ( ", ".join(ar))
+ ar.append(str(t))
+ return '%s' % ( ", ".join(ar))
def DataRun(self):
- str ='<a target=_self href="/admin/experiments/datarun/?q='+self.id+'" title="Check All Data Runs for This Specific Library ..." ">Data Run</a>'
+ str ='<a target=_self href="/admin/experiments/datarun/?q=%s" title="Check All Data Runs for This Specific Library ..." ">Data Run</a>' % (self.id,)
# Check data sanity
if res[2] != "OK":
- return u'<div style="border:solid red 2px">'+res[2]+'</div>'
+ return '<div style="border:solid red 2px">'+res[2]+'</div>'
rc = "%1.2f" % (res[1]/1000000.0)
# Color Scheme: green is more than 10M, blue is more than 5M, orange is more than 3M and red is less. For RNAseq, all those thresholds should be doubled
if res[1] > rc_thr[2]:
bgcolor ='#ffcc33' # Orange
tstr = '<div style="background-color:'+bgcolor+';color:black">'
- tstr += res[0].__unicode__()+' Lanes, '+rc+' M Reads'
+ tstr += str(res[0])+' Lanes, '+rc+' M Reads'
tstr += '</div>'
else: tstr = 'not processed yet'
return tstr
def admin_url(self):
return '/admin/%s/%s/%d' % (self._meta.app_label, self._meta.module_name, self.id)
- def __unicode__(self):
- #return unicode(self.username) + u" (" + unicode(self.get_full_name()) + u")"
- return unicode(self.get_full_name()) + u' (' + unicode(self.username) + ')'
+ def __str__(self):
+ #return str(self.username) + " (" + str(self.get_full_name()) + u")"
+ return str(self.get_full_name()) + ' (' + str(self.username) + ')'
def HTSUserInsertID(sender, instance, **kwargs):
"""
+from __future__ import unicode_literals
+
from django.conf import settings
import glob
+from __future__ import unicode_literals
+
import datetime
import factory
from django.test import TestCase, RequestFactory
from django.conf import settings
+from django.utils.encoding import smart_text, smart_str
from .models import Affiliation, ExperimentType, Species, Library
from .views import library_dict, library_json, library
from .samples_factory import *
from htsworkflow.auth import apidata
-from htsworkflow.util.conversion import unicode_or_none
+from htsworkflow.util.conversion import str_or_none
from htsworkflow.util.ethelp import validate_xhtml
class LibraryTestCase(TestCase):
lib_dict = library_dict(library.id)
url = '/samples/library/%s/json' % (library.id,)
lib_response = self.client.get(url, apidata)
- lib_json = json.loads(lib_response.content)['result']
+ lib_json = json.loads(smart_text(lib_response.content))['result']
for d in [lib_dict, lib_json]:
# amplified_from_sample is a link to the library table,
#self.failUnlessEqual(d['amplified_from_sample'], lib.amplified_from_sample)
self.failUnlessEqual(d['antibody_id'], library.antibody_id)
self.failUnlessEqual(d['cell_line_id'], library.cell_line_id)
- self.failUnlessEqual(d['cell_line'], unicode_or_none(library.cell_line))
+ self.failUnlessEqual(d['cell_line'], str_or_none(library.cell_line))
self.failUnlessEqual(d['experiment_type'], library.experiment_type.name)
self.failUnlessEqual(d['experiment_type_id'], library.experiment_type_id)
self.failUnlessEqual(d['gel_cut_size'], library.gel_cut_size)
self.failUnlessEqual(d['stopping_point'], library.stopping_point)
self.failUnlessEqual(d['successful_pM'], library.successful_pM)
self.failUnlessEqual(d['undiluted_concentration'],
- unicode(library.undiluted_concentration))
+ str(library.undiluted_concentration))
def junk(self):
response = self.client.get(library.get_absolute_url())
self.assertEqual(response.status_code, 200)
- content = response.content
+ content = smart_text(response.content)
load_string_into_model(model, 'rdfa', content)
body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
response = self.client.get('/library/')
self.assertEqual(response.status_code, 200)
- load_string_into_model(model, 'rdfa', response.content)
+ load_string_into_model(model, 'rdfa', smart_text(response.content))
errmsgs = list(inference.run_validation())
self.assertEqual(len(errmsgs), 0)
libNS = RDF.NS("http://jumpgate.caltech.edu/wiki/LibraryOntology#")
from htsworkflow.util.rdfhelp import dump_model
-except ImportError,e:
+except ImportError as e:
HAVE_RDF = False
## request = self.request.get(url)
## lib_response = library(request)
lib_response = self.client.get(url)
- with open('/tmp/body.html', 'w') as outstream:
- outstream.write(lib_response.content)
- self.failIfEqual(len(lib_response.content), 0)
+ lib_body = smart_str(lib_response.content)
+ self.failIfEqual(len(lib_body), 0)
+ with open('/tmp/body.html', 'wt') as outstream:
+ outstream.write(lib_body)
parser.parse_string_into_model(model,
- lib_response.content,
+ lib_body,
'http://localhost'+url)
# help debugging rdf errrors
#with open('/tmp/test.ttl', 'w') as outstream:
self.failUnlessEqual(len(statements), len(values),
"Couln't find %s %s %s" % (s,p,o))
for s in statements:
- self.failUnless(unicode(s.object.uri) in values)
+ self.failUnless(str(s.object.uri) in values)
+from __future__ import unicode_literals
+
from django.conf.urls import patterns, url
urlpatterns = patterns('samples.views',
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
# Create your views here.
-import StringIO
import logging
import os
import sys
try:
import json
-except ImportError, e:
+except ImportError as e:
import simplejson as json
from django.views.decorators.csrf import csrf_exempt
from htsworkflow.pipelines import runfolder
from htsworkflow.pipelines.eland import ResultLane
from htsworkflow.pipelines.samplekey import SampleKey
-from htsworkflow.util.conversion import unicode_or_none, parse_flowcell_id
+from htsworkflow.util.conversion import str_or_none, parse_flowcell_id
from htsworkflow.util import makebed
from htsworkflow.util import opener
summary['lanes_run'] = lanes_run
summary['is_archived'] = lib.is_archived()
records.append(summary)
- cl.result_count = unicode(cl.paginator._count)
+ cl.result_count = str(cl.paginator._count)
return {'library_list': records}
summary_list.append(eland_summary)
- #except Exception, e:
+ #except Exception as e:
# summary_list.append("Summary report needs to be updated.")
# LOGGER.error("Exception: " + str(e))
"""
try:
lib = Library.objects.get(id=library_id)
- except Library.DoesNotExist, e:
+ except Library.DoesNotExist as e:
return None
#lane_info = lane_information(lib.lane_set)
#'antibody_name': lib.antibody_name(), # we have no antibodies.
'antibody_id': lib.antibody_id,
'cell_line_id': lib.cell_line_id,
- 'cell_line': unicode_or_none(lib.cell_line),
+ 'cell_line': str_or_none(lib.cell_line),
'experiment_type': lib.experiment_type.name,
'experiment_type_id': lib.experiment_type_id,
'gel_cut_size': lib.gel_cut_size,
'notes': lib.notes,
'replicate': lib.replicate,
'stopping_point': lib.stopping_point,
- 'successful_pM': unicode_or_none(lib.successful_pM),
- 'undiluted_concentration': unicode_or_none(lib.undiluted_concentration)
+ 'successful_pM': str_or_none(lib.successful_pM),
+ 'undiluted_concentration': str_or_none(lib.undiluted_concentration)
}
if lib.library_type_id is None:
info['library_type'] = None
import os
import re
import sys
-import urllib2
-import urlparse
+from six.moves import urllib
from django.conf import settings
Creates link between flowcell and storage device over http
"""
for fc in flowcells:
- url = urlparse.urljoin(root_url, '%s/%s/' % (fc, serial))
+ url = urllib.parse.urljoin(root_url, '%s/%s/' % (fc, serial))
- req = urllib2.Request(url)
+ req = urllib.request.Request(url)
try:
- response = urllib2.urlopen(req)
+ response = urllib.request.urlopen(req)
- except urllib2.URLError, e:
+ except urllib.error.URLError as e:
- print 'ERROR - HTTP OUTPUT (Return Code: %s); use -v/--verbose for more details.' % (e.code)
+ print('ERROR - HTTP OUTPUT (Return Code: %s); use -v/--verbose for more details.' % (e.code))
if debug:
- print e.read()
+ print(e.read())
#!/usr/bin/python
+from __future__ import print_function, unicode_literals
import logging
import optparse
run_commands, pathname_to_run_name
from htsworkflow.pipelines.srf import ILLUMINA2SRF10, ILLUMINA2SRF11, SOLEXA2SRF
+LOGGER = logging.getLogger(__name__)
+
def make_parser():
usage = '%prog: [options] runfolder -l 1,2,3 [runfolder -l 5,6 ...]'
runs = runfolder.get_runs(runfolder_path)
# give up if there are anything other than 1 run
if len(runs) > 1:
- print 'ERROR: Too many run directories in %s' %(runfolder_path,)
+ LOGGER.error('Too many run directories in %s', runfolder_path)
return 1
elif len(runs) == 1:
bustard_dir = runs[0].bustard.pathname
opts.dest_dir,
opts.runfolder_version)
else:
- print "ERROR: Couldn't find a bustard directory in", runfolder_path
+ LOGGER.error("Couldn't find a bustard directory in %s",
+ runfolder_path)
return 1
if not opts.dry_run:
run_commands(cwd, cmd_list, opts.jobs)
else:
for cwd, cmd_list in cmds.items():
- print cwd
- print cmd_list
- print 'jobs: ', opts.jobs
+ print(cwd)
+ print(cmd_list)
+ print('jobs: ', opts.jobs)
return 0
flowcell_info = None
# get info about flowcell from server or shelf
- if not fcdb.has_key(flowcell):
+ if flowcell not in fcdb:
url = api.flowcell_url(baseurl, flowcell)
flowcell_info = api.retrieve_info(url, apidata)
if flowcell_info is not None:
from unittest import TestCase, skipIf
-from StringIO import StringIO
+from six.moves import StringIO
from htsworkflow.automation.solexa import is_runfolder
try:
import os
-from StringIO import StringIO
+from six.moves import StringIO
import sys
from unittest import TestCase
target1.seek(0)
lines1 = target1.readlines()
- self.failUnlessEqual(len(lines1),4)
- self.failUnlessEqual(lines1[0].rstrip(), '@header/1')
- self.failUnlessEqual(lines1[1].rstrip(), 'AGCT')
- self.failUnlessEqual(lines1[2].rstrip(), '+')
- self.failUnlessEqual(lines1[3].rstrip(), 'IIII')
+ self.assertEqual(len(lines1),4)
+ self.assertEqual(lines1[0].rstrip(), '@header/1')
+ self.assertEqual(lines1[1].rstrip(), 'AGCT')
+ self.assertEqual(lines1[2].rstrip(), '+')
+ self.assertEqual(lines1[3].rstrip(), 'IIII')
target2.seek(0)
lines2 = target2.readlines()
- self.failUnlessEqual(len(lines2),4)
- self.failUnlessEqual(lines2[0].rstrip(), '@header/2')
- self.failUnlessEqual(lines2[1].rstrip(), 'TTTT')
- self.failUnlessEqual(lines2[2].rstrip(), '+')
- self.failUnlessEqual(lines2[3].rstrip(), 'B+++')
+ self.assertEqual(len(lines2),4)
+ self.assertEqual(lines2[0].rstrip(), '@header/2')
+ self.assertEqual(lines2[1].rstrip(), 'TTTT')
+ self.assertEqual(lines2[2].rstrip(), '+')
+ self.assertEqual(lines2[3].rstrip(), 'B+++')
def test_split_at_with_header(self):
source = StringIO("""@header1
target1.seek(0)
lines1 = target1.readlines()
- self.failUnlessEqual(len(lines1),8)
- self.failUnlessEqual(lines1[0].rstrip(), '@foo_header1/1')
- self.failUnlessEqual(lines1[1].rstrip(), 'AGCT')
- self.failUnlessEqual(lines1[2].rstrip(), '+')
- self.failUnlessEqual(lines1[3].rstrip(), '@III')
+ self.assertEqual(len(lines1),8)
+ self.assertEqual(lines1[0].rstrip(), '@foo_header1/1')
+ self.assertEqual(lines1[1].rstrip(), 'AGCT')
+ self.assertEqual(lines1[2].rstrip(), '+')
+ self.assertEqual(lines1[3].rstrip(), '@III')
target2.seek(0)
lines2 = target2.readlines()
- self.failUnlessEqual(len(lines2),8)
- self.failUnlessEqual(lines2[0].rstrip(), '@foo_header1/2')
- self.failUnlessEqual(lines2[1].rstrip(), 'TTTT')
- self.failUnlessEqual(lines2[2].rstrip(), '+')
- self.failUnlessEqual(lines2[3].rstrip(), 'B+++')
+ self.assertEqual(len(lines2),8)
+ self.assertEqual(lines2[0].rstrip(), '@foo_header1/2')
+ self.assertEqual(lines2[1].rstrip(), 'TTTT')
+ self.assertEqual(lines2[2].rstrip(), '+')
+ self.assertEqual(lines2[3].rstrip(), 'B+++')
def test_single_at(self):
source = StringIO("""@header1
target1.seek(0)
lines1 = target1.readlines()
- self.failUnlessEqual(len(lines1),8)
- self.failUnlessEqual(lines1[0].rstrip(), '@header1')
- self.failUnlessEqual(lines1[1].rstrip(), 'AGCTTTTT')
- self.failUnlessEqual(lines1[2].rstrip(), '+')
- self.failUnlessEqual(lines1[3].rstrip(), '@IIIB+++')
+ self.assertEqual(len(lines1),8)
+ self.assertEqual(lines1[0].rstrip(), '@header1')
+ self.assertEqual(lines1[1].rstrip(), 'AGCTTTTT')
+ self.assertEqual(lines1[2].rstrip(), '+')
+ self.assertEqual(lines1[3].rstrip(), '@IIIB+++')
def test_single_at_with_header(self):
source = StringIO("""@header1
target1.seek(0)
lines1 = target1.readlines()
- self.failUnlessEqual(len(lines1),8)
- self.failUnlessEqual(lines1[0].rstrip(), '@foo_header1')
- self.failUnlessEqual(lines1[1].rstrip(), 'AGCTTTTT')
- self.failUnlessEqual(lines1[2].rstrip(), '+')
- self.failUnlessEqual(lines1[3].rstrip(), '@IIIB+++')
+ self.assertEqual(len(lines1),8)
+ self.assertEqual(lines1[0].rstrip(), '@foo_header1')
+ self.assertEqual(lines1[1].rstrip(), 'AGCTTTTT')
+ self.assertEqual(lines1[2].rstrip(), '+')
+ self.assertEqual(lines1[3].rstrip(), '@IIIB+++')
def test_is_srf(self):
cnf4_srf = 'woldlab_070829_USI-EAS44_0017_FC11055_1.srf'
cnf1_path = os.path.join(TESTDATA_DIR, cnf1_srf)
is_srf = srf2fastq.is_srf
- self.failUnlessEqual(is_srf(__file__), False)
- self.failUnlessEqual(is_srf(cnf4_path), True)
- self.failUnlessEqual(is_srf(cnf1_path), True)
+ self.assertEqual(is_srf(__file__), False)
+ self.assertEqual(is_srf(cnf4_path), True)
+ self.assertEqual(is_srf(cnf1_path), True)
def test_is_cnf1(self):
cnf4_srf = 'woldlab_070829_USI-EAS44_0017_FC11055_1.srf'
is_cnf1 = srf2fastq.is_cnf1
self.failUnlessRaises(ValueError, is_cnf1, __file__)
- self.failUnlessEqual(is_cnf1(cnf4_path), False)
- self.failUnlessEqual(is_cnf1(cnf1_path), True)
+ self.assertEqual(is_cnf1(cnf4_path), False)
+ self.assertEqual(is_cnf1(cnf1_path), True)
def suite():