class SequenceLane(ResultLane):
    """
    Summary of one sequence file (SCARF or FASTQ) for a lane.

    The file format is guessed from the first line of the file, the read
    count is derived from the number of lines, and the summary can be
    written to / restored from a ``SequenceLane`` ElementTree element.
    """
    # value of the 'version' attribute written on the serialized element
    XML_VERSION = 1
    # tag names used by get_elements() / set_elements()
    LANE = 'SequenceLane'
    SEQUENCE_TYPE = 'SequenceType'

    # sequence file formats this class distinguishes
    NONE_TYPE = None
    SCARF_TYPE = 1
    FASTQ_TYPE = 2
    # format constant -> text stored in the SequenceType element
    SEQUENCE_DESCRIPTION = {
        NONE_TYPE: 'None',
        SCARF_TYPE: 'SCARF',
        FASTQ_TYPE: 'FASTQ',
    }

    def __init__(self, pathname=None, lane_id=None, end=None, xml=None):
        """
        Create a lane summary; every argument is forwarded unchanged to the
        base class initializer.
        """
        # Initialised before delegating to the base class.
        # NOTE(review): presumably ResultLane.__init__ can call
        # set_elements() when xml is given, which assigns this attribute --
        # confirm against ResultLane before reordering.
        self.sequence_type = None
        super(SequenceLane, self).__init__(pathname, lane_id, end, xml)

    def _guess_sequence_type(self, pathname):
        """
        Determine if we have a scarf or fastq sequence file

        Only the first line of ``pathname`` is inspected: a leading '@'
        means FASTQ, any other content is treated as SCARF, and a file
        with no content at all is reported as NONE_TYPE.

        The result is stored in ``self.sequence_type`` and also returned.
        """
        stream = open(pathname, 'r')
        try:
            first_line = stream.readline()
        finally:
            # close the file even if reading fails
            stream.close()

        if not first_line:
            # readline() returns '' only at end of file: nothing to guess from
            self.sequence_type = SequenceLane.NONE_TYPE
        elif first_line.startswith('@'):
            # fastq starts with a @
            self.sequence_type = SequenceLane.FASTQ_TYPE
        else:
            self.sequence_type = SequenceLane.SCARF_TYPE
        return self.sequence_type

    def _update(self):
        """
        Read the sequence file and store the number of reads in
        ``self._reads``.

        SCARF files are counted as one read per line, FASTQ files as one
        read per four lines. Does nothing when ``self.pathname`` is None.

        Raises RuntimeError if the file is empty, and NotImplementedError
        if the file type is neither SCARF nor FASTQ.
        """
        # can't do anything if we don't have a file to process
        if self.pathname is None:
            return

        if os.stat(self.pathname).st_size == 0:
            raise RuntimeError("Sequencing isn't done, try again later.")

        self._guess_sequence_type(self.pathname)

        logging.info("summarizing results for %s", self.pathname)
        lines = 0
        stream = open(self.pathname)
        try:
            # iterate the file object directly; only the count is needed
            for _ in stream:
                lines += 1
        finally:
            stream.close()

        if self.sequence_type == SequenceLane.SCARF_TYPE:
            self._reads = lines
        elif self.sequence_type == SequenceLane.FASTQ_TYPE:
            # floor division keeps the read count an integer
            self._reads = lines // 4
        else:
            raise NotImplementedError("This only supports scarf or fastq squence files")

    def get_elements(self):
        """
        Build and return a ``SequenceLane`` element describing this lane.

        The element carries a 'version' attribute and child elements for
        the sample name, lane id, end (only when ``self.end`` is not None),
        read count and sequence type description.
        """
        # NOTE(review): self.sample_name and self.reads are not defined in
        # this class; presumably they are provided by ResultLane -- confirm.
        lane = ElementTree.Element(
            SequenceLane.LANE,
            {'version': unicode(SequenceLane.XML_VERSION)})
        sample_tag = ElementTree.SubElement(lane, SAMPLE_NAME)
        sample_tag.text = self.sample_name
        lane_tag = ElementTree.SubElement(lane, LANE_ID)
        lane_tag.text = str(self.lane_id)
        if self.end is not None:
            end_tag = ElementTree.SubElement(lane, END)
            end_tag.text = str(self.end)
        reads = ElementTree.SubElement(lane, READS)
        reads.text = unicode(self.reads)
        sequence_type = ElementTree.SubElement(lane, SequenceLane.SEQUENCE_TYPE)
        sequence_type.text = unicode(
            SequenceLane.SEQUENCE_DESCRIPTION[self.sequence_type])

        return lane

    def set_elements(self, tree):
        """
        Restore this lane's fields from a ``SequenceLane`` element.

        Child tags are matched case-insensitively. A sequence type
        description that is not in SEQUENCE_DESCRIPTION is stored as None,
        and unrecognized child tags are logged as warnings and skipped.

        Raises ValueError if the tag of ``tree`` is not ``SequenceLane``.
        """
        if tree.tag != SequenceLane.LANE:
            raise ValueError('Exptecting %s' % (SequenceLane.LANE,))
        # invert the description table: stored text -> format constant
        lookup_sequence_type = dict(
            (description, code)
            for code, description in SequenceLane.SEQUENCE_DESCRIPTION.items())

        for element in tree:
            tag = element.tag.lower()
            if tag == SAMPLE_NAME.lower():
                self._sample_name = element.text
            elif tag == LANE_ID.lower():
                self.lane_id = int(element.text)
            elif tag == END.lower():
                self.end = int(element.text)
            elif tag == READS.lower():
                self._reads = int(element.text)
            elif tag == SequenceLane.SEQUENCE_TYPE.lower():
                self.sequence_type = lookup_sequence_type.get(element.text)
            else:
                logging.warning("SequenceLane unrecognized tag %s", element.tag)