From 0a3ba3b6bc269dfd15b9e336e640492b65add6cc Mon Sep 17 00:00:00 2001 From: Diane Trout Date: Tue, 26 Jun 2012 14:27:17 -0700 Subject: [PATCH] reformatting to be more pep8 like --- htsworkflow/frontend/bcmagic/utils.py | 17 +- htsworkflow/frontend/experiments/models.py | 376 +++++++++++---------- 2 files changed, 212 insertions(+), 181 deletions(-) diff --git a/htsworkflow/frontend/bcmagic/utils.py b/htsworkflow/frontend/bcmagic/utils.py index 78919b0..6ffc99f 100644 --- a/htsworkflow/frontend/bcmagic/utils.py +++ b/htsworkflow/frontend/bcmagic/utils.py @@ -5,7 +5,7 @@ import socket import StringIO -def print_zpl(zpl_text, host=settings.BCPRINTER_PRINTER1_HOST): +def print_zpl(zpl_text, host): """ Sends zpl_text to printer """ @@ -13,20 +13,20 @@ def print_zpl(zpl_text, host=settings.BCPRINTER_PRINTER1_HOST): ftp.login() ftp.storlines("STOR printme.txt", StringIO.StringIO(zpl_text)) ftp.quit() - -def print_zpl_socket(zpl_text, host=settings.BCPRINTER_PRINTER1_HOST, port=settings.BCPRINTER_PRINTER1_PORT): + +def print_zpl_socket(zpl_text, host, port): """ Sends zpl_text to printer via a socket - + if zpl_text is a list of zpl_texts, it will print each one in that list. """ - + # Process anyway if zpl_text is a list. if type(zpl_text) is list: zpl_text = '\n'.join(zpl_text) - + s = socket.socket() # PORT 9100 is default for Zebra tabletop/desktop printers # PORT 6101 is default for Zebra mobile printers @@ -34,19 +34,20 @@ def print_zpl_socket(zpl_text, host=settings.BCPRINTER_PRINTER1_HOST, port=setti s.sendall(zpl_text) s.close() + def report_error(message): """ Return a dictionary with a command to display 'message' """ return {'mode': 'Error', 'status': message} - + def redirect_to_url(url): """ Return a bcm dictionary with a command to redirect to 'url' """ return {'mode': 'redirect', 'url': url} - + def autofill(field, value): """ diff --git a/htsworkflow/frontend/experiments/models.py b/htsworkflow/frontend/experiments/models.py index 25707f3..044dee6 100644 --- a/htsworkflow/frontend/experiments/models.py +++ b/htsworkflow/frontend/experiments/models.py @@ -16,23 +16,23 @@ from htsworkflow.frontend.samples.models import Library from htsworkflow.util.conversion import parse_flowcell_id from htsworkflow.pipelines import runfolder -logger = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) default_pM = 5 try: default_pM = int(settings.DEFAULT_PM) -except ValueError,e: - logger.error("invalid value for frontend.default_pm") +except ValueError, e: + LOGGER.error("invalid value for frontend.default_pm") # how many days to wait before trying to re-import a runfolder RESCAN_DELAY = 1 try: RESCAN_DELAY = int(settings.RESCAN_DELAY) except (ValueError, AttributeError): - logger.error("Missing or invalid settings.RESCAN_DELAY, "\ + LOGGER.error("Missing or invalid settings.RESCAN_DELAY, "\ "defaulting to %s" % (RESCAN_DELAY,)) RUN_STATUS_CHOICES = ( - (0, 'Sequencer running'), ##Solexa Data Pipeline Not Yet Started'), + (0, 'Sequencer running'), # Solexa Data Pipeline Not Yet Started'), (1, 'Data Pipeline Started'), (2, 'Data Pipeline Interrupted'), (3, 'Data Pipeline Finished'), @@ -42,154 +42,169 @@ RUN_STATUS_CHOICES = ( (7, 'QC Finished'), (255, 'DONE'), ) -RUN_STATUS_REVERSE_MAP = dict(((v,k) for k,v in RUN_STATUS_CHOICES)) +RUN_STATUS_REVERSE_MAP = dict(((v, k) for k, v in RUN_STATUS_CHOICES)) + class ClusterStation(models.Model): - name = models.CharField(max_length=50, unique=True) + """List of cluster stations""" + name = models.CharField(max_length=50, unique=True) - def __unicode__(self): - return unicode(self.name) + def __unicode__(self): + return unicode(self.name) -class Sequencer(models.Model): - name = models.CharField(max_length=50, db_index=True) - instrument_name = models.CharField(max_length=50, db_index=True) - serial_number = models.CharField(max_length=50, db_index=True) - model = models.CharField(max_length=255) - active = models.BooleanField(default=True, null=False) - comment = models.CharField(max_length=255) - class Meta: - ordering = ["-active", "name"] +class Sequencer(models.Model): + """Sequencers we've owned + """ + name = models.CharField(max_length=50, db_index=True) + instrument_name = models.CharField(max_length=50, db_index=True) + serial_number = models.CharField(max_length=50, db_index=True) + model = models.CharField(max_length=255) + active = models.BooleanField(default=True, null=False) + comment = models.CharField(max_length=255) - def __unicode__(self): - name = [unicode(self.name)] - if self.instrument_name is not None: - name.append("(%s)" % (unicode(self.instrument_name),)) - return " ".join(name) + class Meta: + ordering = ["-active", "name"] + def __unicode__(self): + name = [unicode(self.name)] + if self.instrument_name is not None: + name.append("(%s)" % (unicode(self.instrument_name),)) + return " ".join(name) - @models.permalink - def get_absolute_url(self): - return ('htsworkflow.frontend.experiments.views.sequencer', - [self.id]) + @models.permalink + def get_absolute_url(self): + return ('htsworkflow.frontend.experiments.views.sequencer', + [self.id]) class FlowCell(models.Model): - flowcell_id = models.CharField(max_length=20, unique=True, db_index=True) - run_date = models.DateTimeField() - advanced_run = models.BooleanField(default=False) - paired_end = models.BooleanField(default=False) - read_length = models.IntegerField(default=32) #Stanford is currenlty 25 - control_lane = models.IntegerField(choices=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(0,'All Lanes')], null=True, blank=True) - - cluster_station = models.ForeignKey(ClusterStation, default=3) - sequencer = models.ForeignKey(Sequencer, default=1) - - notes = models.TextField(blank=True) - - def __unicode__(self): - return unicode(self.flowcell_id) - - def Lanes(self): - html = [''] - for lane in self.lane_set.order_by('lane_number'): - cluster_estimate = lane.cluster_estimate - if cluster_estimate is not None: - cluster_estimate = "%s k" % ((int(cluster_estimate)/1000), ) + flowcell_id = models.CharField(max_length=20, unique=True, db_index=True) + run_date = models.DateTimeField() + advanced_run = models.BooleanField(default=False) + paired_end = models.BooleanField(default=False) + read_length = models.IntegerField(default=32) # Stanford is currenlty 25 + control_lane = models.IntegerField(choices=[(1, 1), + (2, 2), + (3, 3), + (4, 4), + (5, 5), + (6, 6), + (7, 7), + (8, 8), + (0, 'All Lanes')], + null=True, + blank=True) + + cluster_station = models.ForeignKey(ClusterStation, default=3) + sequencer = models.ForeignKey(Sequencer, default=1) + + notes = models.TextField(blank=True) + + def __unicode__(self): + return unicode(self.flowcell_id) + + def Lanes(self): + html = ['
'] + for lane in self.lane_set.order_by('lane_number'): + cluster_estimate = lane.cluster_estimate + if cluster_estimate is not None: + cluster_estimate = "%s k" % ((int(cluster_estimate) / 1000), ) + else: + cluster_estimate = 'None' + library_id = lane.library_id + library = lane.library + element = ''\ + '' + html.append(element % (lane.lane_number, + library.get_admin_url(), + library, + cluster_estimate)) + html.append('
%d%s%s
') + return "\n".join(html) + Lanes.allow_tags = True + + class Meta: + ordering = ["-run_date"] + + def get_admin_url(self): + # that's the django way... except it didn't work + return urlresolvers.reverse('admin:experiments_flowcell_change', + args=(self.id,)) + + def flowcell_type(self): + """Convert our boolean 'is paired' flag to a name + """ + if self.paired_end: + return u"Paired" else: - cluster_estimate = 'None' - library_id = lane.library_id - library = lane.library - element = '%d%s%s' - html.append(element % (lane.lane_number, - library.get_admin_url(), - library, - cluster_estimate)) - html.append('') - return "\n".join(html) - Lanes.allow_tags = True - - class Meta: - ordering = ["-run_date"] - - def get_admin_url(self): - # that's the django way... except it didn't work - return urlresolvers.reverse('admin:experiments_flowcell_change', - args=(self.id,)) - - def flowcell_type(self): - """ - Convert our boolean 'is paired' flag to a name - """ - if self.paired_end: - return u"Paired" - else: - return u"Single" - - @models.permalink - def get_absolute_url(self): - flowcell_id, status = parse_flowcell_id(self.flowcell_id) - return ('htsworkflow.frontend.experiments.views.flowcell_detail', - [str(flowcell_id)]) - - def get_raw_data_directory(self): - """Return location of where the raw data is stored""" - flowcell_id, status = parse_flowcell_id(self.flowcell_id) - - return os.path.join(settings.RESULT_HOME_DIR, flowcell_id) - - def update_data_runs(self): - result_root = self.get_raw_data_directory() - logger.debug("Update data runs flowcell root: %s" % (result_root,)) - if result_root is None: - return - - result_home_dir = os.path.join(settings.RESULT_HOME_DIR,'') - run_xml_re = re.compile(glob.fnmatch.translate('run*.xml')) - - dataruns = dict([ (x.result_dir, x) for x in self.datarun_set.all() ]) - - result_dirs = [] - for dirpath, dirnames, filenames in os.walk(result_root): - for filename in filenames: - if run_xml_re.match(filename): - # we have a run directory - relative_pathname = get_relative_pathname(dirpath) - cached_run = dataruns.get(relative_pathname, None) - now = datetime.datetime.now() - if (cached_run is None): - self.import_data_run(relative_pathname, filename) - elif (now - cached_run.last_update_time).days > RESCAN_DELAY: - self.import_data_run(relative_pathname, - filename, cached_run) - - def import_data_run(self, relative_pathname, run_xml_name, run=None): - """Given a result directory import files""" - run_dir = get_absolute_pathname(relative_pathname) - run_xml_path = os.path.join(run_dir, run_xml_name) - run_xml_data = runfolder.load_pipeline_run_xml(run_xml_path) - logger.debug("Importing run from %s" % (relative_pathname,)) - - if run is None: - run = DataRun() - run.flowcell = self - run.status = RUN_STATUS_REVERSE_MAP['DONE'] - run.result_dir = relative_pathname - run.runfolder_name = run_xml_data.runfolder_name - run.cycle_start = run_xml_data.image_analysis.start - run.cycle_stop = run_xml_data.image_analysis.stop - run.run_start_time = run_xml_data.image_analysis.date - run.image_software = run_xml_data.image_analysis.software - run.image_version = run_xml_data.image_analysis.version - run.basecall_software = run_xml_data.bustard.software - run.basecall_version = run_xml_data.bustard.version - run.alignment_software = run_xml_data.gerald.software - run.alignment_version = run_xml_data.gerald.version - - run.last_update_time = datetime.datetime.now() - run.save() - - run.update_result_files() + return u"Single" + + @models.permalink + def get_absolute_url(self): + flowcell_id, status = parse_flowcell_id(self.flowcell_id) + return ('htsworkflow.frontend.experiments.views.flowcell_detail', + [str(flowcell_id)]) + + def get_raw_data_directory(self): + """Return location of where the raw data is stored""" + flowcell_id, status = parse_flowcell_id(self.flowcell_id) + + return os.path.join(settings.RESULT_HOME_DIR, flowcell_id) + + def update_data_runs(self): + result_root = self.get_raw_data_directory() + LOGGER.debug("Update data runs flowcell root: %s" % (result_root,)) + if result_root is None: + return + + result_home_dir = os.path.join(settings.RESULT_HOME_DIR, '') + run_xml_re = re.compile(glob.fnmatch.translate('run*.xml')) + + dataruns = dict([(x.result_dir, x) for x in self.datarun_set.all()]) + + result_dirs = [] + for dirpath, dirnames, filenames in os.walk(result_root): + for filename in filenames: + if run_xml_re.match(filename): + # we have a run directory + relative_pathname = get_relative_pathname(dirpath) + cached_run = dataruns.get(relative_pathname, None) + now = datetime.datetime.now() + if (cached_run is None): + self.import_data_run(relative_pathname, filename) + elif (now - cached_run.last_update_time).days > \ + RESCAN_DELAY: + self.import_data_run(relative_pathname, + filename, cached_run) + + def import_data_run(self, relative_pathname, run_xml_name, run=None): + """Given a result directory import files""" + run_dir = get_absolute_pathname(relative_pathname) + run_xml_path = os.path.join(run_dir, run_xml_name) + run_xml_data = runfolder.load_pipeline_run_xml(run_xml_path) + LOGGER.debug("Importing run from %s" % (relative_pathname,)) + + if run is None: + run = DataRun() + run.flowcell = self + run.status = RUN_STATUS_REVERSE_MAP['DONE'] + run.result_dir = relative_pathname + run.runfolder_name = run_xml_data.runfolder_name + run.cycle_start = run_xml_data.image_analysis.start + run.cycle_stop = run_xml_data.image_analysis.stop + run.run_start_time = run_xml_data.image_analysis.date + run.image_software = run_xml_data.image_analysis.software + run.image_version = run_xml_data.image_analysis.version + run.basecall_software = run_xml_data.bustard.software + run.basecall_version = run_xml_data.bustard.version + run.alignment_software = run_xml_data.gerald.software + run.alignment_version = run_xml_data.gerald.version + + run.last_update_time = datetime.datetime.now() + run.save() + + run.update_result_files() # FIXME: should we automatically update dataruns? @@ -204,39 +219,46 @@ class FlowCell(models.Model): #post_init.connect(update_flowcell_dataruns, sender=FlowCell) - LANE_STATUS_CODES = [(0, 'Failed'), - (1, 'Marginal'), - (2, 'Good'),] -LANE_STATUS_MAP = dict((int(k),v) for k,v in LANE_STATUS_CODES ) + (1, 'Marginal'), + (2, 'Good'), ] +LANE_STATUS_MAP = dict((int(k), v) for k, v in LANE_STATUS_CODES) LANE_STATUS_MAP[None] = "Unknown" + def is_valid_lane(value): if value >= 1 and value <= 8: return True else: - return False + return False + class Lane(models.Model): - flowcell = models.ForeignKey(FlowCell) - lane_number = models.IntegerField() - library = models.ForeignKey(Library) - pM = models.DecimalField(max_digits=5, decimal_places=2,blank=False, null=False,default=default_pM) - cluster_estimate = models.IntegerField(blank=True, null=True) - status = models.IntegerField(choices=LANE_STATUS_CODES, null=True, blank=True) - comment = models.TextField(null=True, blank=True) - - @models.permalink - def get_absolute_url(self): - return ('htsworkflow.frontend.experiments.views.flowcell_lane_detail', - [str(self.id)]) - - def __unicode__(self): - return self.flowcell.flowcell_id + ':' + unicode(self.lane_number) - -### ----------------------- + flowcell = models.ForeignKey(FlowCell) + lane_number = models.IntegerField() + library = models.ForeignKey(Library) + pM = models.DecimalField(max_digits=5, + decimal_places=2, + blank=False, + null=False, + default=default_pM) + cluster_estimate = models.IntegerField(blank=True, null=True) + status = models.IntegerField(choices=LANE_STATUS_CODES, + null=True, + blank=True) + comment = models.TextField(null=True, blank=True) + + @models.permalink + def get_absolute_url(self): + return ('htsworkflow.frontend.experiments.views.flowcell_lane_detail', + [str(self.id)]) + + def __unicode__(self): + return self.flowcell.flowcell_id + ':' + unicode(self.lane_number) + + class DataRun(models.Model): - flowcell = models.ForeignKey(FlowCell,verbose_name="Flowcell Id") + flowcell = models.ForeignKey(FlowCell, verbose_name="Flowcell Id") runfolder_name = models.CharField(max_length=50) result_dir = models.CharField(max_length=255) last_update_time = models.DateTimeField() @@ -261,8 +283,8 @@ class DataRun(models.Model): pathname = os.path.join(dirname, filename) relative_pathname = get_relative_pathname(pathname) datafiles = self.datafile_set.filter( - data_run = self, - relative_pathname=relative_pathname) + data_run=self, + relative_pathname=relative_pathname) if len(datafiles) > 0: continue @@ -276,7 +298,8 @@ class DataRun(models.Model): lane_number = metadata.get('lane', None) if lane_number is not None: - lane = self.flowcell.lane_set.get(lane_number = lane_number) + lane = self.flowcell.lane_set.get( + lane_number=lane_number) newfile.library = lane.library self.datafile_set.add(newfile) @@ -292,7 +315,8 @@ class DataRun(models.Model): lane = metadata.get('lane', None) if lane is not None: lane_file_set = lanes.setdefault(lane, {}) - lane_file_set[datafile.file_type.normalized_name] = datafile + normalized_name = datafile.file_type.normalized_name + lane_file_set[normalized_name] = datafile return lanes def ivc_plots(self, lane): @@ -304,6 +328,7 @@ class DataRun(models.Model): if metadata.file_type.name in ivc_name: plots[metadata.file_type.name] = (rel_filename, metadata) + class FileType(models.Model): """Represent potential file types @@ -325,7 +350,8 @@ class FileType(models.Model): if len(self.regex) > 0: match = re.match(self.regex, filename) if match is not None: - # These are (?P<>) names we know about from our default regexes. + # These are (?P<>) names we know about from our + # default regexes. results = match.groupdict() # convert int parameters @@ -345,10 +371,12 @@ class FileType(models.Model): #return u"" % (self.name,) return self.name + def str_uuid(): """Helper function to set default UUID in DataFile""" return str(uuid.uuid1()) + class DataFile(models.Model): """Store map from random ID to filename""" random_key = models.CharField(max_length=64, @@ -370,7 +398,8 @@ class DataFile(models.Model): @models.permalink def get_absolute_url(self): return ('htsworkflow.frontend.experiments.views.read_result_file', - (), {'key': self.random_key }) + (), {'key': self.random_key}) + def find_file_type_metadata_from_filename(pathname): path, filename = os.path.split(pathname) @@ -383,14 +412,15 @@ def find_file_type_metadata_from_filename(pathname): return None + def get_relative_pathname(abspath): """Strip off the result home directory from a path """ - result_home_dir = os.path.join(settings.RESULT_HOME_DIR,'') - relative_pathname = abspath.replace(result_home_dir,'') + result_home_dir = os.path.join(settings.RESULT_HOME_DIR, '') + relative_pathname = abspath.replace(result_home_dir, '') return relative_pathname + def get_absolute_pathname(relative_pathname): """Attach relative path to results home directory""" return os.path.join(settings.RESULT_HOME_DIR, relative_pathname) - -- 2.30.2