Improve a settings error message, as it was confusing me while debugging
[htsworkflow.git] / htsworkflow / frontend / experiments / models.py
1 import datetime
2 import glob
3 import logging
4 import os
5 import re
6 import types
7 import uuid
8
9 from django.conf import settings
10 from django.core.exceptions import ObjectDoesNotExist
11 from django.core import urlresolvers
12 from django.db import models
13 from django.db.models.signals import post_init
14
15 from htsworkflow.frontend.samples.models import Library
16 from htsworkflow.util.conversion import parse_flowcell_id
17 from htsworkflow.pipelines import runfolder
18
logger = logging.getLogger(__name__)

# Fallback used as the default for Lane.pM when settings.DEFAULT_PM is
# missing or cannot be converted to an integer.
default_pM = 5
try:
    default_pM = int(settings.DEFAULT_PM)
except (ValueError, AttributeError):
    # AttributeError: the setting is not defined at all.
    # ValueError: the setting is defined but is not an integer.
    logger.error("Missing or invalid settings.DEFAULT_PM, "
                 "defaulting to %s" % (default_pM,))
25
# Number of days to wait before a runfolder is imported again.
# Stays at 1 unless settings.RESCAN_DELAY supplies a usable integer.
RESCAN_DELAY = 1
try:
    _configured_rescan_delay = int(settings.RESCAN_DELAY)
except (AttributeError, ValueError):
    logger.error("Missing or invalid settings.RESCAN_DELAY, "
                 "defaulting to %s" % (RESCAN_DELAY,))
else:
    RESCAN_DELAY = _configured_rescan_delay
33
# (code, label) pairs used as the choices for DataRun.run_status.
RUN_STATUS_CHOICES = (
    (0, 'Sequencer running'),  # was: 'Solexa Data Pipeline Not Yet Started'
    (1, 'Data Pipeline Started'),
    (2, 'Data Pipeline Interrupted'),
    (3, 'Data Pipeline Finished'),
    (4, 'Collect Results Started'),
    (5, 'Collect Results Finished'),
    (6, 'QC Started'),
    (7, 'QC Finished'),
    (255, 'DONE'),
)

# Lookup from label back to its numeric code.
RUN_STATUS_REVERSE_MAP = dict(
    (label, code) for code, label in RUN_STATUS_CHOICES
)
46
class ClusterStation(models.Model):
    """A cluster station, identified by a unique name."""
    name = models.CharField(unique=True, max_length=50)

    def __unicode__(self):
        """Display the station by its name."""
        label = self.name
        return unicode(label)
52
class Sequencer(models.Model):
    """A sequencer, identified by a unique name."""
    name = models.CharField(unique=True, max_length=50)

    def __unicode__(self):
        """Display the sequencer by its name."""
        label = self.name
        return unicode(label)
58
class FlowCell(models.Model):
    """A flowcell together with its run parameters.

    Lanes and data runs refer back to a flowcell through foreign keys,
    which is where the lane_set and datarun_set used below come from.
    """
    flowcell_id = models.CharField(max_length=20, unique=True, db_index=True)
    run_date = models.DateTimeField()
    advanced_run = models.BooleanField(default=False)
    paired_end = models.BooleanField(default=False)
    read_length = models.IntegerField(default=32) #Stanford is currently 25
    control_lane = models.IntegerField(choices=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(0,'All Lanes')], null=True, blank=True)

    cluster_station = models.ForeignKey(ClusterStation, default=3)
    sequencer = models.ForeignKey(Sequencer, default=1)

    notes = models.TextField(blank=True)

    def __unicode__(self):
        """Display the flowcell by its flowcell id."""
        return unicode(self.flowcell_id)

    def Lanes(self):
        """Render the lanes of this flowcell as an HTML table.

        Each row holds the lane number, a link to the library's admin
        page and the cluster estimate expressed in thousands ('None'
        when no estimate is recorded).
        """
        element = '<tr><td>%d</td><td><a href="%s">%s</a></td><td>%s</td></tr>'
        html = ['<table>']
        for lane in self.lane_set.order_by('lane_number'):
            cluster_estimate = lane.cluster_estimate
            if cluster_estimate is not None:
                # explicit floor division: report whole thousands
                cluster_estimate = "%s k" % ((int(cluster_estimate) // 1000), )
            else:
                cluster_estimate = 'None'
            library = lane.library
            html.append(element % (lane.lane_number,
                                   library.get_admin_url(),
                                   library,
                                   cluster_estimate))
        html.append('</table>')
        return "\n".join(html)
    # The method returns markup, so flag it as containing tags.
    Lanes.allow_tags = True

    class Meta:
        ordering = ["-run_date"]

    def get_admin_url(self):
        """Return the admin change-page URL for this flowcell."""
        # that's the django way... except it didn't work
        return urlresolvers.reverse('admin:experiments_flowcell_change',
                                    args=(self.id,))

    def flowcell_type(self):
        """
        Convert our boolean 'is paired' flag to a name
        """
        if self.paired_end:
            return u"Paired"
        else:
            return u"Single"

    @models.permalink
    def get_absolute_url(self):
        """Return the (view name, args) pair for the flowcell detail page."""
        # parse_flowcell_id returns an (id, status) pair; only the id is used
        flowcell_id, status = parse_flowcell_id(self.flowcell_id)
        return ('htsworkflow.frontend.experiments.views.flowcell_detail',
                [str(flowcell_id)])

    def get_raw_data_directory(self):
        """Return location of where the raw data is stored"""
        flowcell_id, status = parse_flowcell_id(self.flowcell_id)

        return os.path.join(settings.RESULT_HOME_DIR, flowcell_id)

    def update_data_runs(self):
        """Scan the raw data directory and import any run directories.

        A directory counts as a run directory when it contains a file
        matching 'run*.xml'.  Directories without a DataRun are imported;
        directories whose DataRun was last updated more than RESCAN_DELAY
        days ago are imported again.
        """
        result_root = self.get_raw_data_directory()
        logger.debug("Update data runs flowcell root: %s" % (result_root,))
        if result_root is None:
            return

        run_xml_re = re.compile(glob.fnmatch.translate('run*.xml'))

        # existing data runs, keyed by their stored (relative) result_dir
        dataruns = dict([ (x.result_dir, x) for x in self.datarun_set.all() ])

        now = datetime.datetime.now()
        for dirpath, dirnames, filenames in os.walk(result_root):
            for filename in filenames:
                if run_xml_re.match(filename):
                    # we have a run directory
                    relative_pathname = get_relative_pathname(dirpath)
                    cached_run = dataruns.get(relative_pathname, None)
                    if (cached_run is None):
                        self.import_data_run(relative_pathname, filename)
                    elif (now - cached_run.last_update_time).days > RESCAN_DELAY:
                        self.import_data_run(relative_pathname,
                                             filename, cached_run)

    def import_data_run(self, relative_pathname, run_xml_name, run=None):
        """Given a result directory import files

        relative_pathname -- run directory, relative to the result home
        run_xml_name      -- name of the run xml file inside that directory
        run               -- existing DataRun to refresh; when None a new
                             DataRun is created from the run xml contents
        """
        run_dir = get_absolute_pathname(relative_pathname)
        run_xml_path = os.path.join(run_dir, run_xml_name)
        run_xml_data = runfolder.load_pipeline_run_xml(run_xml_path)
        logger.debug("Importing run from %s" % (relative_pathname,))

        if run is None:
            run = DataRun()
            run.flowcell = self
            # DataRun's status field is named run_status; assigning to
            # 'status' would only set a plain attribute that is never saved.
            run.run_status = RUN_STATUS_REVERSE_MAP['DONE']
            run.result_dir = relative_pathname
            run.runfolder_name = run_xml_data.runfolder_name
            run.cycle_start = run_xml_data.image_analysis.start
            run.cycle_stop = run_xml_data.image_analysis.stop
            run.run_start_time = run_xml_data.image_analysis.date

        run.last_update_time = datetime.datetime.now()
        run.save()

        run.update_result_files()
169
170
171 # FIXME: should we automatically update dataruns?
172 #        Or should we expect someone to call update_data_runs?
173 #def update_flowcell_dataruns(sender, instance, *args, **kwargs):
174 #    """Update our dataruns
175 #    """
176 #    if not os.path.exists(settings.RESULT_HOME_DIR):
177 #       return
178 #
179 #    instance.update_data_runs()
180 #post_init.connect(update_flowcell_dataruns, sender=FlowCell)
181
182
183
# (code, label) pairs used as the choices for Lane.status.
LANE_STATUS_CODES = [
    (0, 'Failed'),
    (1, 'Marginal'),
    (2, 'Good'),
]

# Code -> label lookup; a missing status (None) is reported as "Unknown".
LANE_STATUS_MAP = dict(
    (int(code), label) for code, label in LANE_STATUS_CODES
)
LANE_STATUS_MAP[None] = "Unknown"
189
def is_valid_lane(value):
    """Return True when value lies in the range 1 through 8 inclusive."""
    if value < 1 or value > 8:
        return False
    return True
195
class Lane(models.Model):
    """Ties a lane number on a FlowCell to a Library.

    Also records the pM value, an optional cluster estimate, an optional
    status code (see LANE_STATUS_CODES) and a free-form comment.
    """
    flowcell = models.ForeignKey(FlowCell)
    lane_number = models.IntegerField()
    library = models.ForeignKey(Library)
    pM = models.DecimalField(max_digits=5,
                             decimal_places=2,
                             blank=False,
                             null=False,
                             default=default_pM)
    cluster_estimate = models.IntegerField(blank=True, null=True)
    status = models.IntegerField(choices=LANE_STATUS_CODES,
                                 null=True,
                                 blank=True)
    comment = models.TextField(null=True, blank=True)

    @models.permalink
    def get_absolute_url(self):
        """Return the (view name, args) pair for the lane detail page."""
        view_name = 'htsworkflow.frontend.experiments.views.flowcell_lane_detail'
        view_args = [str(self.id)]
        return (view_name, view_args)

    def __unicode__(self):
        """Display the lane as '<flowcell id>:<lane number>'."""
        flowcell_label = self.flowcell.flowcell_id
        lane_label = unicode(self.lane_number)
        return flowcell_label + ':' + lane_label
212
213 ### -----------------------
class DataRun(models.Model):
    """A result directory produced for a flowcell, plus its data files."""
    flowcell = models.ForeignKey(FlowCell,verbose_name="Flowcell Id")
    runfolder_name = models.CharField(max_length=50)
    # stored relative to settings.RESULT_HOME_DIR (see get_absolute_pathname)
    result_dir = models.CharField(max_length=255)
    last_update_time = models.DateTimeField()
    run_start_time = models.DateTimeField()
    cycle_start = models.IntegerField(null=True, blank=True)
    cycle_stop = models.IntegerField(null=True, blank=True)
    run_status = models.IntegerField(choices=RUN_STATUS_CHOICES,
                                     null=True, blank=True)
    comment = models.TextField(blank=True)

    def update_result_files(self):
        """Register every recognised file under the result directory.

        Walks the result directory and, for each file that is not already
        attached to this run and whose name matches a known FileType,
        adds a DataFile.  When the file name carries a lane number the
        DataFile is linked to that lane's library.

        Sets last_update_time on this instance; it does not call save().
        """
        abs_result_dir = get_absolute_pathname(self.result_dir)

        for dirname, dirnames, filenames in os.walk(abs_result_dir):
            for filename in filenames:
                pathname = os.path.join(dirname, filename)
                relative_pathname = get_relative_pathname(pathname)
                datafiles = self.datafile_set.filter(
                  data_run = self,
                  relative_pathname=relative_pathname)
                if len(datafiles) > 0:
                    # already recorded for this run
                    continue

                metadata = find_file_type_metadata_from_filename(filename)
                if metadata is not None:
                    newfile = DataFile()
                    newfile.data_run = self
                    newfile.file_type = metadata['file_type']
                    newfile.relative_pathname = relative_pathname

                    lane_number = metadata.get('lane', None)
                    if lane_number is not None:
                        lane = self.flowcell.lane_set.get(lane_number = lane_number)
                        newfile.library = lane.library

                    self.datafile_set.add(newfile)

        self.last_update_time = datetime.datetime.now()

    def lane_files(self):
        """Group this run's data files by lane.

        Returns a dictionary keyed by lane number whose values are
        dictionaries mapping a file type's normalized name to the
        DataFile.  Files whose name carries no lane number are skipped.
        """
        lanes = {}

        for datafile in self.datafile_set.all():
            metadata = datafile.attributes
            if metadata is not None:
                lane = metadata.get('lane', None)
                if lane is not None:
                    lane_file_set = lanes.setdefault(lane, {})
                    lane_file_set[datafile.file_type.normalized_name] = datafile
        return lanes

    def ivc_plots(self, lane):
        """Return the IVC plot files recorded for one lane.

        lane is compared with the integer 'lane' value that
        FileType.parse_filename extracts from each file name, so it
        should be passed as an int.

        Returns a dictionary mapping the file type name to a
        (relative_pathname, DataFile) tuple; empty when nothing matches.
        """
        ivc_name = ['IVC All', 'IVC Call',
                    'IVC Percent Base', 'IVC Percent All', 'IVC Percent Call']

        plots = {}
        for datafile in self.datafile_set.all():
            type_name = datafile.file_type.name
            if type_name not in ivc_name:
                continue
            attributes = datafile.attributes
            if attributes is None or attributes.get('lane', None) != lane:
                # not a file belonging to the requested lane
                continue
            plots[type_name] = (datafile.relative_pathname, datafile)
        return plots
276
class FileType(models.Model):
    """Represent potential file types

    regex is a pattern used to detect if a filename matches this type
    data run currently assumes that there may be a (?P<lane>) and
    (?P<end>) pattern in the regular expression.
    """
    name = models.CharField(max_length=50)
    mimetype = models.CharField(max_length=50, null=True, blank=True)
    # regular expression from glob.fnmatch.translate
    regex = models.CharField(max_length=50, null=True, blank=True)

    def parse_filename(self, pathname):
        """Does filename match our pattern?

        Only the final path component of pathname is matched.

        Returns None if not, or dictionary of match variables if we do.
        A file type without a regex (None or empty) never matches.
        """
        # regex is nullable, so guard against None as well as ''
        if not self.regex:
            return None

        path, filename = os.path.split(pathname)
        match = re.match(self.regex, filename)
        if match is None:
            return None

        # These are (?P<>) names we know about from our default regexes.
        results = match.groupdict()

        # convert int parameters
        for attribute_name in ['lane', 'end']:
            value = results.get(attribute_name, None)
            if value is not None:
                results[attribute_name] = int(value)

        return results

    def _get_normalized_name(self):
        """Crush data file name into identifier friendly name"""
        return self.name.replace(' ', '_').lower()
    normalized_name = property(_get_normalized_name)

    def __unicode__(self):
        """Display the file type by its name."""
        #return u"<FileType: %s>" % (self.name,)
        return self.name
317
def str_uuid():
    """Build the default random_key for DataFile: a uuid1 as a string."""
    fresh_id = uuid.uuid1()
    return str(fresh_id)
321
class DataFile(models.Model):
    """Map a random key to a file that belongs to a data run."""
    random_key = models.CharField(default=str_uuid,
                                  max_length=64,
                                  db_index=True)
    data_run = models.ForeignKey(DataRun, db_index=True)
    library = models.ForeignKey(Library, db_index=True, null=True, blank=True)
    file_type = models.ForeignKey(FileType)
    # relative to settings.RESULT_HOME_DIR; see the pathname property
    relative_pathname = models.CharField(max_length=255, db_index=True)

    def _get_attributes(self):
        """Values the file type's regex extracts from our path, or None."""
        parser = self.file_type
        return parser.parse_filename(self.relative_pathname)
    attributes = property(_get_attributes)

    def _get_pathname(self):
        """Path of the file with the result home directory prepended."""
        stored_path = self.relative_pathname
        return get_absolute_pathname(stored_path)
    pathname = property(_get_pathname)

    @models.permalink
    def get_absolute_url(self):
        """Return (view name, args, kwargs) for the view serving this file."""
        view_name = 'htsworkflow.frontend.experiments.views.read_result_file'
        view_kwargs = {'key': self.random_key }
        return (view_name, (), view_kwargs)
344
def find_file_type_metadata_from_filename(pathname):
    """Find the first FileType whose pattern matches a file name.

    Only the final component of pathname is tested.  Returns the
    dictionary produced by FileType.parse_filename with the matching
    FileType stored under 'file_type', or None when nothing matches.
    """
    basename = os.path.split(pathname)[1]
    for candidate in FileType.objects.all():
        attributes = candidate.parse_filename(basename)
        if attributes is None:
            continue
        attributes['file_type'] = candidate
        return attributes

    return None
355
def get_relative_pathname(abspath):
    """Strip off the result home directory from the front of a path

    Only a leading settings.RESULT_HOME_DIR is removed; later occurrences
    of the same text inside the path are left alone.  A path that does
    not start with the result home directory is returned unchanged.
    """
    # join with '' so the prefix always ends with a path separator
    result_home_dir = os.path.join(settings.RESULT_HOME_DIR,'')
    if abspath.startswith(result_home_dir):
        return abspath[len(result_home_dir):]
    return abspath
362
def get_absolute_pathname(relative_pathname):
    """Join a relative path onto settings.RESULT_HOME_DIR."""
    result_home = settings.RESULT_HOME_DIR
    return os.path.join(result_home, relative_pathname)
366