return None
time_element = self.tree.xpath('TIME_STAMP')
if len(time_element) == 1:
- timetuple = time.strptime(
- time_element[0].text.strip(),
- "%a %b %d %H:%M:%S %Y")
- return datetime(*timetuple[:6])
+ timetuple = time.strptime(
+ time_element[0].text.strip(),
+ "%a %b %d %H:%M:%S %Y")
+ return datetime(*timetuple[:6])
return super(CASAVA, self)._get_date()
date = property(_get_date)
class PipelineRun(object):
"""Capture "interesting" information about a pipeline run
-
+
:Variables:
- `pathname` location of the root of this runfolder
- `serialization_filename` read only property containing name of run xml file
def __init__(self, pathname=None, flowcell_id=None, xml=None):
"""Initialize a PipelineRun object
-
+
:Parameters:
- `pathname` the root directory of this run folder.
- `flowcell_id` the flowcell ID in case it can't be determined
- `xml` Allows initializing an object from a serialized xml file.
-
+
:Types:
- `pathname` str
- `flowcell_id` str
def _get_flowcell_id(self):
"""Return the flowcell ID
-
+
Attempts to find the flowcell ID through several mechanisms.
"""
# extract flowcell ID
def _get_flowcell_id_from_flowcellid(self):
"""Extract flowcell id from a Config/FlowcellId.xml file
-
+
:return: flowcell_id or None if not found
"""
config_dir = os.path.join(self.pathname, 'Config')
def _get_run_dirname(self):
"""Return name of directory to hold result files from one analysis
-
+
For pre-multiplexing runs this is just the cycle range C1-123
- For post-multiplexing runs the "suffix" that we add to
+ For post-multiplexing runs the "suffix" that we add to
differentiate runs will be added to the range.
E.g. Unaligned_6mm may produce C1-200_6mm
"""
def get_elements(self):
"""make one master xml file from all of our sub-components.
-
+
:return: an ElementTree containing all available pipeline
run xml compoents.
"""
def _get_serialization_filename(self):
"""Compute the filename for the run xml file
-
- Attempts to find the latest date from all of the run
+
+ Attempts to find the latest date from all of the run
components.
-
+
:return: filename run_{flowcell id}_{timestamp}.xml
:rtype: str
"""
def save(self, destdir=None):
"""Save a run xml file.
-
+
:param destdir: Directory name to save too, uses current directory
if not specified.
:type destdir: str
def load(self, filename):
"""Load a run xml into this object.
-
+
:Parameters:
- `filename` location of a run xml file
-
+
:Types:
- `filename` str
"""
def get_runs(runfolder, flowcell_id=None):
"""Find all runs associated with a runfolder.
-
+
We end up with multiple analysis runs as we sometimes
need to try with different parameters. This attempts
to return a list of all the various runs.
p.gerald = gerald.gerald(aligned)
runs.append(p)
except (IOError, RuntimeError) as e:
- LOGGER.error("Exception %s", str(e))
+ LOGGER.error("Exception %s", str(e))
LOGGER.error("Skipping run in %s", flowcell_id)
return len(runs) - start
for run in runs:
# print a run name?
report.append('Summary for %s' % (run.serialization_filename,))
- # sort the report
- if run.gerald:
+ # sort the report
+ if run.gerald:
eland_keys = sorted(run.gerald.eland_results.keys())
else:
report.append("Alignment not done, no report possible")
clean_process = subprocess.Popen(['make', 'clean_intermediate'],
cwd=run.image_analysis.pathname,)
clean_process.wait()
-
-
-