9 from django.conf import settings
10 from django.core.exceptions import ObjectDoesNotExist
11 from django.core import urlresolvers
12 from django.db import models
13 from django.db.models.signals import post_init
15 from htsworkflow.frontend.samples.models import Library
16 from htsworkflow.util.conversion import parse_flowcell_id
17 from htsworkflow.pipelines import runfolder
19 LOGGER = logging.getLogger(__name__)
22 default_pM = int(settings.DEFAULT_PM)
24 LOGGER.error("invalid value for frontend.default_pm")
26 # how many days to wait before trying to re-import a runfolder
29 RESCAN_DELAY = int(settings.RESCAN_DELAY)
30 except (ValueError, AttributeError):
31 LOGGER.error("Missing or invalid settings.RESCAN_DELAY, "\
32 "defaulting to %s" % (RESCAN_DELAY,))
34 RUN_STATUS_CHOICES = (
35 (0, 'Sequencer running'), # Solexa Data Pipeline Not Yet Started'),
36 (1, 'Data Pipeline Started'),
37 (2, 'Data Pipeline Interrupted'),
38 (3, 'Data Pipeline Finished'),
39 (4, 'Collect Results Started'),
40 (5, 'Collect Results Finished'),
45 RUN_STATUS_REVERSE_MAP = dict(((v, k) for k, v in RUN_STATUS_CHOICES))
48 class ClusterStation(models.Model):
49 """List of cluster stations"""
50 name = models.CharField(max_length=50, unique=True)
51 isdefault = models.BooleanField(default=False, null=False)
54 ordering = ["-isdefault", "name"]
56 def __unicode__(self):
57 return unicode(self.name)
61 d = cls.objects.filter(isdefault=True).all()
64 d = cls.objects.order_by('-id').all()
70 class Sequencer(models.Model):
71 """Sequencers we've owned
73 name = models.CharField(max_length=50, db_index=True)
74 instrument_name = models.CharField(max_length=50, db_index=True)
75 serial_number = models.CharField(max_length=50, db_index=True)
76 model = models.CharField(max_length=255)
77 active = models.BooleanField(default=True, null=False)
78 isdefault = models.BooleanField(default=False, null=False)
79 comment = models.CharField(max_length=255)
82 ordering = ["-isdefault", "-active", "name"]
84 def __unicode__(self):
85 name = [unicode(self.name)]
86 if self.instrument_name is not None:
87 name.append("(%s)" % (unicode(self.instrument_name),))
91 def get_absolute_url(self):
92 return ('htsworkflow.frontend.experiments.views.sequencer',
97 d = cls.objects.filter(isdefault=True).all()
100 d = cls.objects.order_by('active', '-id').all()
106 class FlowCell(models.Model):
107 flowcell_id = models.CharField(max_length=20, unique=True, db_index=True)
108 run_date = models.DateTimeField()
109 advanced_run = models.BooleanField(default=False)
110 paired_end = models.BooleanField(default=False)
111 read_length = models.IntegerField(default=32) # Stanford is currenlty 25
112 control_lane = models.IntegerField(choices=[(1, 1),
124 cluster_station = models.ForeignKey(ClusterStation, default=ClusterStation.default)
125 sequencer = models.ForeignKey(Sequencer, default=Sequencer.default)
127 notes = models.TextField(blank=True)
129 def __unicode__(self):
130 return unicode(self.flowcell_id)
134 for lane in self.lane_set.order_by('lane_number'):
135 cluster_estimate = lane.cluster_estimate
136 if cluster_estimate is not None:
137 cluster_estimate = "%s k" % ((int(cluster_estimate) / 1000), )
139 cluster_estimate = 'None'
140 library_id = lane.library_id
141 library = lane.library
142 element = '<tr><td>%d</td>'\
143 '<td><a href="%s">%s</a></td><td>%s</td></tr>'
144 html.append(element % (lane.lane_number,
145 library.get_admin_url(),
148 html.append('</table>')
149 return "\n".join(html)
150 Lanes.allow_tags = True
153 ordering = ["-run_date"]
155 def get_admin_url(self):
156 # that's the django way... except it didn't work
157 return urlresolvers.reverse('admin:experiments_flowcell_change',
160 def flowcell_type(self):
161 """Convert our boolean 'is paired' flag to a name
169 def get_absolute_url(self):
170 flowcell_id, status = parse_flowcell_id(self.flowcell_id)
171 return ('htsworkflow.frontend.experiments.views.flowcell_detail',
def get_raw_data_directory(self):
    """Return the directory that holds this flowcell's raw results.

    The stored flowcell_id is run through parse_flowcell_id() and the
    first element of its result is used as the directory name under
    settings.RESULT_HOME_DIR.
    """
    # parse_flowcell_id yields a pair; only the first element is needed
    # here, the second is ignored.
    bare_flowcell_id, _status = parse_flowcell_id(self.flowcell_id)
    results_home = settings.RESULT_HOME_DIR
    return os.path.join(results_home, bare_flowcell_id)
def update_data_runs(self):
    """Scan this flowcell's result directory and import its run folders.

    Walks the tree rooted at get_raw_data_directory() looking for files
    whose name matches 'run*.xml'.  The directory holding such a file is
    handed to import_data_run() when either

    * no DataRun attached to this flowcell has that result_dir, or
    * the matching DataRun's last_update_time is more than RESCAN_DELAY
      days in the past (the existing DataRun is passed along so it is
      refreshed instead of duplicated).
    """
    # Imported locally so the block does not depend on glob happening to
    # expose its own fnmatch import as an attribute.
    import fnmatch

    result_root = self.get_raw_data_directory()
    LOGGER.debug("Update data runs flowcell root: %s", result_root)
    if result_root is None:
        return

    run_xml_re = re.compile(fnmatch.translate('run*.xml'))

    # Existing runs for this flowcell, keyed by their relative result dir.
    dataruns = dict((x.result_dir, x) for x in self.datarun_set.all())

    for dirpath, dirnames, filenames in os.walk(result_root):
        for filename in filenames:
            if not run_xml_re.match(filename):
                continue

            # we have a run directory
            relative_pathname = get_relative_pathname(dirpath)
            cached_run = dataruns.get(relative_pathname, None)
            if cached_run is None:
                self.import_data_run(relative_pathname, filename)
                continue

            # Only re-import runs that have not been refreshed recently.
            age = datetime.datetime.now() - cached_run.last_update_time
            if age.days > RESCAN_DELAY:
                self.import_data_run(relative_pathname,
                                     filename, cached_run)
206 def import_data_run(self, relative_pathname, run_xml_name, run=None):
207 """Given a result directory import files"""
208 run_dir = get_absolute_pathname(relative_pathname)
209 run_xml_path = os.path.join(run_dir, run_xml_name)
210 run_xml_data = runfolder.load_pipeline_run_xml(run_xml_path)
211 LOGGER.debug("Importing run from %s" % (relative_pathname,))
216 run.status = RUN_STATUS_REVERSE_MAP['DONE']
217 run.result_dir = relative_pathname
218 run.runfolder_name = run_xml_data.runfolder_name
219 run.cycle_start = run_xml_data.image_analysis.start
220 run.cycle_stop = run_xml_data.image_analysis.stop
221 run.run_start_time = run_xml_data.image_analysis.date
222 run.image_software = run_xml_data.image_analysis.software
223 run.image_version = run_xml_data.image_analysis.version
224 run.basecall_software = run_xml_data.bustard.software
225 run.basecall_version = run_xml_data.bustard.version
226 run.alignment_software = run_xml_data.gerald.software
227 run.alignment_version = run_xml_data.gerald.version
229 run.last_update_time = datetime.datetime.now()
232 run.update_result_files()
235 # FIXME: should we automatically update dataruns?
236 # Or should we expect someone to call update_data_runs?
237 #def update_flowcell_dataruns(sender, instance, *args, **kwargs):
238 # """Update our dataruns
240 # if not os.path.exists(settings.RESULT_HOME_DIR):
243 # instance.update_data_runs()
244 #post_init.connect(update_flowcell_dataruns, sender=FlowCell)
247 LANE_STATUS_CODES = [(0, 'Failed'),
250 LANE_STATUS_MAP = dict((int(k), v) for k, v in LANE_STATUS_CODES)
251 LANE_STATUS_MAP[None] = "Unknown"
def is_valid_lane(value):
    """Return True when value lies in the inclusive range 1..8.

    Any value outside that range yields False.
    """
    return 1 <= value <= 8
261 class Lane(models.Model):
262 flowcell = models.ForeignKey(FlowCell)
263 lane_number = models.IntegerField()
264 library = models.ForeignKey(Library)
265 pM = models.DecimalField(max_digits=5,
270 cluster_estimate = models.IntegerField(blank=True, null=True)
271 status = models.IntegerField(choices=LANE_STATUS_CODES,
274 comment = models.TextField(null=True, blank=True)
277 def get_absolute_url(self):
278 return ('htsworkflow.frontend.experiments.views.flowcell_lane_detail',
281 def __unicode__(self):
282 return self.flowcell.flowcell_id + ':' + unicode(self.lane_number)
285 class DataRun(models.Model):
286 flowcell = models.ForeignKey(FlowCell, verbose_name="Flowcell Id")
287 runfolder_name = models.CharField(max_length=50)
288 result_dir = models.CharField(max_length=255)
289 last_update_time = models.DateTimeField()
290 run_start_time = models.DateTimeField()
291 cycle_start = models.IntegerField(null=True, blank=True)
292 cycle_stop = models.IntegerField(null=True, blank=True)
293 run_status = models.IntegerField(choices=RUN_STATUS_CHOICES,
294 null=True, blank=True)
295 image_software = models.CharField(max_length=50)
296 image_version = models.CharField(max_length=50)
297 basecall_software = models.CharField(max_length=50)
298 basecall_version = models.CharField(max_length=50)
299 alignment_software = models.CharField(max_length=50)
300 alignment_version = models.CharField(max_length=50)
301 comment = models.TextField(blank=True)
303 def update_result_files(self):
304 abs_result_dir = get_absolute_pathname(self.result_dir)
306 for dirname, dirnames, filenames in os.walk(abs_result_dir):
307 for filename in filenames:
308 pathname = os.path.join(dirname, filename)
309 relative_pathname = get_relative_pathname(pathname)
310 datafiles = self.datafile_set.filter(
312 relative_pathname=relative_pathname)
313 if len(datafiles) > 0:
316 metadata = find_file_type_metadata_from_filename(filename)
317 if metadata is not None:
318 metadata['filename'] = filename
320 newfile.data_run = self
321 newfile.file_type = metadata['file_type']
322 newfile.relative_pathname = relative_pathname
324 lane_number = metadata.get('lane', None)
325 if lane_number is not None:
326 lane = self.flowcell.lane_set.get(
327 lane_number=lane_number)
328 newfile.library = lane.library
330 self.datafile_set.add(newfile)
332 self.last_update_time = datetime.datetime.now()
def lane_files(self):
    """Group this run's data files by lane and by file type.

    Returns a dictionary mapping each lane value found in a data file's
    attributes to a dictionary of
    {file_type.normalized_name: datafile}.  Data files whose attributes
    are None, or whose attributes carry no 'lane' value, are left out.
    When two files share a lane and a normalized type name, the one
    seen last wins.
    """
    lanes = {}
    for datafile in self.datafile_set.all():
        metadata = datafile.attributes
        if metadata is None:
            continue

        lane = metadata.get('lane', None)
        if lane is None:
            # Not a per-lane file, so there is nothing to file it under.
            continue

        lane_file_set = lanes.setdefault(lane, {})
        lane_file_set[datafile.file_type.normalized_name] = datafile
    return lanes
def ivc_plots(self, lane):
    """Collect the IVC plot files among this run's result files.

    Iterates the (rel_filename, metadata) pairs produced by
    self.get_result_files() and keeps those whose
    metadata.file_type.name is one of the known IVC plot names.

    Returns a dictionary mapping that file type name to its
    (rel_filename, metadata) pair; empty when nothing matches.

    NOTE(review): the lane argument is not used to filter the results;
    it is kept so existing callers keep working -- confirm whether
    per-lane filtering was intended.
    """
    ivc_name = ['IVC All', 'IVC Call',
                'IVC Percent Base', 'IVC Percent All', 'IVC Percent Call']

    plots = {}
    for rel_filename, metadata in self.get_result_files():
        type_name = metadata.file_type.name
        if type_name in ivc_name:
            plots[type_name] = (rel_filename, metadata)
    return plots
357 class FileType(models.Model):
358 """Represent potential file types
360 regex is a pattern used to detect if a filename matches this type
361 data run currently assumes that there may be a (?P<lane>) and
362 (?P<end>) pattern in the regular expression.
364 name = models.CharField(max_length=50)
365 mimetype = models.CharField(max_length=50, null=True, blank=True)
366 # regular expression from glob.fnmatch.translate
367 regex = models.CharField(max_length=50, null=True, blank=True)
def parse_filename(self, pathname):
    """Does filename match our pattern?

    Only the final component of pathname is tested, using re.match
    against self.regex.

    Returns None if there is no match, or if this file type has no
    regex (None or empty).  Otherwise returns the dictionary of named
    groups from the match, with the 'lane' and 'end' entries converted
    to int when they are present and not None.
    """
    filename = os.path.basename(pathname)

    # regex is a nullable field; treat None like the empty pattern
    # instead of failing on len(None).
    if not self.regex:
        return None

    match = re.match(self.regex, filename)
    if match is None:
        return None

    # These are (?P<>) names we know about from our regexes.
    results = match.groupdict()

    # convert int parameters
    for attribute_name in ('lane', 'end'):
        value = results.get(attribute_name, None)
        if value is not None:
            results[attribute_name] = int(value)

    return results
def _get_normalized_name(self):
    """Return self.name lower-cased with each space turned into '_'."""
    # Splitting on a literal space and re-joining with '_' is the same
    # as replacing every space character one for one.
    pieces = self.name.split(' ')
    return '_'.join(pieces).lower()
normalized_name = property(_get_normalized_name)
395 def __unicode__(self):
396 #return u"<FileType: %s>" % (self.name,)
401 """Helper function to set default UUID in DataFile"""
402 return str(uuid.uuid1())
405 class DataFile(models.Model):
406 """Store map from random ID to filename"""
407 random_key = models.CharField(max_length=64,
410 data_run = models.ForeignKey(DataRun, db_index=True)
411 library = models.ForeignKey(Library, db_index=True, null=True, blank=True)
412 file_type = models.ForeignKey(FileType)
413 relative_pathname = models.CharField(max_length=255, db_index=True)
415 def _get_attributes(self):
416 return self.file_type.parse_filename(self.relative_pathname)
417 attributes = property(_get_attributes)
419 def _get_pathname(self):
420 return get_absolute_pathname(self.relative_pathname)
421 pathname = property(_get_pathname)
424 def get_absolute_url(self):
425 return ('htsworkflow.frontend.experiments.views.read_result_file',
426 (), {'key': self.random_key})
def find_file_type_metadata_from_filename(pathname):
    """Find the first FileType whose pattern matches pathname.

    Only the final component of pathname is offered to each
    FileType.parse_filename().

    Returns the dictionary produced by the first FileType that matches,
    with that FileType added under the 'file_type' key, or None when no
    FileType matches.
    """
    filename = os.path.basename(pathname)

    for file_type in FileType.objects.all():
        result = file_type.parse_filename(filename)
        if result is not None:
            result['file_type'] = file_type
            return result

    return None
def get_relative_pathname(abspath):
    """Strip off the result home directory from a path

    Returns abspath with the leading settings.RESULT_HOME_DIR (plus its
    trailing separator) removed.  A path that does not start with the
    result home directory is returned unchanged.
    """
    # Joining with '' appends the separator so only whole directory
    # prefixes match.
    result_home_dir = os.path.join(settings.RESULT_HOME_DIR, '')

    # Remove the home directory only where it is a prefix; a plain
    # replace() would also cut it out of the middle of a path.
    if abspath.startswith(result_home_dir):
        return abspath[len(result_home_dir):]
    return abspath
def get_absolute_pathname(relative_pathname):
    """Return relative_pathname joined onto settings.RESULT_HOME_DIR."""
    results_home = settings.RESULT_HOME_DIR
    return os.path.join(results_home, relative_pathname)