Merge branch 'master' of mus.cacr.caltech.edu:htsworkflow
[htsworkflow.git] / htsworkflow / frontend / experiments / models.py
1 import datetime
2 import glob
3 import logging
4 import os
5 import re
6 import types
7 import uuid
8
9 from django.conf import settings
10 from django.core.exceptions import ObjectDoesNotExist
11 from django.core import urlresolvers
12 from django.db import models
13 from django.db.models.signals import post_init
14
15 from htsworkflow.frontend.samples.models import Library
16 from htsworkflow.util.conversion import parse_flowcell_id
17 from htsworkflow.pipelines import runfolder
18
19 logger = logging.getLogger(__name__)
20 default_pM = 5
21 try:
22   default_pM = int(settings.DEFAULT_PM)
23 except ValueError,e:
24   logger.error("invalid value for frontend.default_pm")
25
26 RUN_STATUS_CHOICES = (
27     (0, 'Sequencer running'), ##Solexa Data Pipeline Not Yet Started'),
28     (1, 'Data Pipeline Started'),
29     (2, 'Data Pipeline Interrupted'),
30     (3, 'Data Pipeline Finished'),
31     (4, 'Collect Results Started'),
32     (5, 'Collect Results Finished'),
33     (6, 'QC Started'),
34     (7, 'QC Finished'),
35     (255, 'DONE'),
36   )
37 RUN_STATUS_REVERSE_MAP = dict(((v,k) for k,v in RUN_STATUS_CHOICES))
38
39 class ClusterStation(models.Model):
40   name = models.CharField(max_length=50, unique=True)
41
42   def __unicode__(self):
43     return unicode(self.name)
44
45 class Sequencer(models.Model):
46   name = models.CharField(max_length=50, unique=True)
47
48   def __unicode__(self):
49     return unicode(self.name)
50
51 class FlowCell(models.Model):
52   flowcell_id = models.CharField(max_length=20, unique=True, db_index=True)
53   run_date = models.DateTimeField()
54   advanced_run = models.BooleanField(default=False)
55   paired_end = models.BooleanField(default=False)
56   read_length = models.IntegerField(default=32) #Stanford is currenlty 25
57   control_lane = models.IntegerField(choices=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(0,'All Lanes')], null=True, blank=True)
58
59   cluster_station = models.ForeignKey(ClusterStation, default=3)
60   sequencer = models.ForeignKey(Sequencer, default=1)
61   
62   notes = models.TextField(blank=True)
63
64   def __unicode__(self):
65       return unicode(self.flowcell_id) 
66
67   def Lanes(self):
68     html = ['<table>']
69     for lane in self.lane_set.order_by('lane_number'):
70         cluster_estimate = lane.cluster_estimate
71         if cluster_estimate is not None:
72             cluster_estimate = "%s k" % ((int(cluster_estimate)/1000), )
73         else:
74             cluster_estimate = 'None'
75         library_id = lane.library_id
76         library = lane.library
77         element = '<tr><td>%d</td><td><a href="%s">%s</a></td><td>%s</td></tr>'
78         html.append(element % (lane.lane_number,
79                                library.get_admin_url(),
80                                library,
81                                cluster_estimate))
82     html.append('</table>')
83     return "\n".join(html)
84   Lanes.allow_tags = True
85
86   class Meta:
87     ordering = ["-run_date"]
88
89   def get_admin_url(self):
90     # that's the django way... except it didn't work
91     return urlresolvers.reverse('admin:experiments_flowcell_change',
92                                 args=(self.id,))
93
94   def flowcell_type(self):
95     """
96     Convert our boolean 'is paired' flag to a name
97     """
98     if self.paired_end:
99       return u"Paired"
100     else:
101       return u"Single"
102
103   @models.permalink
104   def get_absolute_url(self):
105       flowcell_id, status = parse_flowcell_id(self.flowcell_id)
106       return ('htsworkflow.frontend.experiments.views.flowcell_detail',
107               [str(flowcell_id)])
108     
109   def get_raw_data_directory(self):
110       """Return location of where the raw data is stored"""
111       flowcell_id, status = parse_flowcell_id(self.flowcell_id)
112
113       return os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
114
115   def update_data_runs(self):
116       result_root = self.get_raw_data_directory()
117       if result_root is None:
118           return
119
120       result_home_dir = os.path.join(settings.RESULT_HOME_DIR,'')
121       run_xml_re = re.compile(glob.fnmatch.translate('run*.xml'))
122       
123       dataruns = self.datarun_set.all()
124       datarun_result_dirs = [ x.result_dir for x in dataruns ]
125
126       result_dirs = []
127       for dirpath, dirnames, filenames in os.walk(result_root):
128           for filename in filenames:
129               if run_xml_re.match(filename):
130                   # we have a run directory
131                   relative_pathname = get_relative_pathname(dirpath)
132                   if relative_pathname not in datarun_result_dirs:
133                       self.import_data_run(relative_pathname, filename)
134                 
135   def import_data_run(self, relative_pathname, run_xml_name):
136       """Given a result directory import files"""
137       run_dir = get_absolute_pathname(relative_pathname)
138       run_xml_path = os.path.join(run_dir, run_xml_name)
139       run_xml_data = runfolder.load_pipeline_run_xml(run_xml_path)
140                                   
141       run = DataRun()
142       run.flowcell = self
143       run.status = RUN_STATUS_REVERSE_MAP['DONE']
144       run.result_dir = relative_pathname
145       run.runfolder_name = run_xml_data.runfolder_name
146       run.cycle_start = run_xml_data.image_analysis.start
147       run.cycle_stop = run_xml_data.image_analysis.stop
148       run.run_start_time = run_xml_data.image_analysis.date
149
150       run.last_update_time = datetime.datetime.now()
151       run.save()
152
153       run.update_result_files()
154
155       
156 # FIXME: should we automatically update dataruns?
157 #        Or should we expect someone to call update_data_runs?
158 #def update_flowcell_dataruns(sender, instance, *args, **kwargs):
159 #    """Update our dataruns
160 #    """
161 #    if not os.path.exists(settings.RESULT_HOME_DIR):
162 #       return
163 #
164 #    instance.update_data_runs()    
165 #post_init.connect(update_flowcell_dataruns, sender=FlowCell)
166
167
168
169 LANE_STATUS_CODES = [(0, 'Failed'),
170                     (1, 'Marginal'),
171                     (2, 'Good'),]
172 LANE_STATUS_MAP = dict((int(k),v) for k,v in LANE_STATUS_CODES )
173 LANE_STATUS_MAP[None] = "Unknown"
174
175 def is_valid_lane(value):
176     if value >= 1 and value <= 8:
177         return True
178     else:
179           return False
180
181 class Lane(models.Model):
182   flowcell = models.ForeignKey(FlowCell)
183   lane_number = models.IntegerField() 
184   library = models.ForeignKey(Library)
185   pM = models.DecimalField(max_digits=5, decimal_places=2,blank=False, null=False,default=default_pM)
186   cluster_estimate = models.IntegerField(blank=True, null=True)                                       
187   status = models.IntegerField(choices=LANE_STATUS_CODES, null=True, blank=True) 
188   comment = models.TextField(null=True, blank=True)
189
190   @models.permalink
191   def get_absolute_url(self):
192        return ('htsworkflow.frontend.experiments.views.flowcell_lane_detail',
193                [str(self.id)])
194
195   def __unicode__(self):
196       return self.flowcell.flowcell_id + ':' + unicode(self.lane_number)
197
198 ### -----------------------
199 class DataRun(models.Model):
200     flowcell = models.ForeignKey(FlowCell,verbose_name="Flowcell Id")
201     runfolder_name = models.CharField(max_length=50)
202     result_dir = models.CharField(max_length=255)
203     last_update_time = models.DateTimeField()
204     run_start_time = models.DateTimeField()
205     cycle_start = models.IntegerField(null=True, blank=True)
206     cycle_stop = models.IntegerField(null=True, blank=True)
207     run_status = models.IntegerField(choices=RUN_STATUS_CHOICES, 
208                                      null=True, blank=True)
209     comment = models.TextField(blank=True)
210
211     def update_result_files(self):
212         abs_result_dir = get_absolute_pathname(self.result_dir)
213
214         for dirname, dirnames, filenames in os.walk(abs_result_dir):
215             for filename in filenames:
216                 pathname = os.path.join(dirname, filename)
217                 relative_pathname = get_relative_pathname(pathname)
218                 datafiles = self.datafile_set.filter(
219                   data_run = self,
220                   relative_pathname=relative_pathname)
221                 if len(datafiles) > 0:
222                     continue
223                   
224                 metadata = find_file_type_metadata_from_filename(filename)
225                 if metadata is not None:
226                     metadata['filename'] = filename
227                     newfile = DataFile()
228                     newfile.data_run = self
229                     newfile.file_type = metadata['file_type']
230                     newfile.relative_pathname = relative_pathname
231
232                     lane_number = metadata.get('lane', None)
233                     if lane_number is not None:
234                         lane = self.flowcell.lane_set.get(lane_number = lane_number)
235                         newfile.library = lane.library
236                     
237                     self.datafile_set.add(newfile)
238                     
239         self.last_update_time = datetime.datetime.now()
240
241     def lane_files(self):
242         lanes = {}
243         
244         for datafile in self.datafile_set.all():
245             metadata = datafile.attributes
246             if metadata is not None:
247                 lane = metadata.get('lane', None)
248                 if lane is not None:
249                     lane_file_set = lanes.setdefault(lane, {})
250                     lane_file_set[datafile.file_type.normalized_name] = datafile
251         return lanes
252
253     def ivc_plots(self, lane):
254         ivc_name = ['IVC All', 'IVC Call',
255                     'IVC Percent Base', 'IVC Percent All', 'IVC Percent Call']
256
257         plots = {}
258         for rel_filename, metadata in self.get_result_files():
259             if metadata.file_type.name in ivc_name:
260                 plots[metadata.file_type.name] = (rel_filename, metadata)
261                 
262 class FileType(models.Model):
263     """Represent potential file types
264
265     regex is a pattern used to detect if a filename matches this type
266     data run currently assumes that there may be a (?P<lane>) and
267     (?P<end>) pattern in the regular expression.
268     """
269     name = models.CharField(max_length=50)
270     mimetype = models.CharField(max_length=50, null=True, blank=True)
271     # regular expression from glob.fnmatch.translate
272     regex = models.CharField(max_length=50, null=True, blank=True)
273
274     def parse_filename(self, pathname):
275         """Does filename match our pattern?
276
277         Returns None if not, or dictionary of match variables if we do.
278         """
279         path, filename = os.path.split(pathname)
280         if len(self.regex) > 0:
281             match = re.match(self.regex, filename)
282             if match is not None:
283                 # These are (?P<>) names we know about from our default regexes.
284                 results = match.groupdict()
285
286                 # convert int parameters
287                 for attribute_name in ['lane', 'end']:
288                     value = results.get(attribute_name, None)
289                     if value is not None:
290                         results[attribute_name] = int(value)
291                     
292                 return results
293
294     def _get_normalized_name(self):
295         """Crush data file name into identifier friendly name"""
296         return self.name.replace(' ', '_').lower()
297     normalized_name = property(_get_normalized_name)
298               
299     def __unicode__(self):
300         #return u"<FileType: %s>" % (self.name,)
301         return self.name
302
303 def str_uuid():
304     """Helper function to set default UUID in DataFile"""
305     return str(uuid.uuid1())
306
307 class DataFile(models.Model):
308     """Store map from random ID to filename"""
309     random_key = models.CharField(max_length=64,
310                                   db_index=True,
311                                   default=str_uuid)
312     data_run = models.ForeignKey(DataRun, db_index=True)
313     library = models.ForeignKey(Library, db_index=True, null=True, blank=True)
314     file_type = models.ForeignKey(FileType)
315     relative_pathname = models.CharField(max_length=255, db_index=True)
316
317     def _get_attributes(self):
318         return self.file_type.parse_filename(self.relative_pathname)
319     attributes = property(_get_attributes)
320
321     def _get_pathname(self):
322         return get_absolute_pathname(self.relative_pathname)
323     pathname = property(_get_pathname)
324
325     @models.permalink
326     def get_absolute_url(self):
327         return ('htsworkflow.frontend.experiments.views.read_result_file',
328                 (), {'key': self.random_key })
329
330 def find_file_type_metadata_from_filename(pathname):
331     path, filename = os.path.split(pathname)
332     result = None
333     for file_type in FileType.objects.all():
334         result = file_type.parse_filename(filename)
335         if result is not None:
336             result['file_type'] = file_type
337             return result
338
339     return None
340   
341 def get_relative_pathname(abspath):
342     """Strip off the result home directory from a path
343     """
344     result_home_dir = os.path.join(settings.RESULT_HOME_DIR,'')
345     relative_pathname = abspath.replace(result_home_dir,'')
346     return relative_pathname
347
348 def get_absolute_pathname(relative_pathname):
349     """Attach relative path to  results home directory"""
350     return os.path.join(settings.RESULT_HOME_DIR, relative_pathname)
351