9 from django.conf import settings
10 from django.core.exceptions import ObjectDoesNotExist
11 from django.core import urlresolvers
12 from django.db import models
13 from django.db.models.signals import post_init
15 from htsworkflow.frontend.samples.models import Library
16 from htsworkflow.frontend.samples.results import parse_flowcell_id
17 from htsworkflow.pipelines import runfolder
21 default_pM = int(settings.DEFAULT_PM)
23 logging.error("invalid value for frontend.default_pm")
25 RUN_STATUS_CHOICES = (
26 (0, 'Sequencer running'), ##Solexa Data Pipeline Not Yet Started'),
27 (1, 'Data Pipeline Started'),
28 (2, 'Data Pipeline Interrupted'),
29 (3, 'Data Pipeline Finished'),
30 (4, 'Collect Results Started'),
31 (5, 'Collect Results Finished'),
# Reverse lookup over RUN_STATUS_CHOICES: status label -> numeric status code.
RUN_STATUS_REVERSE_MAP = dict(
    (label, code) for code, label in RUN_STATUS_CHOICES
)
class ClusterStation(models.Model):
    """A cluster station record, identified by a unique name."""

    # Unique display name, at most 50 characters.
    name = models.CharField(unique=True, max_length=50)

    def __unicode__(self):
        """Return the station's name converted to unicode."""
        station_name = self.name
        return unicode(station_name)
class Sequencer(models.Model):
    """A sequencer record, identified by a unique name."""

    # Unique display name, at most 50 characters.
    name = models.CharField(unique=True, max_length=50)

    def __unicode__(self):
        """Return the sequencer's name converted to unicode."""
        sequencer_name = self.name
        return unicode(sequencer_name)
50 class FlowCell(models.Model):
51 flowcell_id = models.CharField(max_length=20, unique=True, db_index=True)
52 run_date = models.DateTimeField()
53 advanced_run = models.BooleanField(default=False)
54 paired_end = models.BooleanField(default=False)
55 read_length = models.IntegerField(default=32) #Stanford is currently 25
56 control_lane = models.IntegerField(choices=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(0,'All Lanes')], null=True, blank=True)
58 cluster_station = models.ForeignKey(ClusterStation, default=3)
59 sequencer = models.ForeignKey(Sequencer, default=1)
61 notes = models.TextField(blank=True)
def __unicode__(self):
    """Return this flowcell's ``flowcell_id`` converted to unicode."""
    identifier = self.flowcell_id
    return unicode(identifier)
68 for lane in self.lane_set.all():
69 cluster_estimate = lane.cluster_estimate
70 if cluster_estimate is not None:
71 cluster_estimate = "%s k" % ((int(cluster_estimate)/1000), )
73 cluster_estimate = 'None'
74 library_id = lane.library_id
75 library = lane.library
76 element = '<tr><td>%d</td><td><a href="%s">%s</a></td><td>%s</td></tr>'
77 html.append(element % (lane.lane_number,
78 library.get_admin_url(),
81 html.append('</table>')
82 return "\n".join(html)
83 Lanes.allow_tags = True
86 ordering = ["-run_date"]
88 def get_admin_url(self):
89 # that's the django way... except it didn't work
90 return urlresolvers.reverse('admin:experiments_flowcell_change',
93 def flowcell_type(self):
95 Convert our boolean 'is paired' flag to a name
def get_absolute_url(self):
    """Return the ``(view name, positional args)`` pair for the flowcell detail view.

    The only positional argument is this flowcell's ``flowcell_id``
    converted with ``str``.
    """
    # NOTE(review): presumably resolved to a URL by a decorator that is not
    # visible in this excerpt -- confirm before relying on the raw tuple.
    view_name = 'htsworkflow.frontend.experiments.views.flowcell_detail'
    view_args = [str(self.flowcell_id)]
    return (view_name, view_args)
107 def get_raw_data_directory(self):
108 """Return location of where the raw data is stored"""
109 flowcell_id, status = parse_flowcell_id(self.flowcell_id)
111 return os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
113 def update_data_runs(self):
114 result_root = self.get_raw_data_directory()
115 if result_root is None:
118 result_home_dir = os.path.join(settings.RESULT_HOME_DIR,'')
119 run_xml_re = re.compile(glob.fnmatch.translate('run*.xml'))
121 dataruns = self.datarun_set.all()
122 datarun_result_dirs = [ x.result_dir for x in dataruns ]
125 for dirpath, dirnames, filenames in os.walk(result_root):
126 for filename in filenames:
127 if run_xml_re.match(filename):
128 # we have a run directory
129 relative_pathname = get_relative_pathname(dirpath)
130 if relative_pathname not in datarun_result_dirs:
131 self.import_data_run(relative_pathname, filename)
133 def import_data_run(self, relative_pathname, run_xml_name):
134 """Given a result directory import files"""
135 run_dir = get_absolute_pathname(relative_pathname)
136 run_xml_path = os.path.join(run_dir, run_xml_name)
137 run_xml_data = runfolder.load_pipeline_run_xml(run_xml_path)
141 run.status = RUN_STATUS_REVERSE_MAP['DONE']
142 run.result_dir = relative_pathname
143 run.runfolder_name = run_xml_data.runfolder_name
144 run.cycle_start = run_xml_data.image_analysis.start
145 run.cycle_stop = run_xml_data.image_analysis.stop
146 run.run_start_time = run_xml_data.image_analysis.date
147 run.last_update_time = datetime.datetime.now()
150 run.update_result_files()
152 # FIXME: should we automatically update dataruns?
153 # Or should we expect someone to call update_data_runs?
154 #def update_flowcell_dataruns(sender, instance, *args, **kwargs):
155 # """Update our dataruns
157 # if not os.path.exists(settings.RESULT_HOME_DIR):
160 # instance.update_data_runs()
161 #post_init.connect(update_flowcell_dataruns, sender=FlowCell)
165 LANE_STATUS_CODES = [(0, 'Failed'),
# Lookup from integer lane status code to its label, built from
# LANE_STATUS_CODES; a missing (None) status maps to "Unknown".
LANE_STATUS_MAP = dict(
    (int(code), label) for code, label in LANE_STATUS_CODES
)
LANE_STATUS_MAP[None] = "Unknown"
171 class Lane(models.Model):
172 flowcell = models.ForeignKey(FlowCell)
173 lane_number = models.IntegerField(choices=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8)])
174 library = models.ForeignKey(Library)
175 pM = models.DecimalField(max_digits=5, decimal_places=2,blank=False, null=False,default=default_pM)
176 cluster_estimate = models.IntegerField(blank=True, null=True)
177 status = models.IntegerField(choices=LANE_STATUS_CODES, null=True, blank=True)
178 comment = models.TextField(null=True, blank=True)
def get_absolute_url(self):
    """Return the ``(view name, positional args)`` pair for the lane detail view.

    The positional arguments are the owning flowcell's ``flowcell_id`` and
    this lane's ``lane_number``, both converted with ``str``.
    """
    # NOTE(review): presumably resolved to a URL by a decorator that is not
    # visible in this excerpt -- confirm before relying on the raw tuple.
    view_name = 'htsworkflow.frontend.experiments.views.flowcell_lane_detail'
    flowcell_key = str(self.flowcell.flowcell_id)
    lane_key = str(self.lane_number)
    return (view_name, [flowcell_key, lane_key])
186 ### -----------------------
187 class DataRun(models.Model):
188 flowcell = models.ForeignKey(FlowCell,verbose_name="Flowcell Id")
189 runfolder_name = models.CharField(max_length=50)
190 result_dir = models.CharField(max_length=255)
191 last_update_time = models.DateTimeField()
192 run_start_time = models.DateTimeField()
193 cycle_start = models.IntegerField(null=True, blank=True)
194 cycle_stop = models.IntegerField(null=True, blank=True)
195 run_status = models.IntegerField(choices=RUN_STATUS_CHOICES,
196 null=True, blank=True)
197 comment = models.TextField(blank=True)
199 def update_result_files(self):
200 abs_result_dir = get_absolute_pathname(self.result_dir)
202 for dirname, dirnames, filenames in os.walk(abs_result_dir):
203 for filename in filenames:
204 pathname = os.path.join(dirname, filename)
205 relative_pathname = get_relative_pathname(pathname)
206 datafiles = self.datafile_set.filter(
208 relative_pathname=relative_pathname)
209 if len(datafiles) > 0:
212 metadata = find_file_type_metadata_from_filename(filename)
213 if metadata is not None:
214 metadata['filename'] = filename
216 newfile.data_run = self
217 newfile.file_type = metadata['file_type']
218 newfile.relative_pathname = relative_pathname
220 lane_number = metadata.get('lane', None)
221 if lane_number is not None:
222 lane = self.flowcell.lane_set.get(lane_number = lane_number)
223 newfile.library = lane.library
225 self.datafile_set.add(newfile)
227 self.last_update_time = datetime.datetime.now()
229 def lane_files(self):
232 for datafile in self.datafile_set.all():
233 metadata = datafile.attributes
234 if metadata is not None:
235 lane = metadata.get('lane', None)
237 lane_file_set = lanes.setdefault(lane, {})
238 lane_file_set[datafile.file_type.normalized_name] = datafile
241 def ivc_plots(self, lane):
242 ivc_name = ['IVC All', 'IVC Call',
243 'IVC Percent Base', 'IVC Percent All', 'IVC Percent Call']
246 for rel_filename, metadata in self.get_result_files():
247 if metadata.file_type.name in ivc_name:
248 plots[metadata.file_type.name] = (rel_filename, metadata)
250 class FileType(models.Model):
251 """Represent potential file types
253 regex is a pattern used to detect if a filename matches this type
254 data run currently assumes that there may be a (?P<lane>) and
255 (?P<end>) pattern in the regular expression.
257 name = models.CharField(max_length=50)
258 mimetype = models.CharField(max_length=50, null=True, blank=True)
259 # regular expression from glob.fnmatch.translate
260 regex = models.CharField(max_length=50, null=True, blank=True)
262 def parse_filename(self, pathname):
263 """Does filename match our pattern?
265 Returns None if not, or dictionary of match variables if we do.
267 path, filename = os.path.split(pathname)
268 if len(self.regex) > 0:
269 match = re.match(self.regex, filename)
270 if match is not None:
271 # These are (?P<>) names we know about from our default regexes.
272 results = match.groupdict()
274 # convert int parameters
275 for attribute_name in ['lane', 'end']:
276 value = results.get(attribute_name, None)
277 if value is not None:
278 results[attribute_name] = int(value)
282 def _get_normalized_name(self):
283 """Crush data file name into identifier friendly name"""
284 return self.name.replace(' ', '_').lower()
285 normalized_name = property(_get_normalized_name)
287 def __unicode__(self):
288 #return u"<FileType: %s>" % (self.name,)
292 class DataFile(models.Model):
293 """Store map from random ID to filename"""
294 random_key = models.CharField(max_length=16,
297 data_run = models.ForeignKey(DataRun, db_index=True)
298 library = models.ForeignKey(Library, db_index=True, null=True, blank=True)
299 file_type = models.ForeignKey(FileType)
300 relative_pathname = models.CharField(max_length=255, db_index=True)
302 def _get_attributes(self):
303 return self.file_type.parse_filename(self.relative_pathname)
304 attributes = property(_get_attributes)
def _get_pathname(self):
    """Return ``get_absolute_pathname`` applied to this file's ``relative_pathname``."""
    relative = self.relative_pathname
    return get_absolute_pathname(relative)
308 pathname = property(_get_pathname)
def get_absolute_url(self):
    """Return the ``(view name, args, kwargs)`` triple for the result-file view.

    No positional arguments are supplied; the file is addressed only by
    its ``random_key`` through the ``key`` keyword argument.
    """
    # NOTE(review): presumably resolved to a URL by a decorator that is not
    # visible in this excerpt -- confirm before relying on the raw tuple.
    view_name = 'htsworkflow.frontend.experiments.views.read_result_file'
    view_kwargs = {'key': self.random_key}
    return (view_name, (), view_kwargs)
315 def find_file_type_metadata_from_filename(pathname):
316 path, filename = os.path.split(pathname)
318 for file_type in FileType.objects.all():
319 result = file_type.parse_filename(filename)
320 if result is not None:
321 result['file_type'] = file_type
def get_relative_pathname(abspath):
    """Strip off the result home directory from a path

    Only a leading ``settings.RESULT_HOME_DIR`` prefix (with its trailing
    path separator) is removed.  A path that does not start with that
    prefix is returned unchanged.
    """
    # os.path.join(dir, '') guarantees a trailing separator, so a sibling
    # directory that merely shares the same leading characters is not
    # mistaken for the results home.
    result_home_dir = os.path.join(settings.RESULT_HOME_DIR, '')
    # Slice off the prefix rather than using str.replace(): replace() would
    # also delete any later occurrence of the same text inside the path
    # (e.g. '/data/fc/data/x' with home '/data/' would become 'fcx').
    if abspath.startswith(result_home_dir):
        return abspath[len(result_home_dir):]
    return abspath
def get_absolute_pathname(relative_pathname):
    """Join ``relative_pathname`` onto ``settings.RESULT_HOME_DIR``."""
    results_root = settings.RESULT_HOME_DIR
    return os.path.join(results_root, relative_pathname)