from django.core.exceptions import ObjectDoesNotExist
from django.test import TestCase
from django.test.utils import setup_test_environment, teardown_test_environment
+from django.db import connection
+from django.conf import settings
from htsworkflow.frontend.experiments import models
from htsworkflow.frontend.experiments import experiments
from htsworkflow.frontend.auth import apidata
NSMAP = {'libns':'http://jumpgate.caltech.edu/wiki/LibraryOntology#'}
+from django.db import connection
+OLD_DB_NAME = settings.DATABASE_NAME
+VERBOSITY = 0
+def setUpModule():
+ setup_test_environment()
+ settings.DEBUG = False
+ connection.creation.create_test_db(VERBOSITY)
+
+def tearDownModule():
+ connection.creation.destroy_test_db(OLD_DB_NAME, VERBOSITY)
+ teardown_test_environment()
+
class ClusterStationTestCases(TestCase):
fixtures = ['test_flowcells.json']
file_type_objects = models.FileType.objects
name = 'QSEQ tarfile'
file_type_object = file_type_objects.get(name=name)
- self.assertEqual(u"<FileType: QSEQ tarfile>",
+ self.assertEqual(u"QSEQ tarfile",
unicode(file_type_object))
-class TestFileType(TestCase):
def test_find_file_type(self):
file_type_objects = models.FileType.objects
cases = [('woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
class TestEmailNotify(TestCase):
fixtures = ['test_flowcells.json']
- @classmethod
- def setUpClass(self):
- # isolate django mail when running under unittest2
- setup_test_environment()
-
- @classmethod
- def tearDownClass(self):
- # isolate django mail when running under unittest2
- teardown_test_environment()
-
-
def test_started_email_not_logged_in(self):
response = self.client.get('/experiments/started/153/')
self.assertEqual(response.status_code, 302)
errmsgs = list(inference.run_validation())
self.assertEqual(len(errmsgs), 0)
+
+
+OLD_DB = settings.DATABASES['default']['NAME']
+def setUpModule():
+ setup_test_environment()
+ connection.creation.create_test_db()
+
+def tearDownModule():
+ connection.creation.destroy_test_db(OLD_DB)
+ teardown_test_environment()
+
+def suite():
+ from unittest2 import TestSuite, defaultTestLoader
+ suite = TestSuite()
+ for testcase in [ClusterStationTestCases,
+ SequencerTestCases,
+ ExerimentsTestCases,
+ TestFileType,
+ TestEmailNotify,
+ TestSequencer]:
+ suite.addTests(defaultTestLoader.loadTestsFromTestCase(testcase))
+ return suite
+
+if __name__ == "__main__":
+ from unittest2 import main
+ main(defaultTest="suite")
-"""
-Slightly modified version of the django admin component that handles filters and searches
-"""
-from django.contrib.admin.filterspecs import FilterSpec
-from django.contrib.admin.options import IncorrectLookupParameters
-from django.core.paginator import Paginator, InvalidPage, EmptyPage
-from django.db import models
-from django.db.models.query import QuerySet
-from django.utils.encoding import force_unicode, smart_str
-from django.utils.translation import ugettext
-from django.utils.http import urlencode
-import operator
+from django.contrib.admin.views.main import ChangeList
+
+class HTSChangeList(ChangeList):
+ def __init__(self, request, model, list_filter, search_fields,
+ list_per_page, model_admin, extra_filters=None):
+ """Simplification of the django model filter view
+
+ The new parameter "extra_filter" should be a mapping
+ of that will be passed as keyword arguments to
+ queryset.filter
+ """
+ self.extra_filters = extra_filters
+ super(HTSChangeList, self).__init__(
+ request, #request
+ model, #model
+ [], # list_display
+ None, # list_display_links
+ list_filter, #list_filter
+ None, # date_hierarchy
+ search_fields, #search_fields
+ None, # list_select_related,
+ list_per_page, #list_per_page
+ 20000, #list_max_show_all
+ None, # list_editable
+ model_admin #model_admin
+ )
+
+ self.is_popup = False
+ # I removed to field in the first version
-try:
- set
-except NameError:
- from sets import Set as set # Python 2.3 fallback
-
-# The system will display a "Show all" link on the change list only if the
-# total result count is less than or equal to this setting.
-MAX_SHOW_ALL_ALLOWED = 20000
-
-# Changelist settings
-ALL_VAR = 'all'
-ORDER_VAR = 'o'
-ORDER_TYPE_VAR = 'ot'
-PAGE_VAR = 'p'
-SEARCH_VAR = 'q'
-TO_FIELD_VAR = 't'
-IS_POPUP_VAR = 'pop'
-ERROR_FLAG = 'e'
-
-# Text to display within change-list table cells if the value is blank.
-EMPTY_CHANGELIST_VALUE = '(None)'
-
-class ChangeList(object):
-
- #def __init__(self, request, model, list_display, list_display_links, list_filter, date_hierarchy, search_fields, list_select_related, list_per_page, list_editable, model_admin):
- def __init__(self, request, model, list_filter, search_fields, list_per_page, queryset=None):
- self.model = model
- self.opts = model._meta
- self.lookup_opts = self.opts
- if queryset is None:
- self.root_query_set = model.objects.all()
- else:
- self.root_query_set = queryset
- self.list_display = []
- self.list_display_links = None
- self.list_filter = list_filter
- #self.date_hierarchy = date_hierarchy
- self.search_fields = search_fields
- self.list_select_related = None
- self.list_per_page = list_per_page
- #self.list_editable = list_editable
- self.model_admin = None
-
- # Get search parameters from the query string.
- try:
- self.page_num = int(request.GET.get(PAGE_VAR, '0'))
- except ValueError:
- self.page_num = 0
- self.show_all = 'all' in request.GET
- #self.is_popup = IS_POPUP_VAR in request.GET
- #self.to_field = request.GET.get(TO_FIELD_VAR)
- self.params = dict(request.GET.items())
- if PAGE_VAR in self.params:
- del self.params[PAGE_VAR]
- #if TO_FIELD_VAR in self.params:
- # del self.params[TO_FIELD_VAR]
- if ERROR_FLAG in self.params:
- del self.params[ERROR_FLAG]
-
self.multi_page = True
self.can_show_all = False
- self.order_field, self.order_type = self.get_ordering()
- self.query = request.GET.get(SEARCH_VAR, '')
- self.query_set = self.get_query_set()
- self.get_results(request)
- #self.title = (self.is_popup and ugettext('Select %s') % force_unicode(self.opts.verbose_name) or ugettext('Select %s to change') % force_unicode(self.opts.verbose_name))
- self.filter_specs, self.has_filters = self.get_filters(request)
- #self.pk_attname = self.lookup_opts.pk.attname
-
- def get_filters(self, request):
- filter_specs = []
- if self.list_filter:
- filter_fields = [self.lookup_opts.get_field(field_name) for field_name in self.list_filter]
- for f in filter_fields:
- spec = FilterSpec.create(f, request, self.params, self.model, self.model_admin)
- if spec and spec.has_output():
- filter_specs.append(spec)
- return filter_specs, bool(filter_specs)
-
- def get_query_string(self, new_params=None, remove=None):
- if new_params is None: new_params = {}
- if remove is None: remove = []
- p = self.params.copy()
- for r in remove:
- for k in p.keys():
- if k.startswith(r):
- del p[k]
- for k, v in new_params.items():
- if v is None:
- if k in p:
- del p[k]
- else:
- p[k] = v
- return '?%s' % urlencode(p)
-
- def get_results(self, request):
- paginator = Paginator(self.query_set, self.list_per_page)
- # Get the number of objects, with admin filters applied.
- result_count = paginator.count
-
- # Get the total number of objects, with no admin filters applied.
- # Perform a slight optimization: Check to see whether any filters were
- # given. If not, use paginator.hits to calculate the number of objects,
- # because we've already done paginator.hits and the value is cached.
- if not self.query_set.query.where:
- full_result_count = result_count
- else:
- full_result_count = self.root_query_set.count()
-
- can_show_all = result_count <= MAX_SHOW_ALL_ALLOWED
- multi_page = result_count > self.list_per_page
-
- # Get the list of objects to display on this page.
- if (self.show_all and can_show_all) or not multi_page:
- result_list = self.query_set._clone()
- else:
- try:
- result_list = paginator.page(self.page_num+1).object_list
- except InvalidPage:
- result_list = ()
-
- self.result_count = result_count
- self.full_result_count = full_result_count
- self.result_list = result_list
- self.can_show_all = can_show_all
- self.multi_page = multi_page
- self.paginator = paginator
-
- def get_ordering(self):
- lookup_opts, params = self.lookup_opts, self.params
- # For ordering, first check the "ordering" parameter in the admin
- # options, then check the object's default ordering. If neither of
- # those exist, order descending by ID by default. Finally, look for
- # manually-specified ordering from the query string.
- ordering = lookup_opts.ordering or ['-' + lookup_opts.pk.name]
-
- if ordering[0].startswith('-'):
- order_field, order_type = ordering[0][1:], 'desc'
- else:
- order_field, order_type = ordering[0], 'asc'
- if ORDER_VAR in params:
- try:
- field_name = self.list_display[int(params[ORDER_VAR])]
- try:
- f = lookup_opts.get_field(field_name)
- except models.FieldDoesNotExist:
- # See whether field_name is a name of a non-field
- # that allows sorting.
- try:
- if callable(field_name):
- attr = field_name
- elif hasattr(self.model_admin, field_name):
- attr = getattr(self.model_admin, field_name)
- else:
- attr = getattr(self.model, field_name)
- order_field = attr.admin_order_field
- except AttributeError:
- pass
- else:
- order_field = f.name
- except (IndexError, ValueError):
- pass # Invalid ordering specified. Just use the default.
- if ORDER_TYPE_VAR in params and params[ORDER_TYPE_VAR] in ('asc', 'desc'):
- order_type = params[ORDER_TYPE_VAR]
- return order_field, order_type
-
- def get_query_set(self):
- qs = self.root_query_set
- lookup_params = self.params.copy() # a dictionary of the query string
- for i in (ALL_VAR, ORDER_VAR, ORDER_TYPE_VAR, SEARCH_VAR, IS_POPUP_VAR):
- if i in lookup_params:
- del lookup_params[i]
- for key, value in lookup_params.items():
- if not isinstance(key, str):
- # 'key' will be used as a keyword argument later, so Python
- # requires it to be a string.
- del lookup_params[key]
- lookup_params[smart_str(key)] = value
-
- # if key ends with __in, split parameter into separate values
- if key.endswith('__in'):
- lookup_params[key] = value.split(',')
-
- # Apply lookup parameters from the query string.
- try:
- qs = qs.filter(**lookup_params)
- # Naked except! Because we don't have any other way of validating "params".
- # They might be invalid if the keyword arguments are incorrect, or if the
- # values are not in the correct type, so we might get FieldError, ValueError,
- # ValicationError, or ? from a custom field that raises yet something else
- # when handed impossible data.
- except:
- raise IncorrectLookupParameters
-
- # Use select_related() if one of the list_display options is a field
- # with a relationship and the provided queryset doesn't already have
- # select_related defined.
- if not qs.query.select_related:
- if self.list_select_related:
- qs = qs.select_related()
- else:
- for field_name in self.list_display:
- try:
- f = self.lookup_opts.get_field(field_name)
- except models.FieldDoesNotExist:
- pass
- else:
- if isinstance(f.rel, models.ManyToOneRel):
- qs = qs.select_related()
- break
-
- # Set ordering.
- if self.order_field:
- qs = qs.order_by('%s%s' % ((self.order_type == 'desc' and '-' or ''), self.order_field))
-
- # Apply keyword searches.
- def construct_search(field_name):
- if field_name.startswith('^'):
- return "%s__istartswith" % field_name[1:]
- elif field_name.startswith('='):
- return "%s__iexact" % field_name[1:]
- elif field_name.startswith('@'):
- return "%s__search" % field_name[1:]
- else:
- return "%s__icontains" % field_name
-
- if self.search_fields and self.query:
- for bit in self.query.split():
- or_queries = [models.Q(**{construct_search(str(field_name)): bit}) for field_name in self.search_fields]
- qs = qs.filter(reduce(operator.or_, or_queries))
- for field_name in self.search_fields:
- if '__' in field_name:
- qs = qs.distinct()
- break
-
+ def get_query_set(self, request):
+ qs = super(HTSChangeList, self).get_query_set(request)
+ print qs
+ if self.extra_filters:
+ new_qs = qs.filter(**self.extra_filters)
+ if new_qs is not None:
+ qs = new_qs
return qs
-
- #def url_for_result(self, result):
- # return "%s/" % quote(getattr(result, self.pk_attname))
except ImportError, e:
import simplejson as json
-from django.contrib.csrf.middleware import csrf_exempt
+from django.views.decorators.csrf import csrf_exempt
from htsworkflow.frontend.auth import require_api_key
from htsworkflow.frontend.experiments.models import FlowCell, Lane, LANE_STATUS_MAP
-from htsworkflow.frontend.samples.changelist import ChangeList
+from htsworkflow.frontend.experiments.admin import LaneOptions
+from htsworkflow.frontend.samples.changelist import HTSChangeList
from htsworkflow.frontend.samples.models import Antibody, Library, Species, HTSUser
+from htsworkflow.frontend.samples.admin import LibraryOptions
from htsworkflow.frontend.samples.results import get_flowcell_result_dict
from htsworkflow.frontend.bcmagic.forms import BarcodeMagicForm
from htsworkflow.pipelines.runfolder import load_pipeline_run_xml
def library(request, todo_only=False):
queryset = Library.objects.filter(hidden__exact=0)
+ filters = {'hidden__exact': 0}
if todo_only:
- queryset = queryset.filter(lane=None)
+ filters[lane] = None
# build changelist
- fcl = ChangeList(request, Library,
+ fcl = HTSChangeList(request, Library,
list_filter=['affiliations', 'library_species'],
search_fields=['id', 'library_name', 'amplified_from_sample__id'],
list_per_page=200,
- queryset=queryset
+ model_admin=LibraryOptions(Library, None),
+ extra_filters=filters
)
context = { 'cl': fcl, 'title': 'Library Index', 'todo_only': todo_only}
if username is not None:
user = HTSUser.objects.get(username=username)
query.update({'library__affiliations__users__id':user.id})
- fcl = ChangeList(request, Lane,
+ fcl = HTSChangeList(request, Lane,
list_filter=[],
search_fields=['flowcell__flowcell_id', 'library__id', 'library__library_name'],
list_per_page=200,
+ model_admin=LaneOptions,
queryset=Lane.objects.filter(**query)
)
context.update(SAMPLES_CONTEXT_DEFAULTS)
return render_to_response('registration/profile.html', context,
context_instance=RequestContext(request))
-
-