+import datetime
+import glob
import logging
+import os
+import re
+import types
+import uuid
+from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.core import urlresolvers
+from django.utils import timezone
from django.db import models
+from django.db.models.signals import post_init, pre_save
+
+from htsworkflow.frontend.samples.models import Library
+from htsworkflow.util.conversion import parse_flowcell_id
+from htsworkflow.pipelines import runfolder
+
+import pytz
+
+LOGGER = logging.getLogger(__name__)
# Default library concentration in picomolar for new lanes; may be
# overridden with settings.DEFAULT_PM.
default_pM = 5
try:
    default_pM = int(settings.DEFAULT_PM)
except (ValueError, AttributeError):
    # ValueError: setting present but not an integer;
    # AttributeError: setting absent entirely.
    LOGGER.error("Missing or invalid settings.DEFAULT_PM, "
                 "defaulting to %s" % (default_pM,))

# how many days to wait before trying to re-import a runfolder
RESCAN_DELAY = 1
try:
    RESCAN_DELAY = int(settings.RESCAN_DELAY)
except (ValueError, AttributeError):
    LOGGER.error("Missing or invalid settings.RESCAN_DELAY, "
                 "defaulting to %s" % (RESCAN_DELAY,))
# Integer status codes stored on DataRun, paired with display labels.
RUN_STATUS_CHOICES = (
    (0, 'Sequencer running'),  # Solexa Data Pipeline Not Yet Started
    (1, 'Data Pipeline Started'),
    (2, 'Data Pipeline Interrupted'),
    (3, 'Data Pipeline Finished'),
    (4, 'Collect Results Started'),
    (5, 'Collect Results Finished'),
    (6, 'QC Started'),
    (7, 'QC Finished'),
    (255, 'DONE'),
    )
# Map a display label back to its integer status code.
RUN_STATUS_REVERSE_MAP = dict((label, code) for code, label in RUN_STATUS_CHOICES)
class ClusterStation(models.Model):
    """List of cluster stations"""
    name = models.CharField(max_length=50, unique=True)
    isdefault = models.BooleanField(default=False, null=False)

    class Meta:
        ordering = ["-isdefault", "name"]

    def __unicode__(self):
        return unicode(self.name)

    @classmethod
    def default(cls):
        """Return the station flagged as default, else the newest one.

        Returns None when no stations exist at all.
        """
        flagged = cls.objects.filter(isdefault=True).all()
        if len(flagged) > 0:
            return flagged[0]
        newest_first = cls.objects.order_by('-id').all()
        if len(newest_first) > 0:
            return newest_first[0]
        return None

    @staticmethod
    def update_isdefault(sender, instance, **kwargs):
        """pre_save hook: keep at most one station flagged as default."""
        if instance.isdefault:
            for other in ClusterStation.objects.filter(isdefault=True).all():
                if other.id != instance.id:
                    other.isdefault = False
                    other.save()

pre_save.connect(ClusterStation.update_isdefault, sender=ClusterStation)
class Sequencer(models.Model):
    """Sequencers we've owned
    """
    name = models.CharField(max_length=50, db_index=True)
    instrument_name = models.CharField(max_length=50, db_index=True)
    serial_number = models.CharField(max_length=50, db_index=True)
    model = models.CharField(max_length=255)
    active = models.BooleanField(default=True, null=False)
    isdefault = models.BooleanField(default=False, null=False)
    comment = models.CharField(max_length=255)

    class Meta:
        ordering = ["-isdefault", "-active", "name"]

    def __unicode__(self):
        name = [unicode(self.name)]
        # A blank CharField holds '' (not None), so the previous
        # "is not None" test always passed and rendered an empty "()".
        # Test truthiness so blank instrument names are skipped.
        if self.instrument_name:
            name.append("(%s)" % (unicode(self.instrument_name),))
        return " ".join(name)

    @models.permalink
    def get_absolute_url(self):
        return ('htsworkflow.frontend.experiments.views.sequencer',
                [self.id])

    @classmethod
    def default(cls):
        """Return the default sequencer.

        Prefers the one flagged isdefault, otherwise the newest entry
        (inactive sorted first, matching the original ordering).
        Returns None when no sequencers exist.
        """
        d = cls.objects.filter(isdefault=True).all()
        if len(d) > 0:
            return d[0]
        d = cls.objects.order_by('active', '-id').all()
        if len(d) > 0:
            return d[0]
        return None

    @staticmethod
    def update_isdefault(sender, instance, **kwargs):
        """pre_save hook: clear the default flag on any other sequencer."""
        if instance.isdefault:
            for s in Sequencer.objects.filter(isdefault=True).all():
                if s.id != instance.id:
                    s.isdefault = False
                    s.save()

pre_save.connect(Sequencer.update_isdefault, sender=Sequencer)
class FlowCell(models.Model):
    """One flowcell run: up to eight lanes of libraries plus run metadata."""
    flowcell_id = models.CharField(max_length=20, unique=True, db_index=True)
    run_date = models.DateTimeField()
    advanced_run = models.BooleanField(default=False)
    paired_end = models.BooleanField(default=False)
    read_length = models.IntegerField(default=32)  # Stanford is currently 25
    # 0 is a sentinel choice meaning "All Lanes"
    control_lane = models.IntegerField(choices=[(1, 1),
                                                (2, 2),
                                                (3, 3),
                                                (4, 4),
                                                (5, 5),
                                                (6, 6),
                                                (7, 7),
                                                (8, 8),
                                                (0, 'All Lanes')],
                                       null=True,
                                       blank=True)

    cluster_station = models.ForeignKey(ClusterStation, default=ClusterStation.default)
    sequencer = models.ForeignKey(Sequencer, default=Sequencer.default)

    notes = models.TextField(blank=True)

    def __unicode__(self):
        return unicode(self.flowcell_id)

    def Lanes(self):
        """Render an HTML table of this flowcell's lanes (admin list column)."""
        html = ['<table>']
        for lane in self.lane_set.order_by('lane_number'):
            cluster_estimate = lane.cluster_estimate
            if cluster_estimate is not None:
                cluster_estimate = "%s k" % ((int(cluster_estimate) / 1000), )
            else:
                cluster_estimate = 'None'
            library = lane.library
            element = '<tr><td>%d</td>'\
                      '<td><a href="%s">%s</a></td><td>%s</td></tr>'
            html.append(element % (lane.lane_number,
                                   library.get_admin_url(),
                                   library,
                                   cluster_estimate))
        html.append('</table>')
        return "\n".join(html)
    Lanes.allow_tags = True

    class Meta:
        ordering = ["-run_date"]

    def get_admin_url(self):
        # that's the django way... except it didn't work
        return urlresolvers.reverse('admin:experiments_flowcell_change',
                                    args=(self.id,))

    def flowcell_type(self):
        """Convert our boolean 'is paired' flag to a name
        """
        if self.paired_end:
            return u"Paired"
        else:
            return u"Single"

    @models.permalink
    def get_absolute_url(self):
        flowcell_id, status = parse_flowcell_id(self.flowcell_id)
        return ('htsworkflow.frontend.experiments.views.flowcell_detail',
                [str(flowcell_id)])

    def get_raw_data_directory(self):
        """Return location of where the raw data is stored"""
        flowcell_id, status = parse_flowcell_id(self.flowcell_id)

        return os.path.join(settings.RESULT_HOME_DIR, flowcell_id)

    def update_data_runs(self):
        """Walk our raw data directory importing every run*.xml found."""
        result_root = self.get_raw_data_directory()
        LOGGER.debug("Update data runs flowcell root: %s" % (result_root,))
        if result_root is None:
            return

        run_xml_re = re.compile(glob.fnmatch.translate('run*.xml'))

        for dirpath, dirnames, filenames in os.walk(result_root):
            for filename in filenames:
                if run_xml_re.match(filename):
                    # we have a run directory
                    relative_pathname = get_relative_pathname(dirpath)
                    self.import_data_run(relative_pathname, filename)

    def import_data_run(self, relative_pathname, run_xml_name, force=False):
        """Given a result directory import files

        Creates or refreshes the DataRun for relative_pathname from its
        run xml.  An existing run is left alone unless force is True or
        it was last updated more than RESCAN_DELAY days ago.
        """
        now = timezone.now()
        run_dir = get_absolute_pathname(relative_pathname)
        run_xml_path = os.path.join(run_dir, run_xml_name)

        runs = DataRun.objects.filter(result_dir=relative_pathname)
        if len(runs) == 0:
            run = DataRun()
            created = True
        elif len(runs) > 1:
            raise RuntimeError("Too many data runs for %s" % (
                relative_pathname,))
        else:
            run = runs[0]
            created = False

        if created or force or (now - run.last_update_time).days > RESCAN_DELAY:
            LOGGER.debug("Importing run from %s" % (relative_pathname,))
            run_xml_data = runfolder.load_pipeline_run_xml(run_xml_path)
            run.flowcell = self
            # NOTE(review): this sets a plain attribute named "status";
            # the model field is "run_status" — confirm which is intended.
            run.status = RUN_STATUS_REVERSE_MAP['DONE']
            run.result_dir = relative_pathname
            run.runfolder_name = run_xml_data.runfolder_name
            run.cycle_start = run_xml_data.image_analysis.start
            run.cycle_stop = run_xml_data.image_analysis.stop
            # the run xml only carries a date; localize midnight of that
            # day into the site timezone for the DateTimeField
            naive_run_start_time = datetime.datetime.fromordinal(run_xml_data.image_analysis.date.toordinal())
            run.run_start_time = pytz.timezone(settings.TIME_ZONE).localize(naive_run_start_time)
            run.image_software = run_xml_data.image_analysis.software
            run.image_version = run_xml_data.image_analysis.version
            run.basecall_software = run_xml_data.bustard.software
            run.basecall_version = run_xml_data.bustard.version
            # we're frequently not running alignments
            if run_xml_data.gerald:
                run.alignment_software = run_xml_data.gerald.software
                run.alignment_version = run_xml_data.gerald.version

            run.last_update_time = timezone.now()
            run.save()

            run.update_result_files()


# FIXME: should we automatically update dataruns?
# Or should we expect someone to call update_data_runs?
#def update_flowcell_dataruns(sender, instance, *args, **kwargs):
#    """Update our dataruns
#    """
#    if not os.path.exists(settings.RESULT_HOME_DIR):
#        return
#
#    instance.update_data_runs()
#post_init.connect(update_flowcell_dataruns, sender=FlowCell)
+
+
# Lane quality flags; a missing entry (None) displays as "Unknown".
LANE_STATUS_CODES = [(0, 'Failed'),
                     (1, 'Marginal'),
                     (2, 'Good'), ]
LANE_STATUS_MAP = {}
for _code, _label in LANE_STATUS_CODES:
    LANE_STATUS_MAP[int(_code)] = _label
LANE_STATUS_MAP[None] = "Unknown"
+
+
def is_valid_lane(value):
    """Return True when value is a legal flowcell lane number (1..8)."""
    return 1 <= value <= 8
class Lane(models.Model):
    """A single lane of a flowcell loaded with one library."""
    flowcell = models.ForeignKey(FlowCell)
    lane_number = models.IntegerField()
    library = models.ForeignKey(Library)
    # loading concentration in picomolar
    pM = models.DecimalField(max_digits=5,
                             decimal_places=2,
                             blank=False,
                             null=False,
                             default=default_pM)
    cluster_estimate = models.IntegerField(blank=True, null=True)
    status = models.IntegerField(choices=LANE_STATUS_CODES,
                                 null=True,
                                 blank=True)
    comment = models.TextField(null=True, blank=True)

    @models.permalink
    def get_absolute_url(self):
        return ('htsworkflow.frontend.experiments.views.flowcell_lane_detail',
                [str(self.id)])

    def __unicode__(self):
        # e.g. "42ABC:3" -- flowcell id plus lane number
        return u"%s:%s" % (self.flowcell.flowcell_id, self.lane_number)
+
+
class DataRun(models.Model):
    """One pipeline run imported from a runfolder's run xml."""
    flowcell = models.ForeignKey(FlowCell, verbose_name="Flowcell Id")
    runfolder_name = models.CharField(max_length=50)
    result_dir = models.CharField(max_length=255)
    last_update_time = models.DateTimeField()
    run_start_time = models.DateTimeField()
    cycle_start = models.IntegerField(null=True, blank=True)
    cycle_stop = models.IntegerField(null=True, blank=True)
    run_status = models.IntegerField(choices=RUN_STATUS_CHOICES,
                                     null=True, blank=True)
    image_software = models.CharField(max_length=50)
    image_version = models.CharField(max_length=50)
    basecall_software = models.CharField(max_length=50)
    basecall_version = models.CharField(max_length=50)
    alignment_software = models.CharField(max_length=50)
    alignment_version = models.CharField(max_length=50)
    comment = models.TextField(blank=True)

    def update_result_files(self):
        """Walk our result directory registering unseen files as DataFiles."""
        abs_result_dir = get_absolute_pathname(self.result_dir)

        for dirname, dirnames, filenames in os.walk(abs_result_dir):
            for filename in filenames:
                pathname = os.path.join(dirname, filename)
                relative_pathname = get_relative_pathname(pathname)
                datafiles = self.datafile_set.filter(
                    data_run=self,
                    relative_pathname=relative_pathname)
                if len(datafiles) > 0:
                    # already registered
                    continue

                metadata = find_file_type_metadata_from_filename(filename)
                if metadata is not None:
                    metadata['filename'] = filename
                    newfile = DataFile()
                    newfile.data_run = self
                    newfile.file_type = metadata['file_type']
                    newfile.relative_pathname = relative_pathname

                    lane_number = metadata.get('lane', None)
                    if lane_number is not None:
                        lane = self.flowcell.lane_set.get(
                            lane_number=lane_number)
                        newfile.library = lane.library

                    self.datafile_set.add(newfile)

        # NOTE(review): updated in memory only; caller must save() to persist
        self.last_update_time = timezone.now()

    def lane_files(self):
        """Group our DataFiles by lane.

        Returns {lane_number: {normalized_file_type_name: DataFile}},
        skipping files whose type carries no lane attribute.
        """
        lanes = {}

        for datafile in self.datafile_set.all():
            metadata = datafile.attributes
            if metadata is not None:
                lane = metadata.get('lane', None)
                if lane is not None:
                    lane_file_set = lanes.setdefault(lane, {})
                    normalized_name = datafile.file_type.normalized_name
                    lane_file_set[normalized_name] = datafile
        return lanes

    def ivc_plots(self, lane):
        """Collect the intensity-vs-cycle plot files for this run.

        Returns {file type name: (relative filename, metadata)}.
        """
        ivc_name = ['IVC All', 'IVC Call',
                    'IVC Percent Base', 'IVC Percent All', 'IVC Percent Call']

        plots = {}
        # NOTE(review): get_result_files is not defined in this file;
        # confirm where it comes from.
        for rel_filename, metadata in self.get_result_files():
            if metadata.file_type.name in ivc_name:
                plots[metadata.file_type.name] = (rel_filename, metadata)
        # FIX: the collected plots were previously computed and discarded.
        return plots
+
+
class FileType(models.Model):
    """Represent potential file types

    regex is a pattern used to detect if a filename matches this type
    data run currently assumes that there may be a (?P<lane>) and
    (?P<end>) pattern in the regular expression.
    """
    name = models.CharField(max_length=50)
    mimetype = models.CharField(max_length=50, null=True, blank=True)
    # regular expression from glob.fnmatch.translate
    regex = models.TextField(null=True, blank=True)

    def parse_filename(self, pathname):
        """Does filename match our pattern?

        Returns None if not, or dictionary of match variables if we do.
        """
        path, filename = os.path.split(pathname)
        # regex is nullable; len(None) would raise TypeError, so test
        # truthiness instead (which also skips the empty pattern).
        if self.regex:
            match = re.match(self.regex, filename)
            if match is not None:
                # These are (?P<>) names we know about from our
                # default regexes.
                results = match.groupdict()

                # convert int parameters
                for attribute_name in ['lane', 'end']:
                    value = results.get(attribute_name, None)
                    if value is not None:
                        results[attribute_name] = int(value)

                return results

    def _get_normalized_name(self):
        """Crush data file name into identifier friendly name"""
        return self.name.replace(' ', '_').lower()
    normalized_name = property(_get_normalized_name)

    def __unicode__(self):
        return self.name
+
+
def str_uuid():
    """Helper function to set default UUID in DataFile"""
    return "%s" % (uuid.uuid1(),)
+
+
class DataFile(models.Model):
    """Store map from random ID to filename"""
    # opaque download key; defaults to a fresh UUID string
    random_key = models.CharField(max_length=64,
                                  db_index=True,
                                  default=str_uuid)
    data_run = models.ForeignKey(DataRun, db_index=True)
    library = models.ForeignKey(Library, db_index=True, null=True, blank=True)
    file_type = models.ForeignKey(FileType)
    relative_pathname = models.CharField(max_length=255, db_index=True)

    @property
    def attributes(self):
        """Metadata parsed from our pathname by our FileType's regex."""
        return self.file_type.parse_filename(self.relative_pathname)

    @property
    def pathname(self):
        """Absolute path of this file under RESULT_HOME_DIR."""
        return get_absolute_pathname(self.relative_pathname)

    @models.permalink
    def get_absolute_url(self):
        return ('htsworkflow.frontend.experiments.views.read_result_file',
                (), {'key': self.random_key})
+
+
def find_file_type_metadata_from_filename(pathname):
    """Match pathname's filename against every registered FileType.

    Returns the first matching FileType's metadata dict, with the
    FileType itself added under 'file_type'; None when nothing matches.
    """
    _, filename = os.path.split(pathname)
    for file_type in FileType.objects.all():
        attributes = file_type.parse_filename(filename)
        if attributes is not None:
            attributes['file_type'] = file_type
            return attributes
    return None
+
+
def get_relative_pathname(abspath):
    """Strip off the result home directory from a path
    """
    result_home_dir = os.path.join(settings.RESULT_HOME_DIR, '')
    # Only strip a leading occurrence: str.replace would also delete the
    # home directory string if it happened to appear later in the path.
    if abspath.startswith(result_home_dir):
        return abspath[len(result_home_dir):]
    return abspath
+
+
def get_absolute_pathname(relative_pathname):
    """Attach relative path to results home directory"""
    full_path = os.path.join(settings.RESULT_HOME_DIR, relative_pathname)
    return full_path