2 from ConfigParser import SafeConfigParser
7 from optparse import OptionParser
9 from pprint import pprint, pformat
11 from StringIO import StringIO
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21 create_sequence_table, \
23 from htsworkflow.pipelines import qseq2fastq
24 from htsworkflow.pipelines import srf2fastq
26 def main(cmdline=None):
27 parser = make_parser()
28 opts, args = parser.parse_args(cmdline)
31 logging.basicConfig(level = logging.DEBUG )
33 logging.basicConfig(level = logging.INFO )
35 logging.basicConfig(level = logging.WARNING )
37 apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }
39 if opts.host is None or opts.apiid is None or opts.apikey is None:
40 parser.error("Please specify host url, apiid, apikey")
43 parser.error("I need at least one library submission-dir input file")
45 library_result_map = []
47 library_result_map.extend(read_library_result_map(a))
49 if opts.make_tree_from is not None:
50 make_tree_from(opts.make_tree_from, library_result_map)
52 if opts.daf is not None:
53 link_daf(opts.daf, library_result_map)
56 build_fastqs(opts.host,
63 make_submission_ini(opts.host, apidata, library_result_map)
66 make_all_ddfs(library_result_map, opts.daf, force=opts.force)
70 # Load defaults from the config files
71 config = SafeConfigParser()
72 config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])
74 sequence_archive = None
78 SECTION = 'sequence_archive'
79 if config.has_section(SECTION):
80 sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
81 sequence_archive = os.path.expanduser(sequence_archive)
82 apiid = config.get(SECTION, 'apiid', apiid)
83 apikey = config.get(SECTION, 'apikey', apikey)
84 apihost = config.get(SECTION, 'host', apihost)
86 parser = OptionParser()
89 parser.add_option('--make-tree-from',
90 help="create directories & link data files",
92 parser.add_option('--fastq', help="generate scripts for making fastq files",
93 default=False, action="store_true")
95 parser.add_option('--ini', help="generate submission ini file", default=False,
98 parser.add_option('--makeddf', help='make the ddfs', default=False,
101 parser.add_option('--daf', default=None, help='specify daf name')
102 parser.add_option('--force', default=False, action="store_true",
103 help="Force regenerating fastqs")
105 # configuration options
106 parser.add_option('--apiid', default=apiid, help="Specify API ID")
107 parser.add_option('--apikey', default=apikey, help="Specify API KEY")
108 parser.add_option('--host', default=apihost,
109 help="specify HTSWorkflow host",)
110 parser.add_option('--sequence', default=sequence_archive,
111 help="sequence repository")
114 parser.add_option('--verbose', default=False, action="store_true",
115 help='verbose logging')
116 parser.add_option('--debug', default=False, action="store_true",
117 help='debug logging')
122 def make_tree_from(source_path, library_result_map):
123 """Create a tree using data files from source path.
125 for lib_id, lib_path in library_result_map:
126 if not os.path.exists(lib_path):
127 logging.info("Making dir {0}".format(lib_path))
129 source_lib_dir = os.path.join(source_path, lib_path)
130 if os.path.exists(source_lib_dir):
132 for filename in os.listdir(source_lib_dir):
133 source_pathname = os.path.join(source_lib_dir, filename)
134 target_pathname = os.path.join(lib_path, filename)
135 if not os.path.exists(source_pathname):
136 raise IOError("{0} does not exist".format(source_pathname))
137 if not os.path.exists(target_pathname):
138 os.symlink(source_pathname, target_pathname)
140 'LINK {0} to {1}'.format(source_pathname, target_pathname))
142 def build_fastqs(host, apidata, sequences_path, library_result_map,
145 Generate condor scripts to build any needed fastq files
148 host (str): root of the htsworkflow api server
149 apidata (dict): id & key to post to the server
150 sequences_path (str): root of the directory tree to scan for files
151 library_result_map (list): [(library_id, destination directory), ...]
153 qseq_condor_header = """
156 error=log/qseq2fastq.err.$(process).log
157 output=log/qseq2fastq.out.$(process).log
158 log=log/qseq2fastq.log
160 """ % {'exe': sys.executable }
161 qseq_condor_entries = []
162 srf_condor_header = """
165 output=log/srf_pair_fastq.out.$(process).log
166 error=log/srf_pair_fastq.err.$(process).log
167 log=log/srf_pair_fastq.log
168 environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"
170 """ % {'exe': sys.executable }
171 srf_condor_entries = []
172 lib_db = find_archive_sequence_files(host,
177 needed_targets = find_missing_targets(library_result_map, lib_db, force)
179 for target_pathname, available_sources in needed_targets.items():
180 logging.debug(' target : %s' % (target_pathname,))
181 logging.debug(' candidate sources: %s' % (available_sources,))
182 if available_sources.has_key('qseq'):
183 source = available_sources['qseq']
184 qseq_condor_entries.append(
185 condor_qseq_to_fastq(source.path,
190 elif available_sources.has_key('srf'):
191 source = available_sources['srf']
192 mid = getattr(source, 'mid_point', None)
193 srf_condor_entries.append(
194 condor_srf_to_fastq(source.path,
202 print " need file", target_pathname
204 if len(srf_condor_entries) > 0:
205 make_submit_script('srf.fastq.condor',
209 if len(qseq_condor_entries) > 0:
210 make_submit_script('qseq.fastq.condor',
215 def find_missing_targets(library_result_map, lib_db, force=False):
217 Check if the sequence file exists.
218 This requires computing what the sequence name is and checking
219 to see if it can be found in the sequence location.
221 Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
223 fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
224 fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
225 # find what targets we're missing
227 for lib_id, result_dir in library_result_map:
229 lane_dict = make_lane_dict(lib_db, lib_id)
231 for lane_key, sequences in lib['lanes'].items():
232 for seq in sequences:
233 seq.paired = lane_dict[seq.flowcell]['paired_end']
234 lane_status = lane_dict[seq.flowcell]['status']
236 if seq.paired and seq.read is None:
238 filename_attributes = {
239 'flowcell': seq.flowcell,
246 if lane_status == 'Failed':
248 if seq.flowcell == '30DY0AAXX':
249 # 30DY0 only ran for 151 bases instead of 152
250 # it is actually 76 1st read, 75 2nd read
255 target_name = fastq_paired_template % filename_attributes
257 target_name = fastq_single_template % filename_attributes
259 target_pathname = os.path.join(result_dir, target_name)
260 if force or not os.path.exists(target_pathname):
261 t = needed_targets.setdefault(target_pathname, {})
262 t[seq.filetype] = seq
264 return needed_targets
267 def link_daf(daf_path, library_result_map):
268 if not os.path.exists(daf_path):
269 raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))
271 base_daf = os.path.basename(daf_path)
273 for lib_id, result_dir in library_result_map:
274 submission_daf = os.path.join(result_dir, base_daf)
275 if not os.path.exists(submission_daf):
276 os.link(daf_path, submission_daf)
279 def make_submission_ini(host, apidata, library_result_map, paired=True):
280 #attributes = get_filename_attribute_map(paired)
281 view_map = NameToViewMap(host, apidata)
283 candidate_fastq_src = {}
285 for lib_id, result_dir in library_result_map:
286 order_by = ['order_by=files', 'view', 'replicate', 'cell',
287 'readType', 'mapAlgorithm', 'insertLength' ]
288 inifile = ['[config]']
289 inifile += [" ".join(order_by)]
292 result_ini = os.path.join(result_dir, result_dir+'.ini')
295 submission_files = os.listdir(result_dir)
297 fastq_attributes = {}
298 for f in submission_files:
299 attributes = view_map.find_attributes(f, lib_id)
300 if attributes is None:
301 raise ValueError("Unrecognized file: %s" % (f,))
303 ext = attributes["extension"]
304 if attributes['view'] is None:
306 elif attributes.get("type", None) == 'fastq':
307 fastqs.setdefault(ext, set()).add(f)
308 fastq_attributes[ext] = attributes
311 make_submission_section(line_counter,
318 # add in fastqs on a single line.
320 for extension, fastq_files in fastqs.items():
322 make_submission_section(line_counter,
324 fastq_attributes[extension])
329 f = open(result_ini,'w')
330 f.write(os.linesep.join(inifile))
333 def make_lane_dict(lib_db, lib_id):
335 Convert the lane_set in a lib_db to a dictionary
336 indexed by flowcell ID
339 for lane in lib_db[lib_id]['lane_set']:
340 result.append((lane['flowcell'], lane))
344 def make_all_ddfs(library_result_map, daf_name, make_condor=True, force=False):
346 for lib_id, result_dir in library_result_map:
347 ininame = result_dir+'.ini'
348 inipathname = os.path.join(result_dir, ininame)
349 if os.path.exists(inipathname):
351 make_ddf(ininame, daf_name, True, make_condor, result_dir)
354 if make_condor and len(dag_fragment) > 0:
355 dag_filename = 'submission.dagman'
356 if not force and os.path.exists(dag_filename):
357 logging.warn("%s exists, please delete" % (dag_filename,))
359 f = open(dag_filename,'w')
360 f.write( os.linesep.join(dag_fragment))
361 f.write( os.linesep )
365 def make_ddf(ininame, daf_name, guess_ddf=False, make_condor=False, outdir=None):
367 Make ddf files, and bonus condor file
371 if outdir is not None:
376 ddf_name = make_ddf_name(ininame)
378 output = open(ddf_name,'w')
380 file_list = read_ddf_ini(ininame, output)
382 file_list.append(daf_name)
383 if ddf_name is not None:
384 file_list.append(ddf_name)
387 archive_condor = make_condor_archive_script(ininame, file_list)
388 upload_condor = make_condor_upload_script(ininame)
390 dag_fragments.extend(
391 make_dag_fragment(ininame, archive_condor, upload_condor)
399 def read_ddf_ini(filename, output=sys.stdout):
401 Read a ini file and dump out a tab delmited text file
404 config = SafeConfigParser()
405 config.read(filename)
407 order_by = shlex.split(config.get("config", "order_by"))
409 output.write("\t".join(order_by))
410 output.write(os.linesep)
411 sections = config.sections()
413 for section in sections:
414 if section == "config":
415 # skip the config block
419 v = config.get(section, key)
422 file_list.extend(parse_filelist(v))
424 output.write("\t".join(values))
425 output.write(os.linesep)
429 def read_library_result_map(filename):
431 Read a file that maps library id to result directory.
432 Does not support spaces in filenames.
437 stream = open(filename,'r')
442 if not line.startswith('#') and len(line) > 0 :
443 library_id, result_dir = line.split()
444 results.append((library_id, result_dir))
448 def make_condor_archive_script(ininame, files):
449 script = """Universe = vanilla
451 Executable = /bin/tar
452 arguments = czvhf ../%(archivename)s %(filelist)s
454 Error = compress.err.$(Process).log
455 Output = compress.out.$(Process).log
456 Log = /tmp/submission-compress-%(user)s.log
457 initialdir = %(initialdir)s
458 environment="GZIP=-3"
464 if not os.path.exists(f):
465 raise RuntimeError("Missing %s" % (f,))
467 context = {'archivename': make_submission_name(ininame),
468 'filelist': " ".join(files),
469 'initialdir': os.getcwd(),
470 'user': os.getlogin()}
472 condor_script = make_condor_name(ininame, 'archive')
473 condor_stream = open(condor_script,'w')
474 condor_stream.write(script % context)
475 condor_stream.close()
479 def make_condor_upload_script(ininame):
480 script = """Universe = vanilla
482 Executable = /usr/bin/lftp
483 arguments = -c put ../%(archivename)s -o ftp://detrout@encodeftp.cse.ucsc.edu/%(archivename)s
485 Error = upload.err.$(Process).log
486 Output = upload.out.$(Process).log
487 Log = /tmp/submission-upload-%(user)s.log
488 initialdir = %(initialdir)s
492 context = {'archivename': make_submission_name(ininame),
493 'initialdir': os.getcwd(),
494 'user': os.getlogin()}
496 condor_script = make_condor_name(ininame, 'upload')
497 condor_stream = open(condor_script,'w')
498 condor_stream.write(script % context)
499 condor_stream.close()
503 def make_dag_fragment(ininame, archive_condor, upload_condor):
505 Make the couple of fragments compress and then upload the data.
507 cur_dir = os.getcwd()
508 archive_condor = os.path.join(cur_dir, archive_condor)
509 upload_condor = os.path.join(cur_dir, upload_condor)
510 job_basename = make_base_name(ininame)
513 fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
514 fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
515 fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
520 def get_library_info(host, apidata, library_id):
521 url = api.library_url(host, library_id)
522 contents = api.retrieve_info(url, apidata)
526 def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
527 mid=None, force=False):
528 py = srf2fastq.__file__
529 args = [ py, srf_file, ]
531 args.extend(['--left', target_pathname])
532 # this is ugly. I did it because I was pregenerating the target
533 # names before I tried to figure out what sources could generate
534 # those targets, and everything up to this point had been
535 # one-to-one. So I couldn't figure out how to pair the
537 # With this at least the command will run correctly.
538 # however if we rename the default targets, this'll break
539 # also I think it'll generate it twice.
540 args.extend(['--right',
541 target_pathname.replace('_r1.fastq', '_r2.fastq')])
543 args.extend(['--single', target_pathname ])
544 if flowcell is not None:
545 args.extend(['--flowcell', flowcell])
548 args.extend(['-m', str(mid)])
551 args.extend(['--force'])
556 """ % (" ".join(args),)
561 def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False):
562 py = qseq2fastq.__file__
563 args = [py, '-i', qseq_file, '-o', target_pathname ]
564 if flowcell is not None:
565 args.extend(['-f', flowcell])
569 """ % (" ".join(args))
573 def find_archive_sequence_files(host, apidata, sequences_path,
576 Find all the archive sequence files possibly associated with our results.
579 logging.debug("Searching for sequence files in: %s" %(sequences_path,))
583 #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
585 for lib_id, result_dir in library_result_map:
586 lib_info = get_library_info(host, apidata, lib_id)
587 lib_info['lanes'] = {}
588 lib_db[lib_id] = lib_info
590 for lane in lib_info['lane_set']:
591 lane_key = (lane['flowcell'], lane['lane_number'])
592 candidate_lanes[lane_key] = lib_id
593 seq_dirs.add(os.path.join(sequences_path,
596 logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
597 candidate_seq_list = scan_for_sequences(seq_dirs)
599 # at this point we have too many sequences as scan_for_sequences
600 # returns all the sequences in a flowcell directory
601 # so lets filter out the extras
603 for seq in candidate_seq_list:
604 lane_key = (seq.flowcell, seq.lane)
605 lib_id = candidate_lanes.get(lane_key, None)
606 if lib_id is not None:
607 lib_info = lib_db[lib_id]
608 lib_info['lanes'].setdefault(lane_key, set()).add(seq)
613 class NameToViewMap(object):
614 """Determine view attributes for a given submission file name
616 def __init__(self, root_url, apidata):
617 self.root_url = root_url
618 self.apidata = apidata
622 # ma is "map algorithm"
627 ('*.splices.bam', 'Splices'),
628 ('*.bam', self._guess_bam_view),
629 ('junctions.bed', 'Junctions'),
630 ('*.jnct', 'Junctions'),
631 ('*.unique.bigwig', None),
632 ('*.plus.bigwig', 'PlusSignal'),
633 ('*.minus.bigwig', 'MinusSignal'),
634 ('*.bigwig', 'Signal'),
639 ('*.?ufflinks-0.9.0?genes.expr', 'GeneDeNovo'),
640 ('*.?ufflinks-0.9.0?transcripts.expr', 'TranscriptDeNovo'),
641 ('*.?ufflinks-0.9.0?transcripts.gtf', 'GeneModel'),
642 ('*.GENCODE-v3c?genes.expr', 'GeneGCV3c'),
643 ('*.GENCODE-v3c?transcript*.expr', 'TranscriptGCV3c'),
644 ('*.GENCODE-v3c?transcript*.gtf', 'TranscriptGencV3c'),
645 ('*.GENCODE-v4?genes.expr', None), #'GeneGCV4'),
646 ('*.GENCODE-v4?transcript*.expr', None), #'TranscriptGCV4'),
647 ('*.GENCODE-v4?transcript*.gtf', None), #'TranscriptGencV4'),
648 ('*_1.75mers.fastq', 'FastqRd1'),
649 ('*_2.75mers.fastq', 'FastqRd2'),
650 ('*_r1.fastq', 'FastqRd1'),
651 ('*_r2.fastq', 'FastqRd2'),
652 ('*.fastq', 'Fastq'),
653 ('*.gtf', 'GeneModel'),
656 ('paired-end-distribution*', 'InsLength'),
657 ('*.stats.txt', 'InsLength'),
664 None: {"MapAlgorithm": "NA"},
665 "Paired": {"MapAlgorithm": ma},
666 "Aligns": {"MapAlgorithm": ma},
667 "Single": {"MapAlgorithm": ma},
668 "Splices": {"MapAlgorithm": ma},
669 "Junctions": {"MapAlgorithm": ma},
670 "PlusSignal": {"MapAlgorithm": ma},
671 "MinusSignal": {"MapAlgorithm": ma},
672 "Signal": {"MapAlgorithm": ma},
673 "GeneModel": {"MapAlgorithm": ma},
674 "GeneDeNovo": {"MapAlgorithm": ma},
675 "TranscriptDeNovo": {"MapAlgorithm": ma},
676 "GeneGCV3c": {"MapAlgorithm": ma},
677 "TranscriptGCV3c": {"MapAlgorithm": ma},
678 "TranscriptGencV3c": {"MapAlgorithm": ma},
679 "GeneGCV4": {"MapAlgorithm": ma},
680 "TranscriptGCV4": {"MapAlgorithm": ma},
681 "FastqRd1": {"MapAlgorithm": "NA", "type": "fastq"},
682 "FastqRd2": {"MapAlgorithm": "NA", "type": "fastq"},
683 "Fastq": {"MapAlgorithm": "NA", "type": "fastq" },
684 "InsLength": {"MapAlgorithm": ma},
686 # view name is one of the attributes
687 for v in self.views.keys():
688 self.views[v]['view'] = v
690 def find_attributes(self, pathname, lib_id):
691 """Looking for the best extension
692 The 'best' is the longest match
695 filename (str): the filename whose extention we are about to examine
697 path, filename = os.path.splitext(pathname)
698 if not self.lib_cache.has_key(lib_id):
699 self.lib_cache[lib_id] = get_library_info(self.root_url,
700 self.apidata, lib_id)
702 lib_info = self.lib_cache[lib_id]
703 if lib_info['cell_line'].lower() == 'unknown':
704 logging.warn("Library %s missing cell_line" % (lib_id,))
706 'cell': lib_info['cell_line'],
707 'replicate': lib_info['replicate'],
709 is_paired = self._is_paired(lib_id, lib_info)
712 attributes.update(self.get_paired_attributes(lib_info))
714 attributes.update(self.get_single_attributes(lib_info))
716 for pattern, view in self.patterns:
717 if fnmatch.fnmatch(pathname, pattern):
719 view = view(is_paired=is_paired)
721 attributes.update(self.views[view])
722 attributes["extension"] = pattern
726 def _guess_bam_view(self, is_paired=True):
727 """Guess a view name based on library attributes
735 def _is_paired(self, lib_id, lib_info):
736 """Determine if a library is paired end"""
737 if len(lib_info["lane_set"]) == 0:
740 if not self.lib_paired.has_key(lib_id):
744 # check to see if all the flowcells are the same.
745 # otherwise we might need to do something complicated
746 for flowcell in lib_info["lane_set"]:
747 # yes there's also a status code, but this comparison
749 if flowcell["status"].lower() == "failed":
750 # ignore failed flowcell
753 elif flowcell["paired_end"]:
758 logging.debug("Library %s: %d paired, %d single, %d failed" % \
759 (lib_info["library_id"], is_paired, isnot_paired, failed))
761 if is_paired > isnot_paired:
762 self.lib_paired[lib_id] = True
763 elif is_paired < isnot_paired:
764 self.lib_paired[lib_id] = False
766 raise RuntimeError("Equal number of paired & unpaired lanes."\
767 "Can't guess library paired status")
769 return self.lib_paired[lib_id]
771 def get_paired_attributes(self, lib_info):
772 if lib_info['insert_size'] is None:
773 errmsg = "Library %s is missing insert_size, assuming 200"
774 logging.warn(errmsg % (lib_info["library_id"],))
777 insert_size = lib_info['insert_size']
778 return {'insertLength': insert_size,
781 def get_single_attributes(self, lib_info):
782 return {'insertLength':'ilNA',
786 def make_submission_section(line_counter, files, attributes):
788 Create a section in the submission ini file
790 inifile = [ "[line%s]" % (line_counter,) ]
791 inifile += ["files=%s" % (",".join(files))]
793 for k,v in attributes.items():
794 inifile += ["%s=%s" % (k,v)]
798 def make_base_name(pathname):
799 base = os.path.basename(pathname)
800 name, ext = os.path.splitext(base)
804 def make_submission_name(ininame):
805 name = make_base_name(ininame)
809 def make_ddf_name(pathname):
810 name = make_base_name(pathname)
814 def make_condor_name(pathname, run_type=None):
815 name = make_base_name(pathname)
817 if run_type is not None:
818 elements.append(run_type)
819 elements.append("condor")
820 return ".".join(elements)
823 def make_submit_script(target, header, body_list):
825 write out a text file
827 this was intended for condor submit scripts
830 target (str or stream):
831 if target is a string, we will open and close the file
832 if target is a stream, the caller is responsible.
835 header to write at the beginning of the file
836 body_list (list of strs):
837 a list of blocks to add to the file.
839 if type(target) in types.StringTypes:
844 for entry in body_list:
846 if type(target) in types.StringTypes:
849 def parse_filelist(file_string):
850 return file_string.split(",")
853 def validate_filelist(files):
855 Die if a file doesn't exist in a file list
858 if not os.path.exists(f):
859 raise RuntimeError("%s does not exist" % (f,))
862 if __name__ == "__main__":