2 from ConfigParser import SafeConfigParser
7 from optparse import OptionParser
9 from pprint import pprint, pformat
11 from StringIO import StringIO
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21 create_sequence_table, \
23 from htsworkflow.pipelines import qseq2fastq
24 from htsworkflow.pipelines import srf2fastq
# Entry point: parse command-line options, load one or more
# "library_id <whitespace> result_dir" map files, then run whichever
# pipeline steps were requested (tree building, DAF linking, fastq
# condor-script generation, submission .ini creation, ddf creation).
# NOTE(review): this listing has line-number gaps; the option tests
# (e.g. the if/elif selecting the logging level and the loop over args)
# are elided from this view.
26 def main(cmdline=None):
27 parser = make_parser()
28 opts, args = parser.parse_args(cmdline)
# Logging level selection -- presumably guarded by opts.debug / opts.verbose
# on the elided lines; confirm against the full file.
31 logging.basicConfig(level = logging.DEBUG )
33 logging.basicConfig(level = logging.INFO )
35 logging.basicConfig(level = logging.WARNING )
# Credentials posted to the htsworkflow API server.
37 apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }
39 if opts.host is None or opts.apiid is None or opts.apikey is None:
40 parser.error("Please specify host url, apiid, apikey")
43 parser.error("I need at least one library submission-dir input file")
# Accumulate (library_id, result_dir) pairs from every input file.
45 library_result_map = []
47 library_result_map.extend(read_library_result_map(a))
49 if opts.make_tree_from is not None:
50 make_tree_from(opts.make_tree_from, library_result_map)
52 if opts.daf is not None:
53 link_daf(opts.daf, library_result_map)
# Remaining arguments to build_fastqs (sequence path, force flag) are on
# elided lines 57-62.
56 build_fastqs(opts.host,
63 make_submission_ini(opts.host, apidata, library_result_map)
66 make_all_ddfs(library_result_map, opts.daf, force=opts.force)
# Body of make_parser() (the def line is elided from this listing):
# reads defaults from ~/.htsworkflow.ini / /etc/htsworkflow.ini, then
# builds and (on an elided line) returns the OptionParser.
70 # Load defaults from the config files
71 config = SafeConfigParser()
72 config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])
74 sequence_archive = None
78 SECTION = 'sequence_archive'
79 if config.has_section(SECTION):
# NOTE(review): SafeConfigParser.get(section, option, x) -- the third
# positional argument is `raw`, NOT a default value, so these calls do
# not fall back to the previous values; they just toggle interpolation.
# Probable latent bug -- confirm intent and consider
# config.get(...) wrapped in has_option() checks instead.
80 sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
81 sequence_archive = os.path.expanduser(sequence_archive)
82 apiid = config.get(SECTION, 'apiid', apiid)
83 apikey = config.get(SECTION, 'apikey', apikey)
84 apihost = config.get(SECTION, 'host', apihost)
86 parser = OptionParser()
# Commands / action options.
89 parser.add_option('--make-tree-from',
90 help="create directories & link data files",
92 parser.add_option('--fastq', help="generate scripts for making fastq files",
93 default=False, action="store_true")
95 parser.add_option('--ini', help="generate submission ini file", default=False,
98 parser.add_option('--makeddf', help='make the ddfs', default=False,
101 parser.add_option('--daf', default=None, help='specify daf name')
102 parser.add_option('--force', default=False, action="store_true",
103 help="Force regenerating fastqs")
105 # configuration options
106 parser.add_option('--apiid', default=apiid, help="Specify API ID")
107 parser.add_option('--apikey', default=apikey, help="Specify API KEY")
108 parser.add_option('--host', default=apihost,
109 help="specify HTSWorkflow host",)
110 parser.add_option('--sequence', default=sequence_archive,
111 help="sequence repository")
# Logging verbosity options (consumed by main()).
114 parser.add_option('--verbose', default=False, action="store_true",
115 help='verbose logging')
116 parser.add_option('--debug', default=False, action="store_true",
117 help='debug logging')
122 def make_tree_from(source_path, library_result_map):
123 """Create a tree using data files from source path.
# For each (lib_id, lib_path) pair: ensure the destination directory
# exists (the mkdir itself is on an elided line), then symlink every
# file found under source_path/lib_path into lib_path.
125 for lib_id, lib_path in library_result_map:
126 if not os.path.exists(lib_path):
127 logging.info("Making dir {0}".format(lib_path))
129 source_lib_dir = os.path.join(source_path, lib_path)
130 if os.path.exists(source_lib_dir):
132 for filename in os.listdir(source_lib_dir):
133 source_pathname = os.path.join(source_lib_dir, filename)
134 target_pathname = os.path.join(lib_path, filename)
# Fail loudly if the source vanished between listdir and linking.
135 if not os.path.exists(source_pathname):
136 raise IOError("{0} does not exist".format(source_pathname))
# Only create the symlink if the target isn't already present
# (makes reruns idempotent).
137 if not os.path.exists(target_pathname):
138 os.symlink(source_pathname, target_pathname)
140 'LINK {0} to {1}'.format(source_pathname, target_pathname))
142 def build_fastqs(host, apidata, sequences_path, library_result_map,
145 Generate condor scripts to build any needed fastq files
148 host (str): root of the htsworkflow api server
149 apidata (dict): id & key to post to the server
150 sequences_path (str): root of the directory tree to scan for files
151 library_result_map (list): [(library_id, destination directory), ...]
# Condor submit-file header for qseq->fastq conversion jobs; the
# executable line(s) using %(exe)s are elided from this listing.
153 qseq_condor_header = """
156 error=log/qseq2fastq.err.$(process).log
157 output=log/qseq2fastq.out.$(process).log
158 log=log/qseq2fastq.log
160 """ % {'exe': sys.executable }
161 qseq_condor_entries = []
# Header for srf->fastq jobs; note the hard-coded user-specific
# PYTHONPATH/PATH environment -- NOTE(review): not portable, should come
# from configuration.
162 srf_condor_header = """
165 output=log/srf_pair_fastq.out.$(process).log
166 error=log/srf_pair_fastq.err.$(process).log
167 log=log/srf_pair_fastq.log
168 environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"
170 """ % {'exe': sys.executable }
171 srf_condor_entries = []
# Discover what sequence files exist for each library (remaining call
# arguments are on elided lines 173-176).
172 lib_db = find_archive_sequence_files(host,
177 needed_targets = find_missing_targets(library_result_map, lib_db, force)
179 for target_pathname, available_sources in needed_targets.items():
180 logging.debug(' target : %s' % (target_pathname,))
181 logging.debug(' candidate sources: %s' % (available_sources,))
# Prefer qseq sources over srf when both are available.
182 if available_sources.has_key('qseq'):
183 source = available_sources['qseq']
184 qseq_condor_entries.append(
185 condor_qseq_to_fastq(source.path,
190 elif available_sources.has_key('srf'):
191 source = available_sources['srf']
192 mid = getattr(source, 'mid_point', None)
193 srf_condor_entries.append(
194 condor_srf_to_fastq(source.path,
# Fallback branch (elided else:) -- no usable source found.
202 print " need file", target_pathname
# Only write submit scripts when there is at least one job of that kind.
204 if len(srf_condor_entries) > 0:
205 make_submit_script('srf.fastq.condor',
209 if len(qseq_condor_entries) > 0:
210 make_submit_script('qseq.fastq.condor',
215 def find_missing_targets(library_result_map, lib_db, force=False):
217 Check if the sequence file exists.
218 This requires computing what the sequence name is and checking
219 to see if it can be found in the sequence location.
221 Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
# Target fastq filename templates, filled from per-sequence attributes.
223 fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
224 fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
225 # find what targets we're missing
# NOTE(review): the needed_targets = {} initialization and the
# lib = lib_db[lib_id] lookup are on lines elided from this listing.
227 for lib_id, result_dir in library_result_map:
229 lane_dict = make_lane_dict(lib_db, lib_id)
231 for lane_key, sequences in lib['lanes'].items():
232 for seq in sequences:
# Annotate each sequence with its flowcell's paired-end status.
233 seq.paired = lane_dict[seq.flowcell]['paired_end']
234 lane_status = lane_dict[seq.flowcell]['status']
236 if seq.paired and seq.read is None:
238 filename_attributes = {
239 'flowcell': seq.flowcell,
# Skip sequences from failed lanes (body of branch elided).
246 if lane_status == 'Failed':
248 if seq.flowcell == '30DY0AAXX':
249 # 30DY0 only ran for 151 bases instead of 152
250 # it is actually 76 1st read, 75 2nd read
255 target_name = fastq_paired_template % filename_attributes
257 target_name = fastq_single_template % filename_attributes
259 target_pathname = os.path.join(result_dir, target_name)
# Record the target as needed if forced or not yet on disk, indexed
# by source filetype (qseq/srf) for build_fastqs to choose from.
260 if force or not os.path.exists(target_pathname):
261 t = needed_targets.setdefault(target_pathname, {})
262 t[seq.filetype] = seq
264 return needed_targets
267 def link_daf(daf_path, library_result_map):
268 if not os.path.exists(daf_path):
269 raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))
271 base_daf = os.path.basename(daf_path)
273 for lib_id, result_dir in library_result_map:
274 submission_daf = os.path.join(result_dir, base_daf)
275 if not os.path.exists(submission_daf):
276 os.link(daf_path, submission_daf)
279 def make_submission_ini(host, apidata, library_result_map, paired=True):
280 #attributes = get_filename_attribute_map(paired)
# Maps a submission filename to its ENCODE "view" attributes via the
# NameToViewMap helper below.
281 view_map = NameToViewMap(host, apidata)
283 candidate_fastq_src = {}
285 for lib_id, result_dir in library_result_map:
# Column ordering for the generated ddf / ini file.
286 order_by = ['order_by=files', 'view', 'replicate', 'cell',
287 'readType', 'mapAlgorithm', 'insertLength' ]
288 inifile = ['[config]']
289 inifile += [" ".join(order_by)]
# One ini per result directory, named <result_dir>.ini inside it.
292 result_ini = os.path.join(result_dir, result_dir+'.ini')
295 submission_files = os.listdir(result_dir)
297 fastq_attributes = {}
298 for f in submission_files:
299 attributes = view_map.find_attributes(f, lib_id)
300 if attributes is None:
301 raise ValueError("Unrecognized file: %s" % (f,))
303 ext = attributes["extension"]
# Files with no view are skipped (branch body elided); fastqs are
# grouped by extension so each group can be emitted on one line.
304 if attributes['view'] is None:
306 elif attributes.get("type", None) == 'fastq':
307 fastqs.setdefault(ext, set()).add(f)
308 fastq_attributes[ext] = attributes
# Non-fastq files: one submission section each (call args elided).
311 make_submission_section(line_counter,
318 # add in fastqs on a single line.
320 for extension, fastq_files in fastqs.items():
322 make_submission_section(line_counter,
324 fastq_attributes[extension])
# NOTE(review): f is never explicitly closed in this view -- a
# with-statement would be safer; confirm against the full file.
329 f = open(result_ini,'w')
330 f.write(os.linesep.join(inifile))
333 def make_lane_dict(lib_db, lib_id):
335 Convert the lane_set in a lib_db to a dictionary
336 indexed by flowcell ID
# Builds (flowcell, lane) pairs; the result-list initialization and the
# final dict(result) return are on elided lines. NOTE(review): if a
# library ran on the same flowcell twice, later lanes would overwrite
# earlier ones in the resulting dict -- confirm that's acceptable.
339 for lane in lib_db[lib_id]['lane_set']:
340 result.append((lane['flowcell'], lane))
# Generate a ddf (and optional condor jobs) for every library that has a
# submission ini, then write a DAGMan file chaining archive->upload jobs.
344 def make_all_ddfs(library_result_map, daf_name, make_condor=True, force=False):
346 for lib_id, result_dir in library_result_map:
347 ininame = result_dir+'.ini'
348 inipathname = os.path.join(result_dir, ininame)
349 if os.path.exists(inipathname):
# dag_fragment accumulation (dag_fragment.extend / = []) is elided.
351 make_ddf(ininame, daf_name, True, make_condor, result_dir)
354 if make_condor and len(dag_fragment) > 0:
355 dag_filename = 'submission.dagman'
# Refuse to clobber an existing dagman file unless --force was given.
356 if not force and os.path.exists(dag_filename):
357 logging.warn("%s exists, please delete" % (dag_filename,))
359 f = open(dag_filename,'w')
360 f.write( os.linesep.join(dag_fragment))
361 f.write( os.linesep )
365 def make_ddf(ininame, daf_name, guess_ddf=False, make_condor=False, outdir=None):
367 Make ddf files, and bonus condor file
# Presumably chdirs into outdir first (body of branch elided) -- confirm
# there is a matching chdir back at the end in the full file.
371 if outdir is not None:
376 ddf_name = make_ddf_name(ininame)
378 output = open(ddf_name,'w')
# Dump the ini as tab-delimited text, collecting the referenced files.
380 file_list = read_ddf_ini(ininame, output)
382 file_list.append(daf_name)
383 if ddf_name is not None:
384 file_list.append(ddf_name)
# Optionally emit condor archive/upload jobs plus a DAG fragment tying
# them together (dag_fragments init/return are on elided lines).
387 archive_condor = make_condor_archive_script(ininame, file_list)
388 upload_condor = make_condor_upload_script(ininame)
390 dag_fragments.extend(
391 make_dag_fragment(ininame, archive_condor, upload_condor)
399 def read_ddf_ini(filename, output=sys.stdout):
401 Read a ini file and dump out a tab delmited text file
404 config = SafeConfigParser()
405 config.read(filename)
# The [config] section's order_by option lists the columns to emit.
407 order_by = shlex.split(config.get("config", "order_by"))
409 output.write("\t".join(order_by))
410 output.write(os.linesep)
411 sections = config.sections()
413 for section in sections:
414 if section == "config":
415 # skip the config block
# Per-section: gather each ordered option's value (the loop over
# order_by and the values list are on elided lines); the 'files' values
# are also split into file_list for the caller.
419 v = config.get(section, key)
422 file_list.extend(parse_filelist(v))
424 output.write("\t".join(values))
425 output.write(os.linesep)
429 def read_library_result_map(filename):
431 Read a file that maps library id to result directory.
432 Does not support spaces in filenames.
# NOTE(review): stream is never closed in this view, and the results
# list init / return are on elided lines; a with-statement would be
# safer -- confirm against the full file.
437 stream = open(filename,'r')
# '#'-prefixed lines are comments; blank lines are skipped.
442 if not line.startswith('#') and len(line) > 0 :
443 library_id, result_dir = line.split()
444 results.append((library_id, result_dir))
# Write a condor submit file that tars the submission's files into
# ../<submission>.tgz; returns (on an elided line) the script filename.
448 def make_condor_archive_script(ininame, files):
449 script = """Universe = vanilla
451 Executable = /bin/tar
452 arguments = czvhf ../%(archivename)s %(filelist)s
454 Error = compress.err.$(Process).log
455 Output = compress.out.$(Process).log
456 Log = /tmp/submission-compress-%(user)s.log
457 initialdir = %(initialdir)s
458 environment="GZIP=-3"
# Sanity-check every file before writing the script (loop header elided).
464 if not os.path.exists(f):
465 raise RuntimeError("Missing %s" % (f,))
467 context = {'archivename': make_submission_name(ininame),
468 'filelist': " ".join(files),
469 'initialdir': os.getcwd(),
470 'user': os.getlogin()}
472 condor_script = make_condor_name(ininame, 'archive')
473 condor_stream = open(condor_script,'w')
474 condor_stream.write(script % context)
475 condor_stream.close()
# Write a condor submit file that lftp-uploads the tarball to the UCSC
# ENCODE ftp drop box; returns (on an elided line) the script filename.
479 def make_condor_upload_script(ininame):
480 script = """Universe = vanilla
482 Executable = /usr/bin/lftp
# NOTE(review): hard-coded user/host in the ftp URL -- should be
# configurable.
483 arguments = -c put ../%(archivename)s -o ftp://detrout@encodeftp.cse.ucsc.edu/%(archivename)s
485 Error = upload.err.$(Process).log
486 Output = upload.out.$(Process).log
487 Log = /tmp/submission-upload-%(user)s.log
488 initialdir = %(initialdir)s
492 context = {'archivename': make_submission_name(ininame),
493 'initialdir': os.getcwd(),
494 'user': os.getlogin()}
496 condor_script = make_condor_name(ininame, 'upload')
497 condor_stream = open(condor_script,'w')
498 condor_stream.write(script % context)
499 condor_stream.close()
503 def make_dag_fragment(ininame, archive_condor, upload_condor):
505 Make the couple of fragments compress and then upload the data.
# Absolute paths so the DAG can be submitted from any directory.
507 cur_dir = os.getcwd()
508 archive_condor = os.path.join(cur_dir, archive_condor)
509 upload_condor = os.path.join(cur_dir, upload_condor)
510 job_basename = make_base_name(ininame)
# fragments list init and final return are on elided lines.
513 fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
514 fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
# DAGMan dependency: archive must finish before upload starts.
515 fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
# Fetch a library's metadata from the htsworkflow API server; presumably
# returns the retrieved contents (return line elided from this listing).
520 def get_library_info(host, apidata, library_id):
521 url = api.library_url(host, library_id)
522 contents = api.retrieve_info(url, apidata)
# Build one condor queue entry that runs srf2fastq to convert an srf
# file into fastq(s); the queue-entry template and return are elided.
526 def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
527 mid=None, force=False):
528 py = srf2fastq.__file__
529 args = [ py, srf_file, ]
# Paired branch (elided if paired:) emits both --left and --right.
531 args.extend(['--left', target_pathname])
532 # this is ugly. I did it because I was pregenerating the target
533 # names before I tried to figure out what sources could generate
534 # those targets, and everything up to this point had been
535 # one-to-one. So I couldn't figure out how to pair the
537 # With this at least the command will run correctly.
538 # however if we rename the default targets, this'll break
539 # also I think it'll generate it twice.
# HACK: derive the second-read target by filename substitution.
540 args.extend(['--right',
541 target_pathname.replace('_r1.fastq', '_r2.fastq')])
543 args.extend(['--single', target_pathname ])
544 if flowcell is not None:
545 args.extend(['--flowcell', flowcell])
# mid (read split point) and force flags -- guards elided.
548 args.extend(['-m', str(mid)])
551 args.extend(['--force'])
556 """ % (" ".join(args),)
# Build one condor queue entry that runs qseq2fastq on a qseq tarball;
# the queue-entry template (lines 566-568) and return are elided.
561 def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False):
562 py = qseq2fastq.__file__
563 args = [py, '-i', qseq_file, '-o', target_pathname ]
564 if flowcell is not None:
565 args.extend(['-f', flowcell])
569 """ % (" ".join(args))
573 def find_archive_sequence_files(host, apidata, sequences_path,
576 Find all the archive sequence files possibly associated with our results.
579 logging.debug("Searching for sequence files in: %s" %(sequences_path,))
# lib_db / candidate_lanes / seq_dirs initializations are on elided
# lines (581-584).
583 #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
585 for lib_id, result_dir in library_result_map:
# Pull library metadata from the server and index it by library id.
586 lib_info = get_library_info(host, apidata, lib_id)
587 lib_info['lanes'] = {}
588 lib_db[lib_id] = lib_info
# Remember which (flowcell, lane) pairs belong to which library, and
# which flowcell directories to scan.
590 for lane in lib_info['lane_set']:
591 lane_key = (lane['flowcell'], lane['lane_number'])
592 candidate_lanes[lane_key] = lib_id
593 seq_dirs.add(os.path.join(sequences_path,
596 logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
597 candidate_seq_list = scan_for_sequences(seq_dirs)
599 # at this point we have too many sequences as scan_for_sequences
600 # returns all the sequences in a flowcell directory
601 # so lets filter out the extras
603 for seq in candidate_seq_list:
604 lane_key = (seq.flowcell, seq.lane)
605 lib_id = candidate_lanes.get(lane_key, None)
# Keep only sequences whose lane belongs to one of our libraries.
606 if lib_id is not None:
607 lib_info = lib_db[lib_id]
608 lib_info['lanes'].setdefault(lane_key, set()).add(seq)
613 class NameToViewMap(object):
614 """Determine view attributes for a given submission file name
# Caches library info per lib_id (self.lib_cache / self.lib_paired are
# initialized on elided lines).
616 def __init__(self, root_url, apidata):
617 self.root_url = root_url
618 self.apidata = apidata
622 # ma is "map algorithm"
# self.patterns: ordered (fnmatch-glob, view-or-callable) pairs.
# More specific patterns appear before generic ones (e.g.
# '*.splices.bam' before '*.bam'); confirm match order semantics in
# find_attributes against the full file.
627 ('*.splices.bam', 'Splices'),
628 ('*.bam', self._guess_bam_view),
629 ('junctions.bed', 'Junctions'),
630 ('*.jnct', 'Junctions'),
631 ('*.plus.bigwig', 'PlusSignal'),
632 ('*.minus.bigwig', 'MinusSignal'),
633 ('*.bigwig', 'Signal'),
638 ('*.?ufflinks-0.9.0?genes.expr', 'GeneDeNovo'),
639 ('*.?ufflinks-0.9.0?transcripts.expr', 'TranscriptDeNovo'),
640 ('*.?ufflinks-0.9.0?transcripts.gtf', 'GeneModel'),
641 ('*.GENCODE-v3c?genes.expr', 'GeneGencV3c'),
642 ('*.GENCODE-v3c?transcripts.expr', 'TranscriptGencV3c'),
643 ('*.GENCODE-v4?genes.expr', 'GeneGencV4'),
644 ('*.GENCODE-v4?transcripts.expr', 'TranscriptGencV4'),
645 ('*.GENCODE-v4?transcript.expr', 'TranscriptGencV4'),
646 ('*_1.75mers.fastq', 'FastqRd1'),
647 ('*_2.75mers.fastq', 'FastqRd2'),
648 ('*_r1.fastq', 'FastqRd1'),
649 ('*_r2.fastq', 'FastqRd2'),
650 ('*.fastq', 'Fastq'),
651 ('*.gtf', 'GeneModel'),
654 ('paired-end-distribution*', 'InsLength'),
655 ('*.stats.txt', 'InsLength'),
# self.views: per-view attribute dicts merged into a file's attributes.
662 None: {"MapAlgorithm": "NA"},
663 "Paired": {"MapAlgorithm": ma},
664 "Aligns": {"MapAlgorithm": ma},
665 "Single": {"MapAlgorithm": ma},
666 "Splices": {"MapAlgorithm": ma},
667 "Junctions": {"MapAlgorithm": ma},
668 "PlusSignal": {"MapAlgorithm": ma},
669 "MinusSignal": {"MapAlgorithm": ma},
670 "Signal": {"MapAlgorithm": ma},
671 "GeneModel": {"MapAlgorithm": ma},
672 "GeneDeNovo": {"MapAlgorithm": ma},
673 "TranscriptDeNovo": {"MapAlgorithm": ma},
674 "GeneGencV3c": {"MapAlgorithm": ma},
675 "TranscriptGencV3c": {"MapAlgorithm": ma},
676 "GeneGencV4": {"MapAlgorithm": ma},
677 "TranscriptGencV4": {"MapAlgorithm": ma},
678 "FastqRd1": {"MapAlgorithm": "NA", "type": "fastq"},
679 "FastqRd2": {"MapAlgorithm": "NA", "type": "fastq"},
680 "Fastq": {"MapAlgorithm": "NA", "type": "fastq" },
# NOTE(review): duplicate dict key -- "GeneModel" already appears above
# (line 671) with the same value; this repeat is harmless but redundant.
681 "GeneModel": {"MapAlgorithm": ma},
682 "InsLength": {"MapAlgorithm": ma},
684 # view name is one of the attributes
685 for v in self.views.keys():
686 self.views[v]['view'] = v
688 def find_attributes(self, pathname, lib_id):
689 """Looking for the best extension
690 The 'best' is the longest match
693 filename (str): the filename whose extention we are about to examine
695 path, filename = os.path.splitext(pathname)
# Memoize library lookups so each library is fetched from the API once.
696 if not self.lib_cache.has_key(lib_id):
697 self.lib_cache[lib_id] = get_library_info(self.root_url,
698 self.apidata, lib_id)
700 lib_info = self.lib_cache[lib_id]
701 if lib_info['cell_line'].lower() == 'unknown':
702 logging.warn("Library %s missing cell_line" % (lib_id,))
# Base attributes dict (its opening line is elided from this listing).
704 'cell': lib_info['cell_line'],
705 'replicate': lib_info['replicate'],
707 is_paired = self._is_paired(lib_id, lib_info)
# Paired/single guard (if/else) is elided between these two calls.
710 attributes.update(self.get_paired_attributes(lib_info))
712 attributes.update(self.get_single_attributes(lib_info))
714 for pattern, view in self.patterns:
715 if fnmatch.fnmatch(pathname, pattern):
# A view entry may be a callable (e.g. _guess_bam_view) that picks
# the view from library attributes; callable check is elided.
717 view = view(is_paired=is_paired)
719 attributes.update(self.views[view])
720 attributes["extension"] = pattern
724 def _guess_bam_view(self, is_paired=True):
725 """Guess a view name based on library attributes
733 def _is_paired(self, lib_id, lib_info):
734 """Determine if a library is paired end"""
# No lanes at all -- the early-return value is on an elided line.
735 if len(lib_info["lane_set"]) == 0:
738 if not self.lib_paired.has_key(lib_id):
742 # check to see if all the flowcells are the same.
743 # otherwise we might need to do something complicated
# Tally paired vs single vs failed lanes (counter inits elided).
744 for flowcell in lib_info["lane_set"]:
745 # yes there's also a status code, but this comparison
747 if flowcell["status"].lower() == "failed":
748 # ignore failed flowcell
751 elif flowcell["paired_end"]:
756 logging.debug("Library %s: %d paired, %d single, %d failed" % \
757 (lib_info["library_id"], is_paired, isnot_paired, failed))
# Majority vote decides; an exact tie is an error we can't resolve.
759 if is_paired > isnot_paired:
760 self.lib_paired[lib_id] = True
761 elif is_paired < isnot_paired:
762 self.lib_paired[lib_id] = False
# NOTE(review): the two string fragments concatenate without a space
# ("lanes.Can't") -- cosmetic message bug in the full file.
764 raise RuntimeError("Equal number of paired & unpaired lanes."\
765 "Can't guess library paired status")
767 return self.lib_paired[lib_id]
769 def get_paired_attributes(self, lib_info):
# Missing insert_size falls back to 200 (assignment on elided line 774).
770 if lib_info['insert_size'] is None:
771 errmsg = "Library %s is missing insert_size, assuming 200"
772 logging.warn(errmsg % (lib_info["library_id"],))
775 insert_size = lib_info['insert_size']
776 return {'insertLength': insert_size,
# Single-ended libraries get placeholder ("NA") attributes.
779 def get_single_attributes(self, lib_info):
780 return {'insertLength':'ilNA',
784 def make_submission_section(line_counter, files, attributes):
786 Create a section in the submission ini file
# Emits a [lineN] section header, a comma-joined files= line, then one
# key=value line per attribute; the return is on an elided line.
788 inifile = [ "[line%s]" % (line_counter,) ]
789 inifile += ["files=%s" % (",".join(files))]
791 for k,v in attributes.items():
792 inifile += ["%s=%s" % (k,v)]
# Strip directory and extension from a pathname; presumably returns
# `name` on the elided following line.
796 def make_base_name(pathname):
797 base = os.path.basename(pathname)
798 name, ext = os.path.splitext(base)
# Derive the submission archive filename from the ini name; the return
# (presumably name + an archive suffix such as '.tgz') is elided.
802 def make_submission_name(ininame):
803 name = make_base_name(ininame)
# Derive the ddf filename from a pathname; the return (presumably
# name + '.ddf') is on an elided line.
807 def make_ddf_name(pathname):
808 name = make_base_name(pathname)
# Build a condor submit-script name: <base>[.<run_type>].condor.
# The `elements` list initialization (presumably [name]) is elided.
812 def make_condor_name(pathname, run_type=None):
813 name = make_base_name(pathname)
815 if run_type is not None:
816 elements.append(run_type)
817 elements.append("condor")
818 return ".".join(elements)
821 def make_submit_script(target, header, body_list):
823 write out a text file
825 this was intended for condor submit scripts
828 target (str or stream):
829 if target is a string, we will open and close the file
830 if target is a stream, the caller is responsible.
833 header to write at the beginning of the file
834 body_list (list of strs):
835 a list of blocks to add to the file.
# If given a filename, open it ourselves (open/write lines elided).
837 if type(target) in types.StringTypes:
842 for entry in body_list:
# ...and close it again only if we opened it.
844 if type(target) in types.StringTypes:
def parse_filelist(file_string):
    """Split a comma-delimited file-list string into individual names."""
    separator = ","
    return file_string.split(separator)
851 def validate_filelist(files):
853 Die if a file doesn't exist in a file list
# The `for f in files:` loop header is on an elided line.
856 if not os.path.exists(f):
857 raise RuntimeError("%s does not exist" % (f,))
# Script entry point; the main() invocation is on an elided line.
860 if __name__ == "__main__":