9 from django.conf import settings
10 from django.core.exceptions import ObjectDoesNotExist
11 from django.core import urlresolvers
12 from django.db import models
13 from django.db.models.signals import post_init
15 from htsworkflow.frontend.samples.models import Library
16 from htsworkflow.util.conversion import parse_flowcell_id
17 from htsworkflow.pipelines import runfolder
19 logger = logging.getLogger(__name__)
22 default_pM = int(settings.DEFAULT_PM)
24 logger.error("invalid value for frontend.default_pm")
26 # how many days to wait before trying to re-import a runfolder
29 RESCAN_DELAY = int(settings.RESCAN_DELAY)
30 except (ValueError, AttributeError):
31 logger.error("Missing or invalid settings.RESCAN_DELAY, "\
32 "defaulting to %s" % (RESCAN_DELAY,))
34 RUN_STATUS_CHOICES = (
35 (0, 'Sequencer running'), ##Solexa Data Pipeline Not Yet Started'),
36 (1, 'Data Pipeline Started'),
37 (2, 'Data Pipeline Interrupted'),
38 (3, 'Data Pipeline Finished'),
39 (4, 'Collect Results Started'),
40 (5, 'Collect Results Finished'),
# Reverse lookup table: status label -> numeric status code,
# built from the (code, label) pairs in RUN_STATUS_CHOICES.
RUN_STATUS_REVERSE_MAP = dict(
    (label, code) for code, label in RUN_STATUS_CHOICES
)
class ClusterStation(models.Model):
    """Django model with a single, unique ``name`` field."""
    # name: up to 50 characters, unique across all rows
    name = models.CharField(unique=True, max_length=50)

    def __unicode__(self):
        """Return the station name converted with unicode()."""
        station_label = unicode(self.name)
        return station_label
53 class Sequencer(models.Model):
54 name = models.CharField(max_length=50, db_index=True)
55 instrument_name = models.CharField(max_length=50, db_index=True)
56 serial_number = models.CharField(max_length=50, db_index=True)
57 model = models.CharField(max_length=255)
58 comment = models.CharField(max_length=255)
60 def __unicode__(self):
61 name = [unicode(self.name)]
62 if self.instrument_name is not None:
63 name.append("(%s)" % (unicode(self.instrument_name),))
67 def get_absolute_url(self):
68 return ('htsworkflow.frontend.experiments.views.sequencer',
72 class FlowCell(models.Model):
73 flowcell_id = models.CharField(max_length=20, unique=True, db_index=True)
74 run_date = models.DateTimeField()
75 advanced_run = models.BooleanField(default=False)
76 paired_end = models.BooleanField(default=False)
77 read_length = models.IntegerField(default=32) #Stanford is currenlty 25
78 control_lane = models.IntegerField(choices=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(0,'All Lanes')], null=True, blank=True)
80 cluster_station = models.ForeignKey(ClusterStation, default=3)
81 sequencer = models.ForeignKey(Sequencer, default=1)
83 notes = models.TextField(blank=True)
def __unicode__(self):
    """Return this flowcell's flowcell_id converted with unicode()."""
    flowcell_label = unicode(self.flowcell_id)
    return flowcell_label
90 for lane in self.lane_set.order_by('lane_number'):
91 cluster_estimate = lane.cluster_estimate
92 if cluster_estimate is not None:
93 cluster_estimate = "%s k" % ((int(cluster_estimate)/1000), )
95 cluster_estimate = 'None'
96 library_id = lane.library_id
97 library = lane.library
98 element = '<tr><td>%d</td><td><a href="%s">%s</a></td><td>%s</td></tr>'
99 html.append(element % (lane.lane_number,
100 library.get_admin_url(),
103 html.append('</table>')
104 return "\n".join(html)
105 Lanes.allow_tags = True
108 ordering = ["-run_date"]
110 def get_admin_url(self):
111 # that's the django way... except it didn't work
112 return urlresolvers.reverse('admin:experiments_flowcell_change',
115 def flowcell_type(self):
117 Convert our boolean 'is paired' flag to a name
125 def get_absolute_url(self):
126 flowcell_id, status = parse_flowcell_id(self.flowcell_id)
127 return ('htsworkflow.frontend.experiments.views.flowcell_detail',
def get_raw_data_directory(self):
    """Return location of where the raw data is stored

    The id portion returned by parse_flowcell_id(self.flowcell_id) is
    joined onto settings.RESULT_HOME_DIR.
    """
    # parse_flowcell_id is unpacked as a pair; only the id part is used here.
    parsed_id, _status = parse_flowcell_id(self.flowcell_id)
    raw_data_dir = os.path.join(settings.RESULT_HOME_DIR, parsed_id)
    return raw_data_dir
136 def update_data_runs(self):
137 result_root = self.get_raw_data_directory()
138 logger.debug("Update data runs flowcell root: %s" % (result_root,))
139 if result_root is None:
142 result_home_dir = os.path.join(settings.RESULT_HOME_DIR,'')
143 run_xml_re = re.compile(glob.fnmatch.translate('run*.xml'))
145 dataruns = dict([ (x.result_dir, x) for x in self.datarun_set.all() ])
148 for dirpath, dirnames, filenames in os.walk(result_root):
149 for filename in filenames:
150 if run_xml_re.match(filename):
151 # we have a run directory
152 relative_pathname = get_relative_pathname(dirpath)
153 cached_run = dataruns.get(relative_pathname, None)
154 now = datetime.datetime.now()
155 if (cached_run is None):
156 self.import_data_run(relative_pathname, filename)
157 elif (now - cached_run.last_update_time).days > RESCAN_DELAY:
158 self.import_data_run(relative_pathname,
159 filename, cached_run)
161 def import_data_run(self, relative_pathname, run_xml_name, run=None):
162 """Given a result directory import files"""
163 run_dir = get_absolute_pathname(relative_pathname)
164 run_xml_path = os.path.join(run_dir, run_xml_name)
165 run_xml_data = runfolder.load_pipeline_run_xml(run_xml_path)
166 logger.debug("Importing run from %s" % (relative_pathname,))
171 run.status = RUN_STATUS_REVERSE_MAP['DONE']
172 run.result_dir = relative_pathname
173 run.runfolder_name = run_xml_data.runfolder_name
174 run.cycle_start = run_xml_data.image_analysis.start
175 run.cycle_stop = run_xml_data.image_analysis.stop
176 run.run_start_time = run_xml_data.image_analysis.date
178 run.last_update_time = datetime.datetime.now()
181 run.update_result_files()
184 # FIXME: should we automatically update dataruns?
185 # Or should we expect someone to call update_data_runs?
186 #def update_flowcell_dataruns(sender, instance, *args, **kwargs):
187 # """Update our dataruns
189 # if not os.path.exists(settings.RESULT_HOME_DIR):
192 # instance.update_data_runs()
193 #post_init.connect(update_flowcell_dataruns, sender=FlowCell)
197 LANE_STATUS_CODES = [(0, 'Failed'),
200 LANE_STATUS_MAP = dict((int(k),v) for k,v in LANE_STATUS_CODES )
201 LANE_STATUS_MAP[None] = "Unknown"
203 def is_valid_lane(value):
204 if value >= 1 and value <= 8:
209 class Lane(models.Model):
210 flowcell = models.ForeignKey(FlowCell)
211 lane_number = models.IntegerField()
212 library = models.ForeignKey(Library)
213 pM = models.DecimalField(max_digits=5, decimal_places=2,blank=False, null=False,default=default_pM)
214 cluster_estimate = models.IntegerField(blank=True, null=True)
215 status = models.IntegerField(choices=LANE_STATUS_CODES, null=True, blank=True)
216 comment = models.TextField(null=True, blank=True)
219 def get_absolute_url(self):
220 return ('htsworkflow.frontend.experiments.views.flowcell_lane_detail',
def __unicode__(self):
    """Return '<flowcell_id>:<lane_number>' for this lane."""
    prefix = self.flowcell.flowcell_id + ':'
    return prefix + unicode(self.lane_number)
226 ### -----------------------
227 class DataRun(models.Model):
228 flowcell = models.ForeignKey(FlowCell,verbose_name="Flowcell Id")
229 runfolder_name = models.CharField(max_length=50)
230 result_dir = models.CharField(max_length=255)
231 last_update_time = models.DateTimeField()
232 run_start_time = models.DateTimeField()
233 cycle_start = models.IntegerField(null=True, blank=True)
234 cycle_stop = models.IntegerField(null=True, blank=True)
235 run_status = models.IntegerField(choices=RUN_STATUS_CHOICES,
236 null=True, blank=True)
237 comment = models.TextField(blank=True)
239 def update_result_files(self):
240 abs_result_dir = get_absolute_pathname(self.result_dir)
242 for dirname, dirnames, filenames in os.walk(abs_result_dir):
243 for filename in filenames:
244 pathname = os.path.join(dirname, filename)
245 relative_pathname = get_relative_pathname(pathname)
246 datafiles = self.datafile_set.filter(
248 relative_pathname=relative_pathname)
249 if len(datafiles) > 0:
252 metadata = find_file_type_metadata_from_filename(filename)
253 if metadata is not None:
254 metadata['filename'] = filename
256 newfile.data_run = self
257 newfile.file_type = metadata['file_type']
258 newfile.relative_pathname = relative_pathname
260 lane_number = metadata.get('lane', None)
261 if lane_number is not None:
262 lane = self.flowcell.lane_set.get(lane_number = lane_number)
263 newfile.library = lane.library
265 self.datafile_set.add(newfile)
267 self.last_update_time = datetime.datetime.now()
269 def lane_files(self):
272 for datafile in self.datafile_set.all():
273 metadata = datafile.attributes
274 if metadata is not None:
275 lane = metadata.get('lane', None)
277 lane_file_set = lanes.setdefault(lane, {})
278 lane_file_set[datafile.file_type.normalized_name] = datafile
281 def ivc_plots(self, lane):
282 ivc_name = ['IVC All', 'IVC Call',
283 'IVC Percent Base', 'IVC Percent All', 'IVC Percent Call']
286 for rel_filename, metadata in self.get_result_files():
287 if metadata.file_type.name in ivc_name:
288 plots[metadata.file_type.name] = (rel_filename, metadata)
290 class FileType(models.Model):
291 """Represent potential file types
293 regex is a pattern used to detect if a filename matches this type
294 data run currently assumes that there may be a (?P<lane>) and
295 (?P<end>) pattern in the regular expression.
297 name = models.CharField(max_length=50)
298 mimetype = models.CharField(max_length=50, null=True, blank=True)
299 # regular expression from glob.fnmatch.translate
300 regex = models.CharField(max_length=50, null=True, blank=True)
302 def parse_filename(self, pathname):
303 """Does filename match our pattern?
305 Returns None if not, or dictionary of match variables if we do.
307 path, filename = os.path.split(pathname)
308 if len(self.regex) > 0:
309 match = re.match(self.regex, filename)
310 if match is not None:
311 # These are (?P<>) names we know about from our default regexes.
312 results = match.groupdict()
314 # convert int parameters
315 for attribute_name in ['lane', 'end']:
316 value = results.get(attribute_name, None)
317 if value is not None:
318 results[attribute_name] = int(value)
def _get_normalized_name(self):
    """Return self.name with spaces turned into underscores, lower-cased."""
    underscored = self.name.replace(' ', '_')
    return underscored.lower()
# Read-only attribute exposing the identifier-friendly form of ``name``.
normalized_name = property(_get_normalized_name)
327 def __unicode__(self):
328 #return u"<FileType: %s>" % (self.name,)
332 """Helper function to set default UUID in DataFile"""
333 return str(uuid.uuid1())
335 class DataFile(models.Model):
336 """Store map from random ID to filename"""
337 random_key = models.CharField(max_length=64,
340 data_run = models.ForeignKey(DataRun, db_index=True)
341 library = models.ForeignKey(Library, db_index=True, null=True, blank=True)
342 file_type = models.ForeignKey(FileType)
343 relative_pathname = models.CharField(max_length=255, db_index=True)
def _get_attributes(self):
    """Return the result of file_type.parse_filename(relative_pathname)."""
    parse = self.file_type.parse_filename
    return parse(self.relative_pathname)
# Read-only attribute: whatever the file type parses out of the pathname.
attributes = property(_get_attributes)
def _get_pathname(self):
    """Return get_absolute_pathname() applied to relative_pathname."""
    absolute = get_absolute_pathname(self.relative_pathname)
    return absolute
# Read-only attribute: absolute location of this data file.
pathname = property(_get_pathname)
def get_absolute_url(self):
    """Return (view name, positional args, keyword args) for this file.

    The view is read_result_file and the only keyword argument is
    'key', set to this file's random_key.
    """
    view_name = 'htsworkflow.frontend.experiments.views.read_result_file'
    view_kwargs = {'key': self.random_key}
    return (view_name, (), view_kwargs)
358 def find_file_type_metadata_from_filename(pathname):
359 path, filename = os.path.split(pathname)
361 for file_type in FileType.objects.all():
362 result = file_type.parse_filename(filename)
363 if result is not None:
364 result['file_type'] = file_type
def get_relative_pathname(abspath):
    """Strip off the result home directory from a path

    Only a leading settings.RESULT_HOME_DIR prefix (including its
    trailing path separator) is removed.  A path that does not start
    with that prefix is returned unchanged.
    """
    # os.path.join(..., '') guarantees the prefix ends with a separator.
    result_home_dir = os.path.join(settings.RESULT_HOME_DIR, '')
    # str.replace would also delete later occurrences of the prefix text
    # (e.g. '/data/' inside '/data/run/data/x'), so slice off the
    # leading prefix instead.
    if abspath.startswith(result_home_dir):
        return abspath[len(result_home_dir):]
    return abspath
def get_absolute_pathname(relative_pathname):
    """Return relative_pathname joined onto settings.RESULT_HOME_DIR."""
    result_home = settings.RESULT_HOME_DIR
    return os.path.join(result_home, relative_pathname)