Reformatting to be more PEP 8-like
author Diane Trout <diane@caltech.edu>
Tue, 26 Jun 2012 21:27:17 +0000 (14:27 -0700)
committer Diane Trout <diane@caltech.edu>
Tue, 26 Jun 2012 21:27:17 +0000 (14:27 -0700)
htsworkflow/frontend/bcmagic/utils.py
htsworkflow/frontend/experiments/models.py

index 78919b00f99a8a834b90884b219fc2145f2e9f68..6ffc99f8ee045a8a217c5be195386eee348deb84 100644 (file)
@@ -5,7 +5,7 @@ import socket
 import StringIO
 
 
-def print_zpl(zpl_text, host=settings.BCPRINTER_PRINTER1_HOST):
+def print_zpl(zpl_text, host):
     """
     Sends zpl_text to printer
     """
@@ -13,20 +13,20 @@ def print_zpl(zpl_text, host=settings.BCPRINTER_PRINTER1_HOST):
     ftp.login()
     ftp.storlines("STOR printme.txt", StringIO.StringIO(zpl_text))
     ftp.quit()
-    
 
-def print_zpl_socket(zpl_text, host=settings.BCPRINTER_PRINTER1_HOST, port=settings.BCPRINTER_PRINTER1_PORT):
+
+def print_zpl_socket(zpl_text, host, port):
     """
     Sends zpl_text to printer via a socket
-    
+
     if zpl_text is a list of zpl_texts, it will print each one
     in that list.
     """
-    
+
     # Process anyway if zpl_text is a list.
     if type(zpl_text) is list:
         zpl_text = '\n'.join(zpl_text)
-    
+
     s = socket.socket()
     # PORT 9100 is default for Zebra tabletop/desktop printers
     # PORT 6101 is default for Zebra mobile printers
@@ -34,19 +34,20 @@ def print_zpl_socket(zpl_text, host=settings.BCPRINTER_PRINTER1_HOST, port=setti
     s.sendall(zpl_text)
     s.close()
 
+
 def report_error(message):
     """
     Return a dictionary with a command to display 'message'
     """
     return {'mode': 'Error', 'status': message}
-    
+
 
 def redirect_to_url(url):
     """
     Return a bcm dictionary with a command to redirect to 'url'
     """
     return {'mode': 'redirect', 'url': url}
-    
+
 
 def autofill(field, value):
     """
index 25707f3166aefcb681946a40ee23fbd0f3017ec2..044dee617b5b7705df063f03a6abf9e097aa601e 100644 (file)
@@ -16,23 +16,23 @@ from htsworkflow.frontend.samples.models import Library
 from htsworkflow.util.conversion import parse_flowcell_id
 from htsworkflow.pipelines import runfolder
 
-logger = logging.getLogger(__name__)
+LOGGER = logging.getLogger(__name__)
 default_pM = 5
 try:
     default_pM = int(settings.DEFAULT_PM)
-except ValueError,e:
-    logger.error("invalid value for frontend.default_pm")
+except ValueError, e:
+    LOGGER.error("invalid value for frontend.default_pm")
 
 # how many days to wait before trying to re-import a runfolder
 RESCAN_DELAY = 1
 try:
     RESCAN_DELAY = int(settings.RESCAN_DELAY)
 except (ValueError, AttributeError):
-    logger.error("Missing or invalid settings.RESCAN_DELAY, "\
+    LOGGER.error("Missing or invalid settings.RESCAN_DELAY, "\
                  "defaulting to %s" % (RESCAN_DELAY,))
 
 RUN_STATUS_CHOICES = (
-    (0, 'Sequencer running'), ##Solexa Data Pipeline Not Yet Started'),
+    (0, 'Sequencer running'),  # Solexa Data Pipeline Not Yet Started'),
     (1, 'Data Pipeline Started'),
     (2, 'Data Pipeline Interrupted'),
     (3, 'Data Pipeline Finished'),
@@ -42,154 +42,169 @@ RUN_STATUS_CHOICES = (
     (7, 'QC Finished'),
     (255, 'DONE'),
   )
-RUN_STATUS_REVERSE_MAP = dict(((v,k) for k,v in RUN_STATUS_CHOICES))
+RUN_STATUS_REVERSE_MAP = dict(((v, k) for k, v in RUN_STATUS_CHOICES))
+
 
 class ClusterStation(models.Model):
-  name = models.CharField(max_length=50, unique=True)
+    """List of cluster stations"""
+    name = models.CharField(max_length=50, unique=True)
 
-  def __unicode__(self):
-    return unicode(self.name)
+    def __unicode__(self):
+        return unicode(self.name)
 
-class Sequencer(models.Model):
-  name = models.CharField(max_length=50, db_index=True)
-  instrument_name = models.CharField(max_length=50, db_index=True)
-  serial_number = models.CharField(max_length=50, db_index=True)
-  model = models.CharField(max_length=255)
-  active = models.BooleanField(default=True, null=False)
-  comment = models.CharField(max_length=255)
 
-  class Meta:
-    ordering = ["-active", "name"]
+class Sequencer(models.Model):
+    """Sequencers we've owned
+    """
+    name = models.CharField(max_length=50, db_index=True)
+    instrument_name = models.CharField(max_length=50, db_index=True)
+    serial_number = models.CharField(max_length=50, db_index=True)
+    model = models.CharField(max_length=255)
+    active = models.BooleanField(default=True, null=False)
+    comment = models.CharField(max_length=255)
 
-  def __unicode__(self):
-      name = [unicode(self.name)]
-      if self.instrument_name is not None:
-          name.append("(%s)" % (unicode(self.instrument_name),))
-      return " ".join(name)
+    class Meta:
+        ordering = ["-active", "name"]
 
+    def __unicode__(self):
+        name = [unicode(self.name)]
+        if self.instrument_name is not None:
+            name.append("(%s)" % (unicode(self.instrument_name),))
+        return " ".join(name)
 
-  @models.permalink
-  def get_absolute_url(self):
-      return ('htsworkflow.frontend.experiments.views.sequencer',
-              [self.id])
+    @models.permalink
+    def get_absolute_url(self):
+        return ('htsworkflow.frontend.experiments.views.sequencer',
+                [self.id])
 
 
 class FlowCell(models.Model):
-  flowcell_id = models.CharField(max_length=20, unique=True, db_index=True)
-  run_date = models.DateTimeField()
-  advanced_run = models.BooleanField(default=False)
-  paired_end = models.BooleanField(default=False)
-  read_length = models.IntegerField(default=32) #Stanford is currenlty 25
-  control_lane = models.IntegerField(choices=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(0,'All Lanes')], null=True, blank=True)
-
-  cluster_station = models.ForeignKey(ClusterStation, default=3)
-  sequencer = models.ForeignKey(Sequencer, default=1)
-
-  notes = models.TextField(blank=True)
-
-  def __unicode__(self):
-      return unicode(self.flowcell_id)
-
-  def Lanes(self):
-    html = ['<table>']
-    for lane in self.lane_set.order_by('lane_number'):
-        cluster_estimate = lane.cluster_estimate
-        if cluster_estimate is not None:
-            cluster_estimate = "%s k" % ((int(cluster_estimate)/1000), )
+    flowcell_id = models.CharField(max_length=20, unique=True, db_index=True)
+    run_date = models.DateTimeField()
+    advanced_run = models.BooleanField(default=False)
+    paired_end = models.BooleanField(default=False)
+    read_length = models.IntegerField(default=32)  # Stanford is currently 25
+    control_lane = models.IntegerField(choices=[(1, 1),
+                                                (2, 2),
+                                                (3, 3),
+                                                (4, 4),
+                                                (5, 5),
+                                                (6, 6),
+                                                (7, 7),
+                                                (8, 8),
+                                                (0, 'All Lanes')],
+                                       null=True,
+                                       blank=True)
+
+    cluster_station = models.ForeignKey(ClusterStation, default=3)
+    sequencer = models.ForeignKey(Sequencer, default=1)
+
+    notes = models.TextField(blank=True)
+
+    def __unicode__(self):
+        return unicode(self.flowcell_id)
+
+    def Lanes(self):
+        html = ['<table>']
+        for lane in self.lane_set.order_by('lane_number'):
+            cluster_estimate = lane.cluster_estimate
+            if cluster_estimate is not None:
+                cluster_estimate = "%s k" % ((int(cluster_estimate) / 1000), )
+            else:
+                cluster_estimate = 'None'
+            library_id = lane.library_id
+            library = lane.library
+            element = '<tr><td>%d</td>'\
+                      '<td><a href="%s">%s</a></td><td>%s</td></tr>'
+            html.append(element % (lane.lane_number,
+                                   library.get_admin_url(),
+                                   library,
+                                   cluster_estimate))
+        html.append('</table>')
+        return "\n".join(html)
+    Lanes.allow_tags = True
+
+    class Meta:
+        ordering = ["-run_date"]
+
+    def get_admin_url(self):
+        # that's the django way... except it didn't work
+        return urlresolvers.reverse('admin:experiments_flowcell_change',
+                                    args=(self.id,))
+
+    def flowcell_type(self):
+        """Convert our boolean 'is paired' flag to a name
+        """
+        if self.paired_end:
+            return u"Paired"
         else:
-            cluster_estimate = 'None'
-        library_id = lane.library_id
-        library = lane.library
-        element = '<tr><td>%d</td><td><a href="%s">%s</a></td><td>%s</td></tr>'
-        html.append(element % (lane.lane_number,
-                               library.get_admin_url(),
-                               library,
-                               cluster_estimate))
-    html.append('</table>')
-    return "\n".join(html)
-  Lanes.allow_tags = True
-
-  class Meta:
-    ordering = ["-run_date"]
-
-  def get_admin_url(self):
-    # that's the django way... except it didn't work
-    return urlresolvers.reverse('admin:experiments_flowcell_change',
-                                args=(self.id,))
-
-  def flowcell_type(self):
-    """
-    Convert our boolean 'is paired' flag to a name
-    """
-    if self.paired_end:
-      return u"Paired"
-    else:
-      return u"Single"
-
-  @models.permalink
-  def get_absolute_url(self):
-      flowcell_id, status = parse_flowcell_id(self.flowcell_id)
-      return ('htsworkflow.frontend.experiments.views.flowcell_detail',
-              [str(flowcell_id)])
-
-  def get_raw_data_directory(self):
-      """Return location of where the raw data is stored"""
-      flowcell_id, status = parse_flowcell_id(self.flowcell_id)
-
-      return os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
-
-  def update_data_runs(self):
-      result_root = self.get_raw_data_directory()
-      logger.debug("Update data runs flowcell root: %s" % (result_root,))
-      if result_root is None:
-          return
-
-      result_home_dir = os.path.join(settings.RESULT_HOME_DIR,'')
-      run_xml_re = re.compile(glob.fnmatch.translate('run*.xml'))
-
-      dataruns = dict([ (x.result_dir, x) for x in self.datarun_set.all() ])
-
-      result_dirs = []
-      for dirpath, dirnames, filenames in os.walk(result_root):
-          for filename in filenames:
-              if run_xml_re.match(filename):
-                  # we have a run directory
-                  relative_pathname = get_relative_pathname(dirpath)
-                  cached_run = dataruns.get(relative_pathname, None)
-                  now = datetime.datetime.now()
-                  if (cached_run is None):
-                      self.import_data_run(relative_pathname, filename)
-                  elif (now - cached_run.last_update_time).days > RESCAN_DELAY:
-                      self.import_data_run(relative_pathname,
-                                           filename, cached_run)
-
-  def import_data_run(self, relative_pathname, run_xml_name, run=None):
-      """Given a result directory import files"""
-      run_dir = get_absolute_pathname(relative_pathname)
-      run_xml_path = os.path.join(run_dir, run_xml_name)
-      run_xml_data = runfolder.load_pipeline_run_xml(run_xml_path)
-      logger.debug("Importing run from %s" % (relative_pathname,))
-
-      if run is None:
-          run = DataRun()
-          run.flowcell = self
-          run.status = RUN_STATUS_REVERSE_MAP['DONE']
-          run.result_dir = relative_pathname
-          run.runfolder_name = run_xml_data.runfolder_name
-          run.cycle_start = run_xml_data.image_analysis.start
-          run.cycle_stop = run_xml_data.image_analysis.stop
-          run.run_start_time = run_xml_data.image_analysis.date
-          run.image_software = run_xml_data.image_analysis.software
-          run.image_version = run_xml_data.image_analysis.version
-          run.basecall_software = run_xml_data.bustard.software
-          run.basecall_version = run_xml_data.bustard.version
-          run.alignment_software = run_xml_data.gerald.software
-          run.alignment_version = run_xml_data.gerald.version
-
-      run.last_update_time = datetime.datetime.now()
-      run.save()
-
-      run.update_result_files()
+            return u"Single"
+
+    @models.permalink
+    def get_absolute_url(self):
+        flowcell_id, status = parse_flowcell_id(self.flowcell_id)
+        return ('htsworkflow.frontend.experiments.views.flowcell_detail',
+                [str(flowcell_id)])
+
+    def get_raw_data_directory(self):
+        """Return location of where the raw data is stored"""
+        flowcell_id, status = parse_flowcell_id(self.flowcell_id)
+
+        return os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
+
+    def update_data_runs(self):
+        result_root = self.get_raw_data_directory()
+        LOGGER.debug("Update data runs flowcell root: %s" % (result_root,))
+        if result_root is None:
+            return
+
+        result_home_dir = os.path.join(settings.RESULT_HOME_DIR, '')
+        run_xml_re = re.compile(glob.fnmatch.translate('run*.xml'))
+
+        dataruns = dict([(x.result_dir, x) for x in self.datarun_set.all()])
+
+        result_dirs = []
+        for dirpath, dirnames, filenames in os.walk(result_root):
+            for filename in filenames:
+                if run_xml_re.match(filename):
+                    # we have a run directory
+                    relative_pathname = get_relative_pathname(dirpath)
+                    cached_run = dataruns.get(relative_pathname, None)
+                    now = datetime.datetime.now()
+                    if (cached_run is None):
+                        self.import_data_run(relative_pathname, filename)
+                    elif (now - cached_run.last_update_time).days > \
+                             RESCAN_DELAY:
+                        self.import_data_run(relative_pathname,
+                                             filename, cached_run)
+
+    def import_data_run(self, relative_pathname, run_xml_name, run=None):
+        """Given a result directory import files"""
+        run_dir = get_absolute_pathname(relative_pathname)
+        run_xml_path = os.path.join(run_dir, run_xml_name)
+        run_xml_data = runfolder.load_pipeline_run_xml(run_xml_path)
+        LOGGER.debug("Importing run from %s" % (relative_pathname,))
+
+        if run is None:
+            run = DataRun()
+            run.flowcell = self
+            run.status = RUN_STATUS_REVERSE_MAP['DONE']
+            run.result_dir = relative_pathname
+            run.runfolder_name = run_xml_data.runfolder_name
+            run.cycle_start = run_xml_data.image_analysis.start
+            run.cycle_stop = run_xml_data.image_analysis.stop
+            run.run_start_time = run_xml_data.image_analysis.date
+            run.image_software = run_xml_data.image_analysis.software
+            run.image_version = run_xml_data.image_analysis.version
+            run.basecall_software = run_xml_data.bustard.software
+            run.basecall_version = run_xml_data.bustard.version
+            run.alignment_software = run_xml_data.gerald.software
+            run.alignment_version = run_xml_data.gerald.version
+
+        run.last_update_time = datetime.datetime.now()
+        run.save()
+
+        run.update_result_files()
 
 
 # FIXME: should we automatically update dataruns?
@@ -204,39 +219,46 @@ class FlowCell(models.Model):
 #post_init.connect(update_flowcell_dataruns, sender=FlowCell)
 
 
-
 LANE_STATUS_CODES = [(0, 'Failed'),
-                    (1, 'Marginal'),
-                    (2, 'Good'),]
-LANE_STATUS_MAP = dict((int(k),v) for k,v in LANE_STATUS_CODES )
+                     (1, 'Marginal'),
+                     (2, 'Good'), ]
+LANE_STATUS_MAP = dict((int(k), v) for k, v in LANE_STATUS_CODES)
 LANE_STATUS_MAP[None] = "Unknown"
 
+
 def is_valid_lane(value):
     if value >= 1 and value <= 8:
         return True
     else:
-          return False
+        return False
+
 
 class Lane(models.Model):
-  flowcell = models.ForeignKey(FlowCell)
-  lane_number = models.IntegerField()
-  library = models.ForeignKey(Library)
-  pM = models.DecimalField(max_digits=5, decimal_places=2,blank=False, null=False,default=default_pM)
-  cluster_estimate = models.IntegerField(blank=True, null=True)
-  status = models.IntegerField(choices=LANE_STATUS_CODES, null=True, blank=True)
-  comment = models.TextField(null=True, blank=True)
-
-  @models.permalink
-  def get_absolute_url(self):
-       return ('htsworkflow.frontend.experiments.views.flowcell_lane_detail',
-               [str(self.id)])
-
-  def __unicode__(self):
-      return self.flowcell.flowcell_id + ':' + unicode(self.lane_number)
-
-### -----------------------
+    flowcell = models.ForeignKey(FlowCell)
+    lane_number = models.IntegerField()
+    library = models.ForeignKey(Library)
+    pM = models.DecimalField(max_digits=5,
+                             decimal_places=2,
+                             blank=False,
+                             null=False,
+                             default=default_pM)
+    cluster_estimate = models.IntegerField(blank=True, null=True)
+    status = models.IntegerField(choices=LANE_STATUS_CODES,
+                                 null=True,
+                                 blank=True)
+    comment = models.TextField(null=True, blank=True)
+
+    @models.permalink
+    def get_absolute_url(self):
+        return ('htsworkflow.frontend.experiments.views.flowcell_lane_detail',
+                [str(self.id)])
+
+    def __unicode__(self):
+        return self.flowcell.flowcell_id + ':' + unicode(self.lane_number)
+
+
 class DataRun(models.Model):
-    flowcell = models.ForeignKey(FlowCell,verbose_name="Flowcell Id")
+    flowcell = models.ForeignKey(FlowCell, verbose_name="Flowcell Id")
     runfolder_name = models.CharField(max_length=50)
     result_dir = models.CharField(max_length=255)
     last_update_time = models.DateTimeField()
@@ -261,8 +283,8 @@ class DataRun(models.Model):
                 pathname = os.path.join(dirname, filename)
                 relative_pathname = get_relative_pathname(pathname)
                 datafiles = self.datafile_set.filter(
-                  data_run = self,
-                  relative_pathname=relative_pathname)
+                    data_run=self,
+                    relative_pathname=relative_pathname)
                 if len(datafiles) > 0:
                     continue
 
@@ -276,7 +298,8 @@ class DataRun(models.Model):
 
                     lane_number = metadata.get('lane', None)
                     if lane_number is not None:
-                        lane = self.flowcell.lane_set.get(lane_number = lane_number)
+                        lane = self.flowcell.lane_set.get(
+                            lane_number=lane_number)
                         newfile.library = lane.library
 
                     self.datafile_set.add(newfile)
@@ -292,7 +315,8 @@ class DataRun(models.Model):
                 lane = metadata.get('lane', None)
                 if lane is not None:
                     lane_file_set = lanes.setdefault(lane, {})
-                    lane_file_set[datafile.file_type.normalized_name] = datafile
+                    normalized_name = datafile.file_type.normalized_name
+                    lane_file_set[normalized_name] = datafile
         return lanes
 
     def ivc_plots(self, lane):
@@ -304,6 +328,7 @@ class DataRun(models.Model):
             if metadata.file_type.name in ivc_name:
                 plots[metadata.file_type.name] = (rel_filename, metadata)
 
+
 class FileType(models.Model):
     """Represent potential file types
 
@@ -325,7 +350,8 @@ class FileType(models.Model):
         if len(self.regex) > 0:
             match = re.match(self.regex, filename)
             if match is not None:
-                # These are (?P<>) names we know about from our default regexes.
+                # These are (?P<>) names we know about from our
+                # default regexes.
                 results = match.groupdict()
 
                 # convert int parameters
@@ -345,10 +371,12 @@ class FileType(models.Model):
         #return u"<FileType: %s>" % (self.name,)
         return self.name
 
+
 def str_uuid():
     """Helper function to set default UUID in DataFile"""
     return str(uuid.uuid1())
 
+
 class DataFile(models.Model):
     """Store map from random ID to filename"""
     random_key = models.CharField(max_length=64,
@@ -370,7 +398,8 @@ class DataFile(models.Model):
     @models.permalink
     def get_absolute_url(self):
         return ('htsworkflow.frontend.experiments.views.read_result_file',
-                (), {'key': self.random_key })
+                (), {'key': self.random_key})
+
 
 def find_file_type_metadata_from_filename(pathname):
     path, filename = os.path.split(pathname)
@@ -383,14 +412,15 @@ def find_file_type_metadata_from_filename(pathname):
 
     return None
 
+
 def get_relative_pathname(abspath):
     """Strip off the result home directory from a path
     """
-    result_home_dir = os.path.join(settings.RESULT_HOME_DIR,'')
-    relative_pathname = abspath.replace(result_home_dir,'')
+    result_home_dir = os.path.join(settings.RESULT_HOME_DIR, '')
+    relative_pathname = abspath.replace(result_home_dir, '')
     return relative_pathname
 
+
 def get_absolute_pathname(relative_pathname):
     """Attach relative path to  results home directory"""
     return os.path.join(settings.RESULT_HOME_DIR, relative_pathname)
-