9 from django.conf import settings
10 from django.core.exceptions import ObjectDoesNotExist
11 from django.core import urlresolvers
12 from django.db import models
13 from django.db.models.signals import post_init
15 from htsworkflow.frontend.samples.models import Library
16 from htsworkflow.frontend.samples.results import parse_flowcell_id
17 from htsworkflow.pipelines import runfolder
19 logger = logging.getLogger(__name__)
22 default_pM = int(settings.DEFAULT_PM)
24 logger.error("invalid value for frontend.default_pm")
26 RUN_STATUS_CHOICES = (
27 (0, 'Sequencer running'), ##Solexa Data Pipeline Not Yet Started'),
28 (1, 'Data Pipeline Started'),
29 (2, 'Data Pipeline Interrupted'),
30 (3, 'Data Pipeline Finished'),
31 (4, 'Collect Results Started'),
32 (5, 'Collect Results Finished'),
# Inverse of RUN_STATUS_CHOICES: look up the numeric code by its label.
RUN_STATUS_REVERSE_MAP = dict(
    (label, code) for code, label in RUN_STATUS_CHOICES
)
class ClusterStation(models.Model):
    """A named cluster station.

    FlowCell rows refer to one of these through their ``cluster_station``
    foreign key.
    """
    # Display label; declared unique, at most 50 characters.
    name = models.CharField(unique=True, max_length=50)

    def __unicode__(self):
        """Return the station's name as a unicode string."""
        label = self.name
        return unicode(label)
class Sequencer(models.Model):
    """A named sequencer.

    FlowCell rows refer to one of these through their ``sequencer``
    foreign key.
    """
    # Display label; declared unique, at most 50 characters.
    name = models.CharField(unique=True, max_length=50)

    def __unicode__(self):
        """Return the sequencer's name as a unicode string."""
        label = self.name
        return unicode(label)
51 class FlowCell(models.Model):
52 flowcell_id = models.CharField(max_length=20, unique=True, db_index=True)
53 run_date = models.DateTimeField()
54 advanced_run = models.BooleanField(default=False)
55 paired_end = models.BooleanField(default=False)
56 read_length = models.IntegerField(default=32) #Stanford is currenlty 25
57 control_lane = models.IntegerField(choices=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(0,'All Lanes')], null=True, blank=True)
59 cluster_station = models.ForeignKey(ClusterStation, default=3)
60 sequencer = models.ForeignKey(Sequencer, default=1)
62 notes = models.TextField(blank=True)
64 def __unicode__(self):
65 return unicode(self.flowcell_id)
69 for lane in self.lane_set.all():
70 cluster_estimate = lane.cluster_estimate
71 if cluster_estimate is not None:
72 cluster_estimate = "%s k" % ((int(cluster_estimate)/1000), )
74 cluster_estimate = 'None'
75 library_id = lane.library_id
76 library = lane.library
77 element = '<tr><td>%d</td><td><a href="%s">%s</a></td><td>%s</td></tr>'
78 html.append(element % (lane.lane_number,
79 library.get_admin_url(),
82 html.append('</table>')
83 return "\n".join(html)
84 Lanes.allow_tags = True
87 ordering = ["-run_date"]
89 def get_admin_url(self):
90 # that's the django way... except it didn't work
91 return urlresolvers.reverse('admin:experiments_flowcell_change',
94 def flowcell_type(self):
96 Convert our boolean 'is paired' flag to a name
104 def get_absolute_url(self):
105 return ('htsworkflow.frontend.experiments.views.flowcell_detail',
106 [str(self.flowcell_id)])
108 def get_raw_data_directory(self):
109 """Return location of where the raw data is stored"""
110 flowcell_id, status = parse_flowcell_id(self.flowcell_id)
112 return os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
114 def update_data_runs(self):
115 result_root = self.get_raw_data_directory()
116 if result_root is None:
119 result_home_dir = os.path.join(settings.RESULT_HOME_DIR,'')
120 run_xml_re = re.compile(glob.fnmatch.translate('run*.xml'))
122 dataruns = self.datarun_set.all()
123 datarun_result_dirs = [ x.result_dir for x in dataruns ]
126 for dirpath, dirnames, filenames in os.walk(result_root):
127 for filename in filenames:
128 if run_xml_re.match(filename):
129 # we have a run directory
130 relative_pathname = get_relative_pathname(dirpath)
131 if relative_pathname not in datarun_result_dirs:
132 self.import_data_run(relative_pathname, filename)
134 def import_data_run(self, relative_pathname, run_xml_name):
135 """Given a result directory import files"""
136 run_dir = get_absolute_pathname(relative_pathname)
137 run_xml_path = os.path.join(run_dir, run_xml_name)
138 run_xml_data = runfolder.load_pipeline_run_xml(run_xml_path)
142 run.status = RUN_STATUS_REVERSE_MAP['DONE']
143 run.result_dir = relative_pathname
144 run.runfolder_name = run_xml_data.runfolder_name
145 run.cycle_start = run_xml_data.image_analysis.start
146 run.cycle_stop = run_xml_data.image_analysis.stop
147 run.run_start_time = run_xml_data.image_analysis.date
149 run.last_update_time = datetime.datetime.now()
152 run.update_result_files()
155 # FIXME: should we automatically update dataruns?
156 # Or should we expect someone to call update_data_runs?
157 #def update_flowcell_dataruns(sender, instance, *args, **kwargs):
158 # """Update our dataruns
160 # if not os.path.exists(settings.RESULT_HOME_DIR):
163 # instance.update_data_runs()
164 #post_init.connect(update_flowcell_dataruns, sender=FlowCell)
168 LANE_STATUS_CODES = [(0, 'Failed'),
# Status code -> label lookup built from LANE_STATUS_CODES.
LANE_STATUS_MAP = dict(
    (int(code), label) for code, label in LANE_STATUS_CODES
)
# A missing (None) status is reported as "Unknown".
LANE_STATUS_MAP[None] = "Unknown"
174 class Lane(models.Model):
175 flowcell = models.ForeignKey(FlowCell)
176 lane_number = models.IntegerField(choices=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8)])
177 library = models.ForeignKey(Library)
178 pM = models.DecimalField(max_digits=5, decimal_places=2,blank=False, null=False,default=default_pM)
179 cluster_estimate = models.IntegerField(blank=True, null=True)
180 status = models.IntegerField(choices=LANE_STATUS_CODES, null=True, blank=True)
181 comment = models.TextField(null=True, blank=True)
184 def get_absolute_url(self):
185 return ('htsworkflow.frontend.experiments.views.flowcell_lane_detail',
186 [str(self.flowcell.flowcell_id), str(self.lane_number)])
189 ### -----------------------
190 class DataRun(models.Model):
191 flowcell = models.ForeignKey(FlowCell,verbose_name="Flowcell Id")
192 runfolder_name = models.CharField(max_length=50)
193 result_dir = models.CharField(max_length=255)
194 last_update_time = models.DateTimeField()
195 run_start_time = models.DateTimeField()
196 cycle_start = models.IntegerField(null=True, blank=True)
197 cycle_stop = models.IntegerField(null=True, blank=True)
198 run_status = models.IntegerField(choices=RUN_STATUS_CHOICES,
199 null=True, blank=True)
200 comment = models.TextField(blank=True)
202 def update_result_files(self):
203 abs_result_dir = get_absolute_pathname(self.result_dir)
205 for dirname, dirnames, filenames in os.walk(abs_result_dir):
206 for filename in filenames:
207 pathname = os.path.join(dirname, filename)
208 relative_pathname = get_relative_pathname(pathname)
209 datafiles = self.datafile_set.filter(
211 relative_pathname=relative_pathname)
212 if len(datafiles) > 0:
215 metadata = find_file_type_metadata_from_filename(filename)
216 if metadata is not None:
217 metadata['filename'] = filename
219 newfile.data_run = self
220 newfile.file_type = metadata['file_type']
221 newfile.relative_pathname = relative_pathname
223 lane_number = metadata.get('lane', None)
224 if lane_number is not None:
225 lane = self.flowcell.lane_set.get(lane_number = lane_number)
226 newfile.library = lane.library
228 self.datafile_set.add(newfile)
230 self.last_update_time = datetime.datetime.now()
232 def lane_files(self):
235 for datafile in self.datafile_set.all():
236 metadata = datafile.attributes
237 if metadata is not None:
238 lane = metadata.get('lane', None)
240 lane_file_set = lanes.setdefault(lane, {})
241 lane_file_set[datafile.file_type.normalized_name] = datafile
244 def ivc_plots(self, lane):
245 ivc_name = ['IVC All', 'IVC Call',
246 'IVC Percent Base', 'IVC Percent All', 'IVC Percent Call']
249 for rel_filename, metadata in self.get_result_files():
250 if metadata.file_type.name in ivc_name:
251 plots[metadata.file_type.name] = (rel_filename, metadata)
253 class FileType(models.Model):
254 """Represent potential file types
256 regex is a pattern used to detect if a filename matches this type
257 data run currently assumes that there may be a (?P<lane>) and
258 (?P<end>) pattern in the regular expression.
260 name = models.CharField(max_length=50)
261 mimetype = models.CharField(max_length=50, null=True, blank=True)
262 # regular expression from glob.fnmatch.translate
263 regex = models.CharField(max_length=50, null=True, blank=True)
265 def parse_filename(self, pathname):
266 """Does filename match our pattern?
268 Returns None if not, or dictionary of match variables if we do.
270 path, filename = os.path.split(pathname)
271 if len(self.regex) > 0:
272 match = re.match(self.regex, filename)
273 if match is not None:
274 # These are (?P<>) names we know about from our default regexes.
275 results = match.groupdict()
277 # convert int parameters
278 for attribute_name in ['lane', 'end']:
279 value = results.get(attribute_name, None)
280 if value is not None:
281 results[attribute_name] = int(value)
285 def _get_normalized_name(self):
286 """Crush data file name into identifier friendly name"""
287 return self.name.replace(' ', '_').lower()
288 normalized_name = property(_get_normalized_name)
290 def __unicode__(self):
291 #return u"<FileType: %s>" % (self.name,)
295 """Helper function to set default UUID in DataFile"""
296 return str(uuid.uuid1())
298 class DataFile(models.Model):
299 """Store map from random ID to filename"""
300 random_key = models.CharField(max_length=64,
303 data_run = models.ForeignKey(DataRun, db_index=True)
304 library = models.ForeignKey(Library, db_index=True, null=True, blank=True)
305 file_type = models.ForeignKey(FileType)
306 relative_pathname = models.CharField(max_length=255, db_index=True)
308 def _get_attributes(self):
309 return self.file_type.parse_filename(self.relative_pathname)
310 attributes = property(_get_attributes)
312 def _get_pathname(self):
313 return get_absolute_pathname(self.relative_pathname)
314 pathname = property(_get_pathname)
317 def get_absolute_url(self):
318 return ('htsworkflow.frontend.experiments.views.read_result_file',
319 (), {'key': self.random_key })
321 def find_file_type_metadata_from_filename(pathname):
322 path, filename = os.path.split(pathname)
324 for file_type in FileType.objects.all():
325 result = file_type.parse_filename(filename)
326 if result is not None:
327 result['file_type'] = file_type
def get_relative_pathname(abspath):
    """Strip off the result home directory from a path

    Only a leading settings.RESULT_HOME_DIR prefix is removed; later
    occurrences of the same text inside the path are left intact.  A path
    that does not start with the result home directory is returned
    unchanged.
    """
    # Joining with '' makes the prefix end with a path separator, so a
    # sibling directory that merely shares the name prefix is not matched.
    result_home_dir = os.path.join(settings.RESULT_HOME_DIR, '')
    if abspath.startswith(result_home_dir):
        return abspath[len(result_home_dir):]
    return abspath
def get_absolute_pathname(relative_pathname):
    """Return relative_pathname joined onto settings.RESULT_HOME_DIR."""
    results_root = settings.RESULT_HOME_DIR
    absolute_pathname = os.path.join(results_root, relative_pathname)
    return absolute_pathname