9 from django.conf import settings
10 from django.core.exceptions import ObjectDoesNotExist
11 from django.core import urlresolvers
12 from django.db import models
13 from django.db.models.signals import post_init
15 from htsworkflow.frontend.samples.models import Library
16 from htsworkflow.util.conversion import parse_flowcell_id
17 from htsworkflow.pipelines import runfolder
# Module-level logger, named after this module.
19 logger = logging.getLogger(__name__)
# default_pM is the default for Lane.pM, parsed as an int from
# settings.DEFAULT_PM.
# NOTE(review): the error below is presumably logged when that setting cannot
# be converted; the guarding try/except and any fallback value are not
# visible here -- confirm.
22 default_pM = int(settings.DEFAULT_PM)
24 logger.error("invalid value for frontend.default_pm")
# (code, label) pairs used as the choices for DataRun.run_status.
26 RUN_STATUS_CHOICES = (
27 (0, 'Sequencer running'), ##Solexa Data Pipeline Not Yet Started'),
28 (1, 'Data Pipeline Started'),
29 (2, 'Data Pipeline Interrupted'),
30 (3, 'Data Pipeline Finished'),
31 (4, 'Collect Results Started'),
32 (5, 'Collect Results Finished'),
# Reverse lookup: status label -> integer code.
# NOTE(review): FlowCell.import_data_run looks up the label 'DONE'; confirm
# that label is one of the choices, otherwise the lookup raises KeyError.
37 RUN_STATUS_REVERSE_MAP = dict(((v,k) for k,v in RUN_STATUS_CHOICES))
class ClusterStation(models.Model):
    """Database record for a cluster station, identified by a unique name."""

    # Display name; uniqueness is enforced at the database level.
    name = models.CharField(unique=True, max_length=50)

    def __unicode__(self):
        """Return the station's name as a unicode string."""
        label = self.name
        return unicode(label)
class Sequencer(models.Model):
    """Database record for a sequencer, identified by a unique name."""

    # Display name; uniqueness is enforced at the database level.
    name = models.CharField(unique=True, max_length=50)

    def __unicode__(self):
        """Return the sequencer's name as a unicode string."""
        label = self.name
        return unicode(label)
# A flowcell record: its identifier, run parameters, the cluster station and
# sequencer it is associated with, plus helpers that locate its raw data on
# disk and import the data runs found there.
51 class FlowCell(models.Model):
52 flowcell_id = models.CharField(max_length=20, unique=True, db_index=True)
53 run_date = models.DateTimeField()
54 advanced_run = models.BooleanField(default=False)
55 paired_end = models.BooleanField(default=False)
56 read_length = models.IntegerField(default=32) #Stanford is currently 25
# control_lane: one of lanes 1-8, or 0 for 'All Lanes'; may be left unset.
57 control_lane = models.IntegerField(choices=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(0,'All Lanes')], null=True, blank=True)
# NOTE(review): the defaults below presumably refer to primary keys of
# existing ClusterStation / Sequencer rows -- confirm those rows exist.
59 cluster_station = models.ForeignKey(ClusterStation, default=3)
60 sequencer = models.ForeignKey(Sequencer, default=1)
62 notes = models.TextField(blank=True)
64 def __unicode__(self):
65 return unicode(self.flowcell_id)
# Lanes: build an HTML table with one row per lane of this flowcell; each row
# starts with the lane number and a link to the library's admin page.
69 for lane in self.lane_set.all():
70 cluster_estimate = lane.cluster_estimate
71 if cluster_estimate is not None:
# Show the estimate in thousands, suffixed with " k".
72 cluster_estimate = "%s k" % ((int(cluster_estimate)/1000), )
74 cluster_estimate = 'None'
75 library_id = lane.library_id
76 library = lane.library
77 element = '<tr><td>%d</td><td><a href="%s">%s</a></td><td>%s</td></tr>'
78 html.append(element % (lane.lane_number,
79 library.get_admin_url(),
82 html.append('</table>')
83 return "\n".join(html)
# allow_tags presumably tells the Django admin to render the returned markup
# as HTML instead of escaping it -- confirm against the Django version in use.
84 Lanes.allow_tags = True
# Default ordering: most recent run_date first.
87 ordering = ["-run_date"]
# URL of this flowcell's admin change page, resolved by URL name.
89 def get_admin_url(self):
90 # that's the django way... except it didn't work
91 return urlresolvers.reverse('admin:experiments_flowcell_change',
# Name for the run type, derived from the paired_end flag.
94 def flowcell_type(self):
96 Convert our boolean 'is paired' flag to a name
# Returns a tuple that starts with the dotted name of the flowcell detail view.
# NOTE(review): a tuple return suggests a permalink-style decorator that is
# not visible here -- confirm.
104 def get_absolute_url(self):
105 flowcell_id, status = parse_flowcell_id(self.flowcell_id)
106 return ('htsworkflow.frontend.experiments.views.flowcell_detail',
109 def get_raw_data_directory(self):
110 """Return location of where the raw data is stored"""
# parse_flowcell_id returns (id, status); only the id names the directory
# under settings.RESULT_HOME_DIR.
111 flowcell_id, status = parse_flowcell_id(self.flowcell_id)
113 return os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
# Walk the raw data directory and import every directory that contains a file
# matching run*.xml and whose relative path is not yet stored on a DataRun.
115 def update_data_runs(self):
116 result_root = self.get_raw_data_directory()
117 if result_root is None:
# NOTE(review): result_home_dir is not referenced again in the visible part of
# this method.
120 result_home_dir = os.path.join(settings.RESULT_HOME_DIR,'')
# fnmatch.translate turns the glob 'run*.xml' into a regular expression.
121 run_xml_re = re.compile(glob.fnmatch.translate('run*.xml'))
# result_dir values already recorded, so known run directories are skipped.
123 dataruns = self.datarun_set.all()
124 datarun_result_dirs = [ x.result_dir for x in dataruns ]
127 for dirpath, dirnames, filenames in os.walk(result_root):
128 for filename in filenames:
129 if run_xml_re.match(filename):
130 # we have a run directory
131 relative_pathname = get_relative_pathname(dirpath)
132 if relative_pathname not in datarun_result_dirs:
133 self.import_data_run(relative_pathname, filename)
# relative_pathname is relative to settings.RESULT_HOME_DIR; run_xml_name is
# the run XML file found inside that directory.
135 def import_data_run(self, relative_pathname, run_xml_name):
136 """Given a result directory import files"""
137 run_dir = get_absolute_pathname(relative_pathname)
138 run_xml_path = os.path.join(run_dir, run_xml_name)
139 run_xml_data = runfolder.load_pipeline_run_xml(run_xml_path)
# NOTE(review): DataRun declares a run_status field but no status field, and
# 'DONE' is not among the visible RUN_STATUS_CHOICES labels -- confirm this
# assignment does what is intended.
143 run.status = RUN_STATUS_REVERSE_MAP['DONE']
144 run.result_dir = relative_pathname
145 run.runfolder_name = run_xml_data.runfolder_name
146 run.cycle_start = run_xml_data.image_analysis.start
147 run.cycle_stop = run_xml_data.image_analysis.stop
148 run.run_start_time = run_xml_data.image_analysis.date
150 run.last_update_time = datetime.datetime.now()
# Scan the run's result directory and register the data files found there.
153 run.update_result_files()
156 # FIXME: should we automatically update dataruns?
157 # Or should we expect someone to call update_data_runs?
158 #def update_flowcell_dataruns(sender, instance, *args, **kwargs):
159 # """Update our dataruns
161 # if not os.path.exists(settings.RESULT_HOME_DIR):
164 # instance.update_data_runs()
165 #post_init.connect(update_flowcell_dataruns, sender=FlowCell)
# (code, label) pairs used as the choices for Lane.status.
169 LANE_STATUS_CODES = [(0, 'Failed'),
# Lookup from integer status code to label; a status of None maps to "Unknown".
172 LANE_STATUS_MAP = dict((int(k),v) for k,v in LANE_STATUS_CODES )
173 LANE_STATUS_MAP[None] = "Unknown"
# One lane (numbered 1-8) of a flowcell, the library loaded on it, and
# per-lane results such as the cluster estimate and status.
175 class Lane(models.Model):
176 flowcell = models.ForeignKey(FlowCell)
177 lane_number = models.IntegerField(choices=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8)])
178 library = models.ForeignKey(Library)
# pM is required and defaults to the module-level default_pM
# (presumably a picomolar concentration -- confirm).
179 pM = models.DecimalField(max_digits=5, decimal_places=2,blank=False, null=False,default=default_pM)
180 cluster_estimate = models.IntegerField(blank=True, null=True)
# status takes a code from LANE_STATUS_CODES; it may be left unset.
181 status = models.IntegerField(choices=LANE_STATUS_CODES, null=True, blank=True)
182 comment = models.TextField(null=True, blank=True)
# Returns (view name, [flowcell id, lane number]) for the lane detail view,
# both arguments converted to str.
# NOTE(review): a tuple return suggests a permalink-style decorator that is
# not visible here -- confirm.
185 def get_absolute_url(self):
186 flowcell_id, status = parse_flowcell_id(self.flowcell.flowcell_id)
187 return ('htsworkflow.frontend.experiments.views.flowcell_lane_detail',
188 [str(flowcell_id), str(self.lane_number)])
191 ### -----------------------
# One pipeline run for a flowcell: where its results live on disk, when it
# ran, and helpers that index the result files as DataFile rows.
192 class DataRun(models.Model):
193 flowcell = models.ForeignKey(FlowCell,verbose_name="Flowcell Id")
194 runfolder_name = models.CharField(max_length=50)
# result_dir is stored relative to settings.RESULT_HOME_DIR (it is passed to
# get_absolute_pathname before use).
195 result_dir = models.CharField(max_length=255)
196 last_update_time = models.DateTimeField()
197 run_start_time = models.DateTimeField()
198 cycle_start = models.IntegerField(null=True, blank=True)
199 cycle_stop = models.IntegerField(null=True, blank=True)
200 run_status = models.IntegerField(choices=RUN_STATUS_CHOICES,
201 null=True, blank=True)
202 comment = models.TextField(blank=True)
# Walk the result directory; for each file that matches a known FileType,
# attach a data file record to this run, then refresh last_update_time.
204 def update_result_files(self):
205 abs_result_dir = get_absolute_pathname(self.result_dir)
207 for dirname, dirnames, filenames in os.walk(abs_result_dir):
208 for filename in filenames:
209 pathname = os.path.join(dirname, filename)
210 relative_pathname = get_relative_pathname(pathname)
211 datafiles = self.datafile_set.filter(
213 relative_pathname=relative_pathname)
# A non-empty result means this path is already recorded for the run.
214 if len(datafiles) > 0:
217 metadata = find_file_type_metadata_from_filename(filename)
218 if metadata is not None:
219 metadata['filename'] = filename
221 newfile.data_run = self
222 newfile.file_type = metadata['file_type']
223 newfile.relative_pathname = relative_pathname
# When the filename encodes a lane number, link the file to that lane's library.
225 lane_number = metadata.get('lane', None)
226 if lane_number is not None:
227 lane = self.flowcell.lane_set.get(lane_number = lane_number)
228 newfile.library = lane.library
230 self.datafile_set.add(newfile)
232 self.last_update_time = datetime.datetime.now()
# Group this run's data files by lane:
# lanes[lane][normalized file type name] = datafile.
234 def lane_files(self):
237 for datafile in self.datafile_set.all():
238 metadata = datafile.attributes
239 if metadata is not None:
240 lane = metadata.get('lane', None)
242 lane_file_set = lanes.setdefault(lane, {})
243 lane_file_set[datafile.file_type.normalized_name] = datafile
# Collect result files whose file type name is one of the IVC plot types,
# keyed by that name.
# NOTE(review): get_result_files is not defined on DataRun in the visible
# code, and the lane argument is not used in the visible lines -- confirm.
246 def ivc_plots(self, lane):
247 ivc_name = ['IVC All', 'IVC Call',
248 'IVC Percent Base', 'IVC Percent All', 'IVC Percent Call']
251 for rel_filename, metadata in self.get_result_files():
252 if metadata.file_type.name in ivc_name:
253 plots[metadata.file_type.name] = (rel_filename, metadata)
255 class FileType(models.Model):
256 """Represent potential file types
258 regex is a pattern used to detect if a filename matches this type
259 data run currently assumes that there may be a (?P<lane>) and
260 (?P<end>) pattern in the regular expression.
262 name = models.CharField(max_length=50)
263 mimetype = models.CharField(max_length=50, null=True, blank=True)
264 # regular expression from glob.fnmatch.translate
265 regex = models.CharField(max_length=50, null=True, blank=True)
267 def parse_filename(self, pathname):
268 """Does filename match our pattern?
270 Returns None if not, or dictionary of match variables if we do.
272 path, filename = os.path.split(pathname)
# NOTE(review): regex is declared null=True, and len(None) raises TypeError --
# confirm that rows never store NULL in this column.
273 if len(self.regex) > 0:
# Only the filename part is matched; re.match anchors at its start.
274 match = re.match(self.regex, filename)
275 if match is not None:
276 # These are (?P<>) names we know about from our default regexes.
277 results = match.groupdict()
279 # convert int parameters
280 for attribute_name in ['lane', 'end']:
281 value = results.get(attribute_name, None)
282 if value is not None:
283 results[attribute_name] = int(value)
287 def _get_normalized_name(self):
288 """Crush data file name into identifier friendly name"""
# Spaces become underscores and the result is lower-cased.
289 return self.name.replace(' ', '_').lower()
290 normalized_name = property(_get_normalized_name)
292 def __unicode__(self):
293 #return u"<FileType: %s>" % (self.name,)
297 """Helper function to set default UUID in DataFile"""
# NOTE(review): uuid1 is built from the host id and a timestamp, so the value
# is unique but predictable; if the key is meant to be unguessable, uuid4 may
# be the better choice -- confirm intent.
298 return str(uuid.uuid1())
300 class DataFile(models.Model):
301 """Store map from random ID to filename"""
302 random_key = models.CharField(max_length=64,
305 data_run = models.ForeignKey(DataRun, db_index=True)
# library is optional; DataRun.update_result_files sets it only when the
# filename encodes a lane number.
306 library = models.ForeignKey(Library, db_index=True, null=True, blank=True)
307 file_type = models.ForeignKey(FileType)
# relative_pathname is passed to get_absolute_pathname, so it is relative to
# settings.RESULT_HOME_DIR.
308 relative_pathname = models.CharField(max_length=255, db_index=True)
# attributes: result of parsing the file's name with its FileType regex
# (whatever FileType.parse_filename returns for this path).
310 def _get_attributes(self):
311 return self.file_type.parse_filename(self.relative_pathname)
312 attributes = property(_get_attributes)
# pathname: absolute location of the file under settings.RESULT_HOME_DIR.
314 def _get_pathname(self):
315 return get_absolute_pathname(self.relative_pathname)
316 pathname = property(_get_pathname)
# Returns (view name, positional args, keyword args) for the view that serves
# this file, looked up by random_key.
# NOTE(review): a tuple return suggests a permalink-style decorator that is
# not visible here -- confirm.
319 def get_absolute_url(self):
320 return ('htsworkflow.frontend.experiments.views.read_result_file',
321 (), {'key': self.random_key })
# Try every FileType against the filename part of pathname. A successful
# parse result is tagged with the matching FileType under 'file_type'.
# DataRun.update_result_files treats a None result as "no known file type".
323 def find_file_type_metadata_from_filename(pathname):
# Only the final path component is matched; the directory part is discarded.
324 path, filename = os.path.split(pathname)
326 for file_type in FileType.objects.all():
327 result = file_type.parse_filename(filename)
328 if result is not None:
329 result['file_type'] = file_type
def get_relative_pathname(abspath, result_home_dir=None):
    """Strip off the result home directory from a path

    Only a leading occurrence of the result home directory is removed, so
    a directory further down the path that happens to repeat the same
    name is left untouched.

    abspath -- path to convert.
    result_home_dir -- directory to strip; defaults to
        settings.RESULT_HOME_DIR when None.

    Returns the remainder of abspath after the result home directory, or
    abspath unchanged when it does not start with that directory.
    """
    if result_home_dir is None:
        result_home_dir = settings.RESULT_HOME_DIR
    # Joining with '' guarantees exactly one trailing separator, so
    # '/data' cannot match the unrelated sibling '/database/...'.
    prefix = os.path.join(result_home_dir, '')
    if abspath.startswith(prefix):
        return abspath[len(prefix):]
    return abspath
def get_absolute_pathname(relative_pathname):
    """Return relative_pathname joined onto settings.RESULT_HOME_DIR."""
    result_root = settings.RESULT_HOME_DIR
    absolute_pathname = os.path.join(result_root, relative_pathname)
    return absolute_pathname