X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=blobdiff_plain;f=htsworkflow%2Fpipelines%2Frunfolder.py;h=26c592ce5f7eb6d65175031f9c9010dc400d9410;hp=b072dd9cb8ee135236721384240a14e364a5910a;hb=5644e2bfe7c1f7e5af99289cd918f783b61c441a;hpb=ea083869640f95843b72c0778d16bba39086dee5 diff --git a/htsworkflow/pipelines/runfolder.py b/htsworkflow/pipelines/runfolder.py index b072dd9..26c592c 100644 --- a/htsworkflow/pipelines/runfolder.py +++ b/htsworkflow/pipelines/runfolder.py @@ -21,6 +21,7 @@ from htsworkflow.pipelines import ElementTree, \ EUROPEAN_STRPTIME, EUROPEAN_DATE_RE, \ VERSION_RE, USER_RE, \ LANES_PER_FLOWCELL, LANE_LIST +from htsworkflow.pipelines.samplekey import LANE_SAMPLE_KEYS from htsworkflow.util.alphanum import alphanum from htsworkflow.util.ethelp import indent, flatten from htsworkflow.util.queuecommands import QueueCommands @@ -32,7 +33,7 @@ class PipelineRun(object): :Variables: - `pathname` location of the root of this runfolder - - `name` read only property containing name of run xml file + - `serialization_filename` read only property containing name of run xml file - `flowcell_id` read-only property containing flowcell id (bar code) - `datadir` location of the runfolder data dir. - `image_analysis` generic name for Firecrest or IPAR image analysis @@ -63,6 +64,7 @@ class PipelineRun(object): self._name = None self._flowcell_id = flowcell_id self.datadir = None + self.suffix = None self.image_analysis = None self.bustard = None self.gerald = None @@ -127,22 +129,33 @@ class PipelineRun(object): return path_fields[-1] def _get_runfolder_name(self): - if self.gerald is None: - return None - else: + if self.gerald: return self.gerald.runfolder_name + elif hasattr(self.image_analysis, 'runfolder_name'): + return self.image_analysis.runfolder_name + else: + return None runfolder_name = property(_get_runfolder_name) - def _get_run_id(self): - """Return a identifer for a run. + def _get_run_dirname(self): + """Return name of directory to hold result files from one analysis For pre-multiplexing runs this is just the cycle range C1-123 For post-multiplexing runs the "suffix" that we add to differentiate runs will be added to the range. E.g. Unaligned_6mm may produce C1-200_6mm """ - pass - + if self.image_analysis is None: + raise ValueError("Not initialized yet") + start = self.image_analysis.start + stop = self.image_analysis.stop + cycle_fragment = "C%d-%d" % (start, stop) + if self.suffix: + cycle_fragment += self.suffix + + return cycle_fragment + run_dirname = property(_get_run_dirname) + def get_elements(self): """make one master xml file from all of our sub-components. @@ -187,21 +200,22 @@ class PipelineRun(object): else: LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,)) - def _get_run_name(self): - """Compute the run name for the run xml file + def _get_serialization_filename(self): + """Compute the filename for the run xml file Attempts to find the latest date from all of the run components. - :return: run xml name + :return: filename run_{flowcell id}_{timestamp}.xml :rtype: str """ if self._name is None: - tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time) + components = [self.image_analysis, self.bustard, self.gerald] + tmax = max([ c.time for c in components if c ]) timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax)) self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml' return self._name - name = property(_get_run_name) + serialization_filename = property(_get_serialization_filename) def save(self, destdir=None): """Save a run xml file. @@ -212,10 +226,10 @@ class PipelineRun(object): """ if destdir is None: destdir = '' - LOGGER.info("Saving run report " + self.name) + LOGGER.info("Saving run report " + self.serialization_filename) xml = self.get_elements() indent(xml) - dest_pathname = os.path.join(destdir, self.name) + dest_pathname = os.path.join(destdir, self.serialization_filename) ElementTree.ElementTree(xml).write(dest_pathname) def load(self, filename): @@ -338,17 +352,18 @@ def build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id): matched_paths = hiseq_match_aligned_unaligned(aligned_paths, unaligned_paths) LOGGER.debug("Matched HiSeq analysis: %s", str(matched_paths)) - for aligned, unaligned in matched_paths: + for aligned, unaligned, suffix in matched_paths: if unaligned is None: LOGGER.warn("Aligned directory %s without matching unalinged, skipping", aligned) continue - print "scan for aligned then remove them from unaligned list" try: p = PipelineRun(runfolder, flowcell_id) p.datadir = datadir + p.suffix = suffix p.image_analysis = image_analysis p.bustard = bustard.bustard(unaligned) + assert p.bustard if aligned: p.gerald = gerald.gerald(aligned) runs.append(p) @@ -370,7 +385,7 @@ def hiseq_match_aligned_unaligned(aligned, unaligned): for key in keys: a = aligned_by_suffix.get(key) u = unaligned_by_suffix.get(key) - matches.append((a, u)) + matches.append((a, u, key)) return matches def build_dir_dict_by_suffix(prefix, dirnames): @@ -524,11 +539,16 @@ def summary_report(runs): Summarize cluster numbers and mapped read counts for a runfolder """ report = [] + eland_keys = [] for run in runs: # print a run name? - report.append('Summary for %s' % (run.name,)) + report.append('Summary for %s' % (run.serialization_filename,)) # sort the report - eland_keys = sorted(run.gerald.eland_results.keys()) + if run.gerald: + eland_keys = sorted(run.gerald.eland_results.keys()) + else: + report.append("Alignment not done, no report possible") + for lane_id in eland_keys: report.extend(summarize_lane(run.gerald, lane_id)) report.append('---') @@ -543,14 +563,14 @@ def is_compressed(filename): else: return False -def save_flowcell_reports(data_dir, cycle_dir): +def save_flowcell_reports(data_dir, run_dirname): """ Save the flowcell quality reports """ data_dir = os.path.abspath(data_dir) status_file = os.path.join(data_dir, 'Status.xml') reports_dir = os.path.join(data_dir, 'reports') - reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2') + reports_dest = os.path.join(run_dirname, 'flowcell-reports.tar.bz2') if os.path.exists(reports_dir): cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ] if os.path.exists(status_file): @@ -563,21 +583,21 @@ def save_flowcell_reports(data_dir, cycle_dir): os.chdir(cwd) -def save_summary_file(pipeline, cycle_dir): +def save_summary_file(pipeline, run_dirname): # Copy Summary.htm gerald_object = pipeline.gerald gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm') status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm') if os.path.exists(gerald_summary): - LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir)) - shutil.copy(gerald_summary, cycle_dir) + LOGGER.info('Copying %s to %s' % (gerald_summary, run_dirname)) + shutil.copy(gerald_summary, run_dirname) elif os.path.exists(status_files_summary): - LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir)) - shutil.copy(status_files_summary, cycle_dir) + LOGGER.info('Copying %s to %s' % (status_files_summary, run_dirname)) + shutil.copy(status_files_summary, run_dirname) else: LOGGER.info('Summary file %s was not found' % (summary_path,)) -def save_ivc_plot(bustard_object, cycle_dir): +def save_ivc_plot(bustard_object, run_dirname): """ Save the IVC page and its supporting images """ @@ -585,12 +605,12 @@ def save_ivc_plot(bustard_object, cycle_dir): plot_image_path = os.path.join(bustard_object.pathname, 'Plots') plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png') - plot_target_path = os.path.join(cycle_dir, 'Plots') + plot_target_path = os.path.join(run_dirname, 'Plots') if os.path.exists(plot_html): LOGGER.debug("Saving %s" % (plot_html,)) LOGGER.debug("Saving %s" % (plot_images,)) - shutil.copy(plot_html, cycle_dir) + shutil.copy(plot_html, run_dirname) if not os.path.exists(plot_target_path): os.mkdir(plot_target_path) for plot_file in glob(plot_images): @@ -599,7 +619,7 @@ def save_ivc_plot(bustard_object, cycle_dir): LOGGER.warning('Missing IVC.html file, not archiving') -def compress_score_files(bustard_object, cycle_dir): +def compress_score_files(bustard_object, run_dirname): """ Compress score files into our result directory """ @@ -617,7 +637,7 @@ def compress_score_files(bustard_object, cycle_dir): tar_cmd = ['tar', 'c'] + score_files bzip_cmd = [ 'bzip2', '-9', '-c' ] - tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2') + tar_dest_name = os.path.join(run_dirname, 'scores.tar.bz2') tar_dest = open(tar_dest_name, 'w') LOGGER.info("Compressing score files from %s" % (scores_path,)) LOGGER.info("Running tar: " + " ".join(tar_cmd[:10])) @@ -631,7 +651,7 @@ def compress_score_files(bustard_object, cycle_dir): tar.wait() -def compress_eland_results(gerald_object, cycle_dir, num_jobs=1): +def compress_eland_results(gerald_object, run_dirname, num_jobs=1): """ Compress eland result files into the archive directory """ @@ -646,7 +666,7 @@ def compress_eland_results(gerald_object, cycle_dir, num_jobs=1): "Lane ID %s does not have a filename." % (eland_lane.lane_id,)) else: path, name = os.path.split(source_name) - dest_name = os.path.join(cycle_dir, name) + dest_name = os.path.join(run_dirname, name) LOGGER.info("Saving eland file %s to %s" % \ (source_name, dest_name)) @@ -687,52 +707,56 @@ def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, r if not os.path.exists(result_dir): os.mkdir(result_dir) - # create cycle_dir - cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop) - LOGGER.info("Filling in %s" % (cycle,)) - cycle_dir = os.path.join(result_dir, cycle) - cycle_dir = os.path.abspath(cycle_dir) - if os.path.exists(cycle_dir): - LOGGER.error("%s already exists, not overwriting" % (cycle_dir,)) + # create directory to add this runs results to + LOGGER.info("Filling in %s" % (r.run_dirname,)) + run_dirname = os.path.join(result_dir, r.run_dirname) + run_dirname = os.path.abspath(run_dirname) + if os.path.exists(run_dirname): + LOGGER.error("%s already exists, not overwriting" % (run_dirname,)) continue else: - os.mkdir(cycle_dir) + os.mkdir(run_dirname) # save run file - r.save(cycle_dir) + r.save(run_dirname) # save illumina flowcell status report save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'), - cycle_dir) + run_dirname) # save stuff from bustard # grab IVC plot - save_ivc_plot(r.bustard, cycle_dir) + save_ivc_plot(r.bustard, run_dirname) # build base call saving commands if site is not None: - save_raw_data(num_jobs, r, site, raw_format, cycle_dir) + save_raw_data(num_jobs, r, site, raw_format, run_dirname) # save stuff from GERALD # copy stuff out of the main run - g = r.gerald + if r.gerald: + g = r.gerald - # save summary file - save_summary_file(r, cycle_dir) + # save summary file + save_summary_file(r, run_dirname) - # compress eland result files - compress_eland_results(g, cycle_dir, num_jobs) + # compress eland result files + compress_eland_results(g, run_dirname, num_jobs) # md5 all the compressed files once we're done - md5_commands = srf.make_md5_commands(cycle_dir) - srf.run_commands(cycle_dir, md5_commands, num_jobs) + md5_commands = srf.make_md5_commands(run_dirname) + srf.run_commands(run_dirname, md5_commands, num_jobs) -def save_raw_data(num_jobs, r, site, raw_format, cycle_dir): +def save_raw_data(num_jobs, r, site, raw_format, run_dirname): lanes = [] - for lane in r.gerald.lanes: - lane_parameters = r.gerald.lanes.get(lane, None) - if lane_parameters is not None: - lanes.append(lane) + if r.gerald: + for lane in r.gerald.lanes: + lane_parameters = r.gerald.lanes.get(lane, None) + if lane_parameters is not None: + lanes.append(lane) + else: + # assume default list of lanes + lanes = LANE_SAMPLE_KEYS run_name = srf.pathname_to_run_name(r.pathname) seq_cmds = [] @@ -741,13 +765,14 @@ def save_raw_data(num_jobs, r, site, raw_format, cycle_dir): LOGGER.info("Raw Format is: %s" % (raw_format, )) if raw_format == 'fastq': - rawpath = os.path.join(r.pathname, r.gerald.runfolder_name) + LOGGER.info("Reading fastq files from %s", r.bustard.pathname) + rawpath = os.path.join(r.pathname, r.bustard.pathname) LOGGER.info("raw data = %s" % (rawpath,)) - srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir) + srf.copy_hiseq_project_fastqs(run_name, rawpath, site, run_dirname) elif raw_format == 'qseq': - seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir) + seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, run_dirname) elif raw_format == 'srf': - seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0) + seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, run_dirname, 0) else: raise ValueError('Unknown --raw-format=%s' % (raw_format)) srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)