import stat
import subprocess
import sys
+import tarfile
import time
try:
VERSION_RE = "([0-9\.]+)"
USER_RE = "([a-zA-Z0-9]+)"
LANES_PER_FLOWCELL = 8
-LANE_LIST = range(1, LANES_PER_FLOWCELL+1)
+LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
from htsworkflow.util.alphanum import alphanum
from htsworkflow.util.ethelp import indent, flatten
+from htsworkflow.util.queuecommands import QueueCommands
+
+from htsworkflow.pipelines import srf
class PipelineRun(object):
"""
return self._flowcell_id
flowcell_id = property(_get_flowcell_id)
+ def _get_runfolder_name(self):
+ if self.gerald is None:
+ return None
+ else:
+ return self.gerald.runfolder_name
+ runfolder_name = property(_get_runfolder_name)
+
def get_elements(self):
"""
make one master xml file from all of our sub-components.
if self._name is None:
tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
- self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
+ self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
return self._name
name = property(_get_run_name)
def save(self, destdir=None):
if destdir is None:
destdir = ''
- logging.info("Saving run report "+ self.name)
+ logging.info("Saving run report " + self.name)
xml = self.get_elements()
indent(xml)
dest_pathname = os.path.join(destdir, self.name)
logging.info('Searching for runs in ' + datadir)
runs = []
# scan for firecrest directories
- for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
+ for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
logging.info('Found firecrest in ' + datadir)
image_analysis = firecrest.firecrest(firecrest_pathname)
if image_analysis is None:
image_analysis = ipar.ipar(ipar_pathname)
if image_analysis is None:
logging.warn(
- "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
+ "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
)
else:
scan_post_image_analysis(
bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
- runfolder_dir = os.path.abspath(os.path.join(image_dir, '..','..'))
-
+ runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))
+
logging.info('--- use-run detected options ---')
logging.info('runfolder: %s' % (runfolder_dir,))
logging.info('image_dir: %s' % (image_dir,))
# leaf directory should be an IPAR or firecrest directory
data_dir, short_image_dir = os.path.split(image_dir)
logging.info('data_dir: %s' % (data_dir,))
- logging.info('short_iamge_dir: %s' %(short_image_dir,))
+ logging.info('short_iamge_dir: %s' % (short_image_dir,))
# guess which type of image processing directory we have by looking
# in the leaf directory name
p.image_analysis = image_run
p.bustard = base_calling_run
p.gerald = gerald_run
-
+
logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
return p
def summarize_lane(gerald, lane_id):
report = []
summary_results = gerald.summary.lane_results
- for end in range(len(summary_results)):
+ for end in range(len(summary_results)):
eland_result = gerald.eland_results.results[end][lane_id]
report.append("Sample name %s" % (eland_result.sample_name))
report.append("Lane id %s end %s" % (eland_result.lane_id, end))
if hasattr(eland_result, 'match_codes'):
mc = eland_result.match_codes
nm = mc['NM']
- nm_percent = float(nm)/eland_result.reads * 100
+ nm_percent = float(nm) / eland_result.reads * 100
qc = mc['QC']
- qc_percent = float(qc)/eland_result.reads * 100
+ qc_percent = float(qc) / eland_result.reads * 100
report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
else:
return False
-def extract_results(runs, output_base_dir=None):
def save_flowcell_reports(data_dir, cycle_dir):
    """
    Save the flowcell quality reports.

    Archives <data_dir>/reports (plus Status.xml and Status.xsl when
    Status.xml is present) into <cycle_dir>/flowcell-reports.tar.bz2
    by running tar through QueueCommands from within data_dir.

    :param data_dir: flowcell data directory containing a reports/ subdir
    :param cycle_dir: destination directory for the archive
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            # NOTE(review): Status.xsl is added without its own existence
            # check -- tar will fail if only Status.xml exists; confirm
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        logging.info("Saving reports from " + reports_dir)
        cwd = os.getcwd()
        os.chdir(data_dir)
        try:
            q = QueueCommands([" ".join(cmd_list)])
            q.run()
        finally:
            # always restore the working directory, even if tar/QueueCommands
            # raises -- otherwise the whole process is left chdir'd away
            os.chdir(cwd)
+
+
def save_summary_file(gerald_object, cycle_dir):
    """Copy GERALD's Summary.htm into cycle_dir, logging when it is missing."""
    summary_source = os.path.join(gerald_object.pathname, 'Summary.htm')
    if not os.path.exists(summary_source):
        logging.info('Summary file %s was not found' % (summary_source,))
        return
    logging.info('Copying %s to %s' % (summary_source, cycle_dir))
    shutil.copy(summary_source, cycle_dir)
+
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images.

    Copies IVC.htm plus any matching Plots/s_?_[a-z]*.png images from the
    bustard directory into <cycle_dir>/Plots.
    """
    source_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    source_plot_dir = os.path.join(bustard_object.pathname, 'Plots')
    source_plots = os.path.join(source_plot_dir, 's_?_[a-z]*.png')
    dest_plot_dir = os.path.join(cycle_dir, 'Plots')

    if not os.path.exists(source_html):
        logging.warning('Missing IVC.html file, not archiving')
        return

    logging.debug("Saving %s" % (source_html,))
    logging.debug("Saving %s" % (source_plots,))
    shutil.copy(source_html, cycle_dir)
    if not os.path.exists(dest_plot_dir):
        os.mkdir(dest_plot_dir)
    for image in glob(source_plots):
        shutil.copy(image, dest_plot_dir)
+
+
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory.

    Tars every *_score.txt file found in the bustard directory (or its
    Temp/ subdirectory, a new feature of pipeline 1.1rc1) and pipes the
    stream through bzip2 into <cycle_dir>/scores.tar.bz2.

    :param bustard_object: bustard run object providing pathname
    :param cycle_dir: destination directory for scores.tar.bz2
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = [f for f in os.listdir(scores_path)
                   if re.match('.*_score.txt', f)]

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    logging.info("Compressing score files from %s" % (scores_path,))
    logging.info("Running tar: " + " ".join(tar_cmd[:10]))
    logging.info("Running bzip2: " + " ".join(bzip_cmd))
    logging.info("Writing to %s" % (tar_dest_name,))

    env = {'BZIP': '-9'}
    # open in binary mode (bzip2 output is binary) and guarantee the
    # handle is closed so the archive is complete on disk
    tar_dest = open(tar_dest_name, 'wb')
    try:
        tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
                               env=env, cwd=scores_path)
        bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
        # drop our copy of the pipe so bzip2 sees EOF when tar exits
        tar.stdout.close()
        tar.wait()
        # also wait for bzip2 -- returning before it finishes would leave
        # a truncated archive
        bzip.wait()
    finally:
        tar_dest.close()
+
+
def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
    """
    Compress eland result files into the archive directory.

    Already-compressed files are copied as-is; uncompressed files are
    queued as bzip2 commands and run num_jobs at a time via QueueCommands.

    :param gerald_object: gerald run object providing eland_results
    :param cycle_dir: destination directory for the archived files
    :param num_jobs: number of compression jobs to run concurrently
    """
    # copy & bzip eland files
    bz_commands = []

    for lanes_dictionary in gerald_object.eland_results.results:
        for eland_lane in lanes_dictionary.values():
            source_name = eland_lane.pathname
            if source_name is None:
                # lane produced no file; log and move on
                logging.info(
                    "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
                continue
            path, name = os.path.split(source_name)
            dest_name = os.path.join(cycle_dir, name)
            logging.info("Saving eland file %s to %s" % \
                         (source_name, dest_name))

            if is_compressed(name):
                logging.info('Already compressed, Saving to %s' % (dest_name,))
                shutil.copy(source_name, dest_name)
            else:
                # not compressed
                # NOTE(review): the '>' redirect only works if QueueCommands
                # runs each command through a shell -- confirm
                dest_name += '.bz2'
                args = ['bzip2', '-9', '-c', source_name, '>', dest_name]
                bz_commands.append(" ".join(args))

    if len(bz_commands) > 0:
        q = QueueCommands(bz_commands, num_jobs)
        q.run()
+
+
+def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format='qseq'):
+ """
+ Iterate over runfolders in runs extracting the most useful information.
+ * run parameters (in run-*.xml)
+ * eland_result files
+ * score files
+ * Summary.htm
+ * srf files (raw sequence & qualities)
+ """
if output_base_dir is None:
output_base_dir = os.getcwd()
cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
logging.info("Filling in %s" % (cycle,))
cycle_dir = os.path.join(result_dir, cycle)
+ cycle_dir = os.path.abspath(cycle_dir)
if os.path.exists(cycle_dir):
logging.error("%s already exists, not overwriting" % (cycle_dir,))
continue
else:
os.mkdir(cycle_dir)
+ # save run file
+ r.save(cycle_dir)
+
+ # save illumina flowcell status report
+ save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'), cycle_dir)
+
+ # save stuff from bustard
+ # grab IVC plot
+ save_ivc_plot(r.bustard, cycle_dir)
+
+ # build base call saving commands
+ if site is not None:
+ lanes = []
+ for lane in range(1, 9):
+ if r.gerald.lanes[lane].analysis != 'none':
+ lanes.append(lane)
+
+ run_name = srf.pathname_to_run_name(r.pathname)
+ if raw_format == 'qseq':
+ seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
+ elif raw_format == 'srf':
+ seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
+ else:
+ raise ValueError('Unknown --raw-format=%s' % (raw_format))
+ srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
+
+ # save stuff from GERALD
# copy stuff out of the main run
g = r.gerald
- # save run file
- r.save(cycle_dir)
+ # save summary file
+ save_summary_file(g, cycle_dir)
- # Copy Summary.htm
- summary_path = os.path.join(r.gerald.pathname, 'Summary.htm')
- if os.path.exists(summary_path):
- logging.info('Copying %s to %s' % (summary_path, cycle_dir))
- shutil.copy(summary_path, cycle_dir)
- else:
- logging.info('Summary file %s was not found' % (summary_path,))
-
- # tar score files
- score_files = []
-
- # check for g.pathname/Temp a new feature of 1.1rc1
- scores_path = g.pathname
- scores_path_temp = os.path.join(scores_path, 'Temp')
- if os.path.isdir(scores_path_temp):
- scores_path = scores_path_temp
-
- # hopefully we have a directory that contains s_*_score files
- for f in os.listdir(scores_path):
- if re.match('.*_score.txt', f):
- score_files.append(f)
-
- tar_cmd = ['/bin/tar', 'c'] + score_files
- bzip_cmd = [ 'bzip2', '-9', '-c' ]
- tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
- tar_dest = open(tar_dest_name, 'w')
- logging.info("Compressing score files from %s" % (scores_path,))
- logging.info("Running tar: " + " ".join(tar_cmd[:10]))
- logging.info("Running bzip2: " + " ".join(bzip_cmd))
- logging.info("Writing to %s" %(tar_dest_name))
-
- env = {'BZIP': '-9'}
- tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
- cwd=scores_path)
- bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
- tar.wait()
-
- # copy & bzip eland files
- for lanes_dictionary in g.eland_results.results:
- for eland_lane in lanes_dictionary.values():
- source_name = eland_lane.pathname
- path, name = os.path.split(eland_lane.pathname)
- dest_name = os.path.join(cycle_dir, name)
- logging.info("Saving eland file %s to %s" % \
- (source_name, dest_name))
+ # compress eland result files
+ compress_eland_results(g, cycle_dir, num_jobs)
- if is_compressed(name):
- logging.info('Already compressed, Saving to %s' % (dest_name, ))
- shutil.copy(source_name, dest_name)
- else:
- # not compressed
- dest_name += '.bz2'
- args = ['bzip2', '-9', '-c', source_name]
- logging.info('Running: %s' % ( " ".join(args) ))
- bzip_dest = open(dest_name, 'w')
- bzip = subprocess.Popen(args, stdout=bzip_dest)
- logging.info('Saving to %s' % (dest_name, ))
- bzip.wait()
+ # md5 all the compressed files once we're done
+ md5_commands = srf.make_md5_commands(cycle_dir)
+ srf.run_commands(cycle_dir, md5_commands, num_jobs)
def rm_list(files, dry_run=True):
for f in files:
else:
os.unlink(f)
else:
- logging.warn("%s doesn't exist."% (f,))
+ logging.warn("%s doesn't exist." % (f,))
def clean_runs(runs, dry_run=True):
"""
logging.info("Cleaning intermediate files")
# make clean_intermediate
if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
- clean_process = subprocess.Popen(['make', 'clean_intermediate'],
+ clean_process = subprocess.Popen(['make', 'clean_intermediate'],
cwd=run.image_analysis.pathname,)
clean_process.wait()