reformatting to be more pep8 like
[htsworkflow.git] / htsworkflow / frontend / experiments / models.py
1 import datetime
2 import glob
3 import logging
4 import os
5 import re
6 import types
7 import uuid
8
9 from django.conf import settings
10 from django.core.exceptions import ObjectDoesNotExist
11 from django.core import urlresolvers
12 from django.db import models
13 from django.db.models.signals import post_init
14
15 from htsworkflow.frontend.samples.models import Library
16 from htsworkflow.util.conversion import parse_flowcell_id
17 from htsworkflow.pipelines import runfolder
18
19 LOGGER = logging.getLogger(__name__)
20 default_pM = 5
21 try:
22     default_pM = int(settings.DEFAULT_PM)
23 except ValueError, e:
24     LOGGER.error("invalid value for frontend.default_pm")
25
26 # how many days to wait before trying to re-import a runfolder
27 RESCAN_DELAY = 1
28 try:
29     RESCAN_DELAY = int(settings.RESCAN_DELAY)
30 except (ValueError, AttributeError):
31     LOGGER.error("Missing or invalid settings.RESCAN_DELAY, "\
32                  "defaulting to %s" % (RESCAN_DELAY,))
33
34 RUN_STATUS_CHOICES = (
35     (0, 'Sequencer running'),  # Solexa Data Pipeline Not Yet Started'),
36     (1, 'Data Pipeline Started'),
37     (2, 'Data Pipeline Interrupted'),
38     (3, 'Data Pipeline Finished'),
39     (4, 'Collect Results Started'),
40     (5, 'Collect Results Finished'),
41     (6, 'QC Started'),
42     (7, 'QC Finished'),
43     (255, 'DONE'),
44   )
45 RUN_STATUS_REVERSE_MAP = dict(((v, k) for k, v in RUN_STATUS_CHOICES))
46
47
48 class ClusterStation(models.Model):
49     """List of cluster stations"""
50     name = models.CharField(max_length=50, unique=True)
51
52     def __unicode__(self):
53         return unicode(self.name)
54
55
56 class Sequencer(models.Model):
57     """Sequencers we've owned
58     """
59     name = models.CharField(max_length=50, db_index=True)
60     instrument_name = models.CharField(max_length=50, db_index=True)
61     serial_number = models.CharField(max_length=50, db_index=True)
62     model = models.CharField(max_length=255)
63     active = models.BooleanField(default=True, null=False)
64     comment = models.CharField(max_length=255)
65
66     class Meta:
67         ordering = ["-active", "name"]
68
69     def __unicode__(self):
70         name = [unicode(self.name)]
71         if self.instrument_name is not None:
72             name.append("(%s)" % (unicode(self.instrument_name),))
73         return " ".join(name)
74
75     @models.permalink
76     def get_absolute_url(self):
77         return ('htsworkflow.frontend.experiments.views.sequencer',
78                 [self.id])
79
80
81 class FlowCell(models.Model):
82     flowcell_id = models.CharField(max_length=20, unique=True, db_index=True)
83     run_date = models.DateTimeField()
84     advanced_run = models.BooleanField(default=False)
85     paired_end = models.BooleanField(default=False)
86     read_length = models.IntegerField(default=32)  # Stanford is currenlty 25
87     control_lane = models.IntegerField(choices=[(1, 1),
88                                                 (2, 2),
89                                                 (3, 3),
90                                                 (4, 4),
91                                                 (5, 5),
92                                                 (6, 6),
93                                                 (7, 7),
94                                                 (8, 8),
95                                                 (0, 'All Lanes')],
96                                        null=True,
97                                        blank=True)
98
99     cluster_station = models.ForeignKey(ClusterStation, default=3)
100     sequencer = models.ForeignKey(Sequencer, default=1)
101
102     notes = models.TextField(blank=True)
103
104     def __unicode__(self):
105         return unicode(self.flowcell_id)
106
107     def Lanes(self):
108         html = ['<table>']
109         for lane in self.lane_set.order_by('lane_number'):
110             cluster_estimate = lane.cluster_estimate
111             if cluster_estimate is not None:
112                 cluster_estimate = "%s k" % ((int(cluster_estimate) / 1000), )
113             else:
114                 cluster_estimate = 'None'
115             library_id = lane.library_id
116             library = lane.library
117             element = '<tr><td>%d</td>'\
118                       '<td><a href="%s">%s</a></td><td>%s</td></tr>'
119             html.append(element % (lane.lane_number,
120                                    library.get_admin_url(),
121                                    library,
122                                    cluster_estimate))
123         html.append('</table>')
124         return "\n".join(html)
125     Lanes.allow_tags = True
126
127     class Meta:
128         ordering = ["-run_date"]
129
130     def get_admin_url(self):
131         # that's the django way... except it didn't work
132         return urlresolvers.reverse('admin:experiments_flowcell_change',
133                                     args=(self.id,))
134
135     def flowcell_type(self):
136         """Convert our boolean 'is paired' flag to a name
137         """
138         if self.paired_end:
139             return u"Paired"
140         else:
141             return u"Single"
142
143     @models.permalink
144     def get_absolute_url(self):
145         flowcell_id, status = parse_flowcell_id(self.flowcell_id)
146         return ('htsworkflow.frontend.experiments.views.flowcell_detail',
147                 [str(flowcell_id)])
148
149     def get_raw_data_directory(self):
150         """Return location of where the raw data is stored"""
151         flowcell_id, status = parse_flowcell_id(self.flowcell_id)
152
153         return os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
154
155     def update_data_runs(self):
156         result_root = self.get_raw_data_directory()
157         LOGGER.debug("Update data runs flowcell root: %s" % (result_root,))
158         if result_root is None:
159             return
160
161         result_home_dir = os.path.join(settings.RESULT_HOME_DIR, '')
162         run_xml_re = re.compile(glob.fnmatch.translate('run*.xml'))
163
164         dataruns = dict([(x.result_dir, x) for x in self.datarun_set.all()])
165
166         result_dirs = []
167         for dirpath, dirnames, filenames in os.walk(result_root):
168             for filename in filenames:
169                 if run_xml_re.match(filename):
170                     # we have a run directory
171                     relative_pathname = get_relative_pathname(dirpath)
172                     cached_run = dataruns.get(relative_pathname, None)
173                     now = datetime.datetime.now()
174                     if (cached_run is None):
175                         self.import_data_run(relative_pathname, filename)
176                     elif (now - cached_run.last_update_time).days > \
177                              RESCAN_DELAY:
178                         self.import_data_run(relative_pathname,
179                                              filename, cached_run)
180
181     def import_data_run(self, relative_pathname, run_xml_name, run=None):
182         """Given a result directory import files"""
183         run_dir = get_absolute_pathname(relative_pathname)
184         run_xml_path = os.path.join(run_dir, run_xml_name)
185         run_xml_data = runfolder.load_pipeline_run_xml(run_xml_path)
186         LOGGER.debug("Importing run from %s" % (relative_pathname,))
187
188         if run is None:
189             run = DataRun()
190             run.flowcell = self
191             run.status = RUN_STATUS_REVERSE_MAP['DONE']
192             run.result_dir = relative_pathname
193             run.runfolder_name = run_xml_data.runfolder_name
194             run.cycle_start = run_xml_data.image_analysis.start
195             run.cycle_stop = run_xml_data.image_analysis.stop
196             run.run_start_time = run_xml_data.image_analysis.date
197             run.image_software = run_xml_data.image_analysis.software
198             run.image_version = run_xml_data.image_analysis.version
199             run.basecall_software = run_xml_data.bustard.software
200             run.basecall_version = run_xml_data.bustard.version
201             run.alignment_software = run_xml_data.gerald.software
202             run.alignment_version = run_xml_data.gerald.version
203
204         run.last_update_time = datetime.datetime.now()
205         run.save()
206
207         run.update_result_files()
208
209
210 # FIXME: should we automatically update dataruns?
211 #        Or should we expect someone to call update_data_runs?
212 #def update_flowcell_dataruns(sender, instance, *args, **kwargs):
213 #    """Update our dataruns
214 #    """
215 #    if not os.path.exists(settings.RESULT_HOME_DIR):
216 #       return
217 #
218 #    instance.update_data_runs()
219 #post_init.connect(update_flowcell_dataruns, sender=FlowCell)
220
221
222 LANE_STATUS_CODES = [(0, 'Failed'),
223                      (1, 'Marginal'),
224                      (2, 'Good'), ]
225 LANE_STATUS_MAP = dict((int(k), v) for k, v in LANE_STATUS_CODES)
226 LANE_STATUS_MAP[None] = "Unknown"
227
228
229 def is_valid_lane(value):
230     if value >= 1 and value <= 8:
231         return True
232     else:
233         return False
234
235
236 class Lane(models.Model):
237     flowcell = models.ForeignKey(FlowCell)
238     lane_number = models.IntegerField()
239     library = models.ForeignKey(Library)
240     pM = models.DecimalField(max_digits=5,
241                              decimal_places=2,
242                              blank=False,
243                              null=False,
244                              default=default_pM)
245     cluster_estimate = models.IntegerField(blank=True, null=True)
246     status = models.IntegerField(choices=LANE_STATUS_CODES,
247                                  null=True,
248                                  blank=True)
249     comment = models.TextField(null=True, blank=True)
250
251     @models.permalink
252     def get_absolute_url(self):
253         return ('htsworkflow.frontend.experiments.views.flowcell_lane_detail',
254                 [str(self.id)])
255
256     def __unicode__(self):
257         return self.flowcell.flowcell_id + ':' + unicode(self.lane_number)
258
259
260 class DataRun(models.Model):
261     flowcell = models.ForeignKey(FlowCell, verbose_name="Flowcell Id")
262     runfolder_name = models.CharField(max_length=50)
263     result_dir = models.CharField(max_length=255)
264     last_update_time = models.DateTimeField()
265     run_start_time = models.DateTimeField()
266     cycle_start = models.IntegerField(null=True, blank=True)
267     cycle_stop = models.IntegerField(null=True, blank=True)
268     run_status = models.IntegerField(choices=RUN_STATUS_CHOICES,
269                                      null=True, blank=True)
270     image_software = models.CharField(max_length=50)
271     image_version = models.CharField(max_length=50)
272     basecall_software = models.CharField(max_length=50)
273     basecall_version = models.CharField(max_length=50)
274     alignment_software = models.CharField(max_length=50)
275     alignment_version = models.CharField(max_length=50)
276     comment = models.TextField(blank=True)
277
278     def update_result_files(self):
279         abs_result_dir = get_absolute_pathname(self.result_dir)
280
281         for dirname, dirnames, filenames in os.walk(abs_result_dir):
282             for filename in filenames:
283                 pathname = os.path.join(dirname, filename)
284                 relative_pathname = get_relative_pathname(pathname)
285                 datafiles = self.datafile_set.filter(
286                     data_run=self,
287                     relative_pathname=relative_pathname)
288                 if len(datafiles) > 0:
289                     continue
290
291                 metadata = find_file_type_metadata_from_filename(filename)
292                 if metadata is not None:
293                     metadata['filename'] = filename
294                     newfile = DataFile()
295                     newfile.data_run = self
296                     newfile.file_type = metadata['file_type']
297                     newfile.relative_pathname = relative_pathname
298
299                     lane_number = metadata.get('lane', None)
300                     if lane_number is not None:
301                         lane = self.flowcell.lane_set.get(
302                             lane_number=lane_number)
303                         newfile.library = lane.library
304
305                     self.datafile_set.add(newfile)
306
307         self.last_update_time = datetime.datetime.now()
308
309     def lane_files(self):
310         lanes = {}
311
312         for datafile in self.datafile_set.all():
313             metadata = datafile.attributes
314             if metadata is not None:
315                 lane = metadata.get('lane', None)
316                 if lane is not None:
317                     lane_file_set = lanes.setdefault(lane, {})
318                     normalized_name = datafile.file_type.normalized_name
319                     lane_file_set[normalized_name] = datafile
320         return lanes
321
322     def ivc_plots(self, lane):
323         ivc_name = ['IVC All', 'IVC Call',
324                     'IVC Percent Base', 'IVC Percent All', 'IVC Percent Call']
325
326         plots = {}
327         for rel_filename, metadata in self.get_result_files():
328             if metadata.file_type.name in ivc_name:
329                 plots[metadata.file_type.name] = (rel_filename, metadata)
330
331
332 class FileType(models.Model):
333     """Represent potential file types
334
335     regex is a pattern used to detect if a filename matches this type
336     data run currently assumes that there may be a (?P<lane>) and
337     (?P<end>) pattern in the regular expression.
338     """
339     name = models.CharField(max_length=50)
340     mimetype = models.CharField(max_length=50, null=True, blank=True)
341     # regular expression from glob.fnmatch.translate
342     regex = models.CharField(max_length=50, null=True, blank=True)
343
344     def parse_filename(self, pathname):
345         """Does filename match our pattern?
346
347         Returns None if not, or dictionary of match variables if we do.
348         """
349         path, filename = os.path.split(pathname)
350         if len(self.regex) > 0:
351             match = re.match(self.regex, filename)
352             if match is not None:
353                 # These are (?P<>) names we know about from our
354                 # default regexes.
355                 results = match.groupdict()
356
357                 # convert int parameters
358                 for attribute_name in ['lane', 'end']:
359                     value = results.get(attribute_name, None)
360                     if value is not None:
361                         results[attribute_name] = int(value)
362
363                 return results
364
365     def _get_normalized_name(self):
366         """Crush data file name into identifier friendly name"""
367         return self.name.replace(' ', '_').lower()
368     normalized_name = property(_get_normalized_name)
369
370     def __unicode__(self):
371         #return u"<FileType: %s>" % (self.name,)
372         return self.name
373
374
375 def str_uuid():
376     """Helper function to set default UUID in DataFile"""
377     return str(uuid.uuid1())
378
379
380 class DataFile(models.Model):
381     """Store map from random ID to filename"""
382     random_key = models.CharField(max_length=64,
383                                   db_index=True,
384                                   default=str_uuid)
385     data_run = models.ForeignKey(DataRun, db_index=True)
386     library = models.ForeignKey(Library, db_index=True, null=True, blank=True)
387     file_type = models.ForeignKey(FileType)
388     relative_pathname = models.CharField(max_length=255, db_index=True)
389
390     def _get_attributes(self):
391         return self.file_type.parse_filename(self.relative_pathname)
392     attributes = property(_get_attributes)
393
394     def _get_pathname(self):
395         return get_absolute_pathname(self.relative_pathname)
396     pathname = property(_get_pathname)
397
398     @models.permalink
399     def get_absolute_url(self):
400         return ('htsworkflow.frontend.experiments.views.read_result_file',
401                 (), {'key': self.random_key})
402
403
404 def find_file_type_metadata_from_filename(pathname):
405     path, filename = os.path.split(pathname)
406     result = None
407     for file_type in FileType.objects.all():
408         result = file_type.parse_filename(filename)
409         if result is not None:
410             result['file_type'] = file_type
411             return result
412
413     return None
414
415
416 def get_relative_pathname(abspath):
417     """Strip off the result home directory from a path
418     """
419     result_home_dir = os.path.join(settings.RESULT_HOME_DIR, '')
420     relative_pathname = abspath.replace(result_home_dir, '')
421     return relative_pathname
422
423
424 def get_absolute_pathname(relative_pathname):
425     """Attach relative path to  results home directory"""
426     return os.path.join(settings.RESULT_HOME_DIR, relative_pathname)