9 from django.conf import settings
10 from django.core.exceptions import ObjectDoesNotExist
11 from django.core import urlresolvers
12 from django.db import models
13 from django.db.models.signals import post_init
15 from htsworkflow.frontend.samples.models import Library
16 from htsworkflow.frontend.samples.results import parse_flowcell_id
17 from htsworkflow.pipelines import runfolder
# NOTE(review): this listing has gaps -- some lines of the original are not
# visible -- so the comments here describe only the statements shown.
# Module-level logger named after this module.
19 logger = logging.getLogger(__name__)
# Default value for Lane.pM, read from settings.DEFAULT_PM and coerced to int.
22 default_pM = int(settings.DEFAULT_PM)
# Logged when the configured value is unusable; presumably inside an
# exception handler around the conversion above -- confirm.
24 logger.error("invalid value for frontend.default_pm")
# (code, label) pairs used as the choices for DataRun.run_status.
26 RUN_STATUS_CHOICES = (
27 (0, 'Sequencer running'), ##Solexa Data Pipeline Not Yet Started'),
28 (1, 'Data Pipeline Started'),
29 (2, 'Data Pipeline Interrupted'),
30 (3, 'Data Pipeline Finished'),
31 (4, 'Collect Results Started'),
32 (5, 'Collect Results Finished'),
# Reverse lookup from a status label to its numeric code.
37 RUN_STATUS_REVERSE_MAP = dict(((v,k) for k,v in RUN_STATUS_CHOICES))
class ClusterStation(models.Model):
    """A cluster station, identified by a unique name (at most 50 chars)."""

    name = models.CharField(unique=True, max_length=50)

    def __unicode__(self):
        """Return the station name as unicode text."""
        station_name = self.name
        return unicode(station_name)
class Sequencer(models.Model):
    """A sequencer, identified by a unique name (at most 50 chars)."""

    name = models.CharField(unique=True, max_length=50)

    def __unicode__(self):
        """Return the sequencer name as unicode text."""
        sequencer_name = self.name
        return unicode(sequencer_name)
# Django model for a flowcell: its identifier, run date, read settings, the
# cluster station and sequencer it refers to, and free-form notes.
# NOTE(review): this listing has gaps (some def/else/closing lines are not
# visible); the comments below describe only the statements shown.
51 class FlowCell(models.Model):
52 flowcell_id = models.CharField(max_length=20, unique=True, db_index=True)
53 run_date = models.DateTimeField()
54 advanced_run = models.BooleanField(default=False)
55 paired_end = models.BooleanField(default=False)
56 read_length = models.IntegerField(default=32) #Stanford is currently 25
# Lane choices 1-8, with 0 meaning 'All Lanes'; may be left empty.
57 control_lane = models.IntegerField(choices=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(0,'All Lanes')], null=True, blank=True)
# Defaults are hard-coded values (3 and 1), presumably primary keys of the
# related rows -- confirm they exist in every deployment.
59 cluster_station = models.ForeignKey(ClusterStation, default=3)
60 sequencer = models.ForeignKey(Sequencer, default=1)
62 notes = models.TextField(blank=True)
# A flowcell is displayed by its flowcell_id.
64 def __unicode__(self):
65 return unicode(self.flowcell_id)
# Appends one HTML table row per lane of this flowcell to `html`.
# NOTE(review): the line opening this method and the creation of `html` are
# not visible; judging by `Lanes.allow_tags` below this is presumably the
# body of a `Lanes` method -- confirm.
69 for lane in self.lane_set.all():
70 cluster_estimate = lane.cluster_estimate
71 if cluster_estimate is not None:
# The estimate is divided by 1000 and shown with a " k" suffix.
72 cluster_estimate = "%s k" % ((int(cluster_estimate)/1000), )
# Text used when there is no estimate (presumably the else branch).
74 cluster_estimate = 'None'
75 library_id = lane.library_id
76 library = lane.library
# Row layout: lane number, a link to the library's admin page, and one more
# cell; the remaining format arguments are on lines not visible here.
77 element = '<tr><td>%d</td><td><a href="%s">%s</a></td><td>%s</td></tr>'
78 html.append(element % (lane.lane_number,
79 library.get_admin_url(),
82 html.append('</table>')
83 return "\n".join(html)
# Marks the Lanes output as containing HTML (presumably read by the Django
# admin -- confirm).
84 Lanes.allow_tags = True
# Sort by run_date descending (presumably inside the model's Meta class,
# whose header is not visible).
87 ordering = ["-run_date"]
# Resolves the URL registered under 'admin:experiments_flowcell_change';
# the arguments passed to reverse() are on lines not visible here.
89 def get_admin_url(self):
90 # that's the django way... except it didn't work
91 return urlresolvers.reverse('admin:experiments_flowcell_change',
94 def flowcell_type(self):
96 Convert our boolean 'is paired' flag to a name
# parse_flowcell_id yields a (flowcell_id, status) pair; the returned tuple
# starts with the dotted name of the flowcell_detail view.
104 def get_absolute_url(self):
105 flowcell_id, status = parse_flowcell_id(self.flowcell_id)
106 return ('htsworkflow.frontend.experiments.views.flowcell_detail',
109 def get_raw_data_directory(self):
110 """Return location of where the raw data is stored"""
111 flowcell_id, status = parse_flowcell_id(self.flowcell_id)
# The parsed flowcell id is used as a directory name under RESULT_HOME_DIR.
113 return os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
# Scans the raw data directory and imports every run directory that is not
# already recorded as a DataRun of this flowcell.
115 def update_data_runs(self):
116 result_root = self.get_raw_data_directory()
117 if result_root is None:
120 result_home_dir = os.path.join(settings.RESULT_HOME_DIR,'')
# Regular-expression form of the glob pattern 'run*.xml'.
121 run_xml_re = re.compile(glob.fnmatch.translate('run*.xml'))
# result_dir values of the runs already recorded for this flowcell.
123 dataruns = self.datarun_set.all()
124 datarun_result_dirs = [ x.result_dir for x in dataruns ]
127 for dirpath, dirnames, filenames in os.walk(result_root):
128 for filename in filenames:
129 if run_xml_re.match(filename):
130 # we have a run directory
131 relative_pathname = get_relative_pathname(dirpath)
132 if relative_pathname not in datarun_result_dirs:
133 self.import_data_run(relative_pathname, filename)
135 def import_data_run(self, relative_pathname, run_xml_name):
136 """Given a result directory import files"""
137 run_dir = get_absolute_pathname(relative_pathname)
138 run_xml_path = os.path.join(run_dir, run_xml_name)
139 run_xml_data = runfolder.load_pipeline_run_xml(run_xml_path)
# NOTE(review): DataRun declares the field `run_status`, not `status`, so
# this assignment sets a plain attribute rather than the model field --
# confirm which was intended. Also, 'DONE' is not among the labels visible
# in RUN_STATUS_CHOICES; the lookup raises KeyError unless an entry on a
# line not shown here defines it.
143 run.status = RUN_STATUS_REVERSE_MAP['DONE']
144 run.result_dir = relative_pathname
145 run.runfolder_name = run_xml_data.runfolder_name
# Cycle range and start time are taken from the run's image analysis data.
146 run.cycle_start = run_xml_data.image_analysis.start
147 run.cycle_stop = run_xml_data.image_analysis.stop
148 run.run_start_time = run_xml_data.image_analysis.date
150 run.last_update_time = datetime.datetime.now()
# Populate the run's data file records from the files on disk.
153 run.update_result_files()
156 # FIXME: should we automatically update dataruns?
157 # Or should we expect someone to call update_data_runs?
158 #def update_flowcell_dataruns(sender, instance, *args, **kwargs):
159 # """Update our dataruns
161 # if not os.path.exists(settings.RESULT_HOME_DIR):
164 # instance.update_data_runs()
165 #post_init.connect(update_flowcell_dataruns, sender=FlowCell)
# (code, label) choices for Lane.status.
# NOTE(review): the list continues on lines not visible in this listing.
169 LANE_STATUS_CODES = [(0, 'Failed'),
# Lookup from integer status code to label; None maps to "Unknown" because
# Lane.status is nullable.
172 LANE_STATUS_MAP = dict((int(k),v) for k,v in LANE_STATUS_CODES )
173 LANE_STATUS_MAP[None] = "Unknown"
# Django model linking a flowcell and a lane number (1-8) to a library,
# with a pM value, optional cluster estimate, status code and comment.
175 class Lane(models.Model):
176 flowcell = models.ForeignKey(FlowCell)
177 lane_number = models.IntegerField(choices=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8)])
178 library = models.ForeignKey(Library)
# Required decimal (up to 5 digits, 2 after the point); defaults to the
# module-level default_pM.
179 pM = models.DecimalField(max_digits=5, decimal_places=2,blank=False, null=False,default=default_pM)
180 cluster_estimate = models.IntegerField(blank=True, null=True)
181 status = models.IntegerField(choices=LANE_STATUS_CODES, null=True, blank=True)
182 comment = models.TextField(null=True, blank=True)
# Returns a (view name, positional args) pair for the lane detail view,
# keyed by the flowcell id and lane number as strings.
# NOTE(review): a tuple return like this is normally paired with a
# permalink-style decorator; none is visible in this listing -- confirm.
185 def get_absolute_url(self):
186 return ('htsworkflow.frontend.experiments.views.flowcell_lane_detail',
187 [str(self.flowcell.flowcell_id), str(self.lane_number)])
190 ### -----------------------
# Django model for one data run of a flowcell: run folder name, result
# directory (stored relative to the results home directory), timestamps,
# cycle range, status and comment.
# NOTE(review): this listing has gaps (some initialisation, continue/return
# and closing lines are not visible); comments describe only what is shown.
191 class DataRun(models.Model):
192 flowcell = models.ForeignKey(FlowCell,verbose_name="Flowcell Id")
193 runfolder_name = models.CharField(max_length=50)
194 result_dir = models.CharField(max_length=255)
195 last_update_time = models.DateTimeField()
196 run_start_time = models.DateTimeField()
197 cycle_start = models.IntegerField(null=True, blank=True)
198 cycle_stop = models.IntegerField(null=True, blank=True)
199 run_status = models.IntegerField(choices=RUN_STATUS_CHOICES,
200 null=True, blank=True)
201 comment = models.TextField(blank=True)
# Walks the run's result directory and attaches a data file record for each
# file whose name matches a known FileType.
203 def update_result_files(self):
204 abs_result_dir = get_absolute_pathname(self.result_dir)
206 for dirname, dirnames, filenames in os.walk(abs_result_dir):
207 for filename in filenames:
208 pathname = os.path.join(dirname, filename)
209 relative_pathname = get_relative_pathname(pathname)
# Look for an existing record with the same relative path.
210 datafiles = self.datafile_set.filter(
212 relative_pathname=relative_pathname)
# A record already exists; what happens next is on lines not visible here.
213 if len(datafiles) > 0:
# metadata is None when no FileType pattern matches the file name.
216 metadata = find_file_type_metadata_from_filename(filename)
217 if metadata is not None:
218 metadata['filename'] = filename
220 newfile.data_run = self
221 newfile.file_type = metadata['file_type']
222 newfile.relative_pathname = relative_pathname
# When the file name carries a lane number, link the file to that lane's
# library.
224 lane_number = metadata.get('lane', None)
225 if lane_number is not None:
# NOTE(review): QuerySet.get raises if no lane, or more than one lane, of
# this flowcell has this number -- confirm that is acceptable here.
226 lane = self.flowcell.lane_set.get(lane_number = lane_number)
227 newfile.library = lane.library
229 self.datafile_set.add(newfile)
231 self.last_update_time = datetime.datetime.now()
# Groups this run's data files as lanes[lane][normalized file type name];
# files whose attributes carry no 'lane' are grouped under None.
233 def lane_files(self):
236 for datafile in self.datafile_set.all():
237 metadata = datafile.attributes
238 if metadata is not None:
239 lane = metadata.get('lane', None)
241 lane_file_set = lanes.setdefault(lane, {})
242 lane_file_set[datafile.file_type.normalized_name] = datafile
# Collects the result files whose file type name is one of the IVC plot
# names, keyed by that name.
# NOTE(review): get_result_files is not defined in the visible part of this
# class, and `lane` is not used in the visible lines -- confirm both.
245 def ivc_plots(self, lane):
246 ivc_name = ['IVC All', 'IVC Call',
247 'IVC Percent Base', 'IVC Percent All', 'IVC Percent Call']
250 for rel_filename, metadata in self.get_result_files():
251 if metadata.file_type.name in ivc_name:
252 plots[metadata.file_type.name] = (rel_filename, metadata)
# NOTE(review): this listing has gaps (docstring closers and return
# statements are not visible); comments describe only what is shown.
254 class FileType(models.Model):
255 """Represent potential file types
257 regex is a pattern used to detect if a filename matches this type
258 data run currently assumes that there may be a (?P<lane>) and
259 (?P<end>) pattern in the regular expression.
261 name = models.CharField(max_length=50)
262 mimetype = models.CharField(max_length=50, null=True, blank=True)
263 # regular expression from glob.fnmatch.translate
264 regex = models.CharField(max_length=50, null=True, blank=True)
266 def parse_filename(self, pathname):
267 """Does filename match our pattern?
269 Returns None if not, or dictionary of match variables if we do.
# Only the final path component is matched against the pattern.
271 path, filename = os.path.split(pathname)
# NOTE(review): `regex` is declared null=True, so self.regex may be None and
# len(None) raises TypeError -- consider a plain truthiness test.
272 if len(self.regex) > 0:
273 match = re.match(self.regex, filename)
274 if match is not None:
275 # These are (?P<>) names we know about from our default regexes.
276 results = match.groupdict()
278 # convert int parameters
279 for attribute_name in ['lane', 'end']:
280 value = results.get(attribute_name, None)
281 if value is not None:
282 results[attribute_name] = int(value)
286 def _get_normalized_name(self):
287 """Crush data file name into identifier friendly name"""
# Spaces become underscores and the result is lower-cased.
288 return self.name.replace(' ', '_').lower()
289 normalized_name = property(_get_normalized_name)
# NOTE(review): the return statement of __unicode__ is not visible here.
291 def __unicode__(self):
292 #return u"<FileType: %s>" % (self.name,)
# NOTE(review): the def line of this helper is not visible in this listing.
296 """Helper function to set default UUID in DataFile"""
# uuid.uuid1() rendered in its string form.
297 return str(uuid.uuid1())
299 class DataFile(models.Model):
300 """Store map from random ID to filename"""
# NOTE(review): the remaining arguments of random_key are on lines not
# visible in this listing.
301 random_key = models.CharField(max_length=64,
304 data_run = models.ForeignKey(DataRun, db_index=True)
# Optional link to a library; may be empty.
305 library = models.ForeignKey(Library, db_index=True, null=True, blank=True)
306 file_type = models.ForeignKey(FileType)
# Path of the file, stored relative to the results home directory.
307 relative_pathname = models.CharField(max_length=255, db_index=True)
# `attributes`: the values the file type's pattern extracts from this file's
# name, as returned by FileType.parse_filename.
309 def _get_attributes(self):
310 return self.file_type.parse_filename(self.relative_pathname)
311 attributes = property(_get_attributes)
# `pathname`: the stored relative path joined onto the results home
# directory.
313 def _get_pathname(self):
314 return get_absolute_pathname(self.relative_pathname)
315 pathname = property(_get_pathname)
# Returns (view name, positional args, keyword args) for the result file
# view, keyed by random_key.
# NOTE(review): a tuple return like this is normally paired with a
# permalink-style decorator; none is visible in this listing -- confirm.
318 def get_absolute_url(self):
319 return ('htsworkflow.frontend.experiments.views.read_result_file',
320 (), {'key': self.random_key })
# Tries every FileType in the database against the file name and, on a
# match, adds the matching FileType to the parsed result under 'file_type'.
# NOTE(review): the return statements are on lines not visible in this
# listing; the caller in DataRun.update_result_files treats None as
# "no match".
322 def find_file_type_metadata_from_filename(pathname):
# Only the final path component is used for matching.
323 path, filename = os.path.split(pathname)
325 for file_type in FileType.objects.all():
326 result = file_type.parse_filename(filename)
327 if result is not None:
328 result['file_type'] = file_type
def get_relative_pathname(abspath):
    """Strip off the result home directory from a path.

    Only a leading ``settings.RESULT_HOME_DIR`` (including its trailing path
    separator) is removed.  A plain ``str.replace`` would also delete any
    later occurrence of the same text inside the path, corrupting paths such
    as ``<home>/run/<home-like-segment>/file``.

    :param abspath: path expected to live under ``settings.RESULT_HOME_DIR``.
    :returns: the path relative to the result home directory, or ``abspath``
        unchanged when it does not start with that directory.
    """
    # Joining with '' guarantees exactly one trailing separator, so a sibling
    # directory sharing the same name prefix is not mistaken for the home dir.
    result_home_dir = os.path.join(settings.RESULT_HOME_DIR, '')
    if abspath.startswith(result_home_dir):
        return abspath[len(result_home_dir):]
    return abspath
def get_absolute_pathname(relative_pathname):
    """Return *relative_pathname* anchored under the results home directory."""
    result_home = settings.RESULT_HOME_DIR
    absolute_pathname = os.path.join(result_home, relative_pathname)
    return absolute_pathname