2 from ConfigParser import SafeConfigParser
8 from optparse import OptionParser
10 from pprint import pprint, pformat
12 from StringIO import StringIO
14 from subprocess import Popen, PIPE
22 from htsworkflow.util import api
23 from htsworkflow.pipelines.sequences import \
24 create_sequence_table, \
26 from htsworkflow.pipelines import qseq2fastq
27 from htsworkflow.pipelines import srf2fastq
# Command-line entry point: parse options, configure logging from the
# --debug/--verbose flags, build API auth, then run the requested stages
# (tree building, daf linking, fastq script generation, ini and ddf creation)
# over the library -> result-directory map.
# NOTE(review): this is a partial line-numbered dump — the embedded numbers
# jump (31 -> 34, 43 -> 46, 48 -> 50, ...), so the guarding if/elif lines and
# the loop over input args are missing; recover from version control first.
29 def main(cmdline=None):
30 parser = make_parser()
31 opts, args = parser.parse_args(cmdline)
# Logging level selection; the if opts.debug / elif opts.verbose / else
# lines are among the missing ones.
34 logging.basicConfig(level = logging.DEBUG )
36 logging.basicConfig(level = logging.INFO )
38 logging.basicConfig(level = logging.WARNING )
40 apidata = api.make_auth_from_opts(opts, parser)
# --makeddf needs a daf file to embed into the ddf output.
42 if opts.makeddf and opts.daf is None:
43 parser.error("Please specify your daf when making ddf files")
46 parser.error("I need at least one library submission-dir input file")
48 library_result_map = []
# presumably inside "for a in args:" — TODO confirm against full source
50 library_result_map.extend(read_library_result_map(a))
52 if opts.make_tree_from is not None:
53 make_tree_from(opts.make_tree_from, library_result_map)
55 if opts.daf is not None:
56 link_daf(opts.daf, library_result_map)
# Remaining stages; the guards (likely "if opts.fastq:" etc.) are missing.
59 build_fastqs(opts.host,
66 make_submission_ini(opts.host, apidata, library_result_map)
69 make_all_ddfs(library_result_map, opts.daf, force=opts.force)
# Body of make_parser(): builds the OptionParser with the action flags
# (--make-tree-from, --fastq, --ini, --makeddf), the --daf/--force options,
# logging verbosity flags, and the shared htsworkflow auth options.
# NOTE(review): partial numbered dump — the "def make_parser():" line (72),
# several option-argument continuation lines, and the final "return parser"
# are missing from this view.
73 parser = OptionParser()
76 parser.add_option('--make-tree-from',
77 help="create directories & link data files",
79 parser.add_option('--fastq', help="generate scripts for making fastq files",
80 default=False, action="store_true")
82 parser.add_option('--ini', help="generate submission ini file", default=False,
85 parser.add_option('--makeddf', help='make the ddfs', default=False,
88 parser.add_option('--daf', default=None, help='specify daf name')
89 parser.add_option('--force', default=False, action="store_true",
90 help="Force regenerating fastqs")
93 parser.add_option('--verbose', default=False, action="store_true",
94 help='verbose logging')
95 parser.add_option('--debug', default=False, action="store_true",
# Adds --host/--apiid/--apikey style auth options shared across tools.
98 api.add_auth_options(parser)
# Mirror data files from source_path into each library result directory by
# creating the directory (if needed) and symlinking every file found under
# the matching source subdirectory. Existing targets are left alone.
# NOTE(review): partial numbered dump — lines 105, 109, 112, 120 are missing
# (likely the docstring close, os.mkdir call, and a logging.info for LINK).
103 def make_tree_from(source_path, library_result_map):
104 """Create a tree using data files from source path.
106 for lib_id, lib_path in library_result_map:
107 if not os.path.exists(lib_path):
108 logging.info("Making dir {0}".format(lib_path))
110 source_lib_dir = os.path.join(source_path, lib_path)
111 if os.path.exists(source_lib_dir):
113 for filename in os.listdir(source_lib_dir):
114 source_pathname = os.path.join(source_lib_dir, filename)
115 target_pathname = os.path.join(lib_path, filename)
# Fail loudly if the source file vanished between listdir and link.
116 if not os.path.exists(source_pathname):
117 raise IOError("{0} does not exist".format(source_pathname))
# Only create the symlink when the target doesn't already exist.
118 if not os.path.exists(target_pathname):
119 os.symlink(source_pathname, target_pathname)
121 'LINK {0} to {1}'.format(source_pathname, target_pathname))
# Generate condor submit scripts (srf.fastq.condor / qseq.fastq.condor) that
# convert archived qseq/srf sequence files into the fastq files the
# submission directories still need. Does not run the conversions itself.
# NOTE(review): partial numbered dump — the def's trailing parameters, the
# condor header "universe/executable" lines, and several call-argument lines
# are missing from this view.
123 def build_fastqs(host, apidata, sequences_path, library_result_map,
126 Generate condor scripts to build any needed fastq files
129 host (str): root of the htsworkflow api server
130 apidata (dict): id & key to post to the server
131 sequences_path (str): root of the directory tree to scan for files
132 library_result_map (list): [(library_id, destination directory), ...]
# Condor header for qseq2fastq jobs; log paths are relative to the cwd.
134 qseq_condor_header = """
137 error=log/qseq2fastq.err.$(process).log
138 output=log/qseq2fastq.out.$(process).log
139 log=log/qseq2fastq.log
141 """ % {'exe': sys.executable }
142 qseq_condor_entries = []
# Condor header for srf->fastq jobs; note the hard-coded user environment.
143 srf_condor_header = """
146 output=log/srf_pair_fastq.out.$(process).log
147 error=log/srf_pair_fastq.err.$(process).log
148 log=log/srf_pair_fastq.log
149 environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"
151 """ % {'exe': sys.executable }
152 srf_condor_entries = []
153 lib_db = find_archive_sequence_files(host,
158 needed_targets = find_missing_targets(library_result_map, lib_db, force)
# For each missing fastq, prefer a qseq source over an srf source.
160 for target_pathname, available_sources in needed_targets.items():
161 logging.debug(' target : %s' % (target_pathname,))
162 logging.debug(' candidate sources: %s' % (available_sources,))
163 if available_sources.has_key('qseq'):
164 source = available_sources['qseq']
165 qseq_condor_entries.append(
166 condor_qseq_to_fastq(source.path,
171 elif available_sources.has_key('srf'):
172 source = available_sources['srf']
173 mid = getattr(source, 'mid_point', None)
174 srf_condor_entries.append(
175 condor_srf_to_fastq(source.path,
# Fallthrough (likely an else branch): no usable source was found.
183 print " need file", target_pathname
# Only write submit scripts when there is at least one job to run.
185 if len(srf_condor_entries) > 0:
186 make_submit_script('srf.fastq.condor',
190 if len(qseq_condor_entries) > 0:
191 make_submit_script('qseq.fastq.condor',
# Compute which fastq target files are missing (or should be regenerated
# when force=True). Returns {target_pathname: {filetype: seq, ...}} so the
# caller can pick the best available source per target. Also annotates each
# seq with a .paired flag from the lane metadata.
# NOTE(review): partial numbered dump — the needed_targets initialization,
# the "lib = lib_db[lib_id]" line, parts of filename_attributes, and the
# Failed-lane / 30DY0AAXX special-case bodies are missing from this view.
196 def find_missing_targets(library_result_map, lib_db, force=False):
198 Check if the sequence file exists.
199 This requires computing what the sequence name is and checking
200 to see if it can be found in the sequence location.
202 Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
# Target filename templates; paired-end runs get an _r<read> suffix.
204 fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
205 fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
206 # find what targets we're missing
208 for lib_id, result_dir in library_result_map:
210 lane_dict = make_lane_dict(lib_db, lib_id)
212 for lane_key, sequences in lib['lanes'].items():
213 for seq in sequences:
# Pull paired/status flags from the lane record keyed by flowcell.
214 seq.paired = lane_dict[seq.flowcell]['paired_end']
215 lane_status = lane_dict[seq.flowcell]['status']
# Paired-end sequence with no read number is ambiguous; handled in
# a missing line (likely a warn/skip).
217 if seq.paired and seq.read is None:
219 filename_attributes = {
220 'flowcell': seq.flowcell,
227 if lane_status == 'Failed':
229 if seq.flowcell == '30DY0AAXX':
230 # 30DY0 only ran for 151 bases instead of 152
231 # it is actually 76 1st read, 75 2nd read
236 target_name = fastq_paired_template % filename_attributes
238 target_name = fastq_single_template % filename_attributes
240 target_pathname = os.path.join(result_dir, target_name)
# Record the sequence as a candidate source for this missing target,
# indexed by its file type (qseq/srf/...).
241 if force or not os.path.exists(target_pathname):
242 t = needed_targets.setdefault(target_pathname, {})
243 t[seq.filetype] = seq
245 return needed_targets
# Hard-link the daf file into every library result directory (same basename)
# so each submission directory is self-contained. Raises RuntimeError if the
# daf or a result directory is missing; skips directories that already have
# the link. (The second existence check on daf_path guards the race between
# the top-of-function check and os.link.)
248 def link_daf(daf_path, library_result_map):
249 if not os.path.exists(daf_path):
250 raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))
252 base_daf = os.path.basename(daf_path)
254 for lib_id, result_dir in library_result_map:
255 if not os.path.exists(result_dir):
256 raise RuntimeError("Couldn't find target directory %s" %(result_dir,))
257 submission_daf = os.path.join(result_dir, base_daf)
258 if not os.path.exists(submission_daf):
259 if not os.path.exists(daf_path):
260 raise RuntimeError("Couldn't find daf: %s" %(daf_path,))
# os.link creates a hard link, not a symlink.
261 os.link(daf_path, submission_daf)
# Write a <result_dir>.ini file into each result directory describing every
# submission file: a [config] section with the column order, then one
# [lineN] section per file (fastqs are grouped by extension onto one line).
# File attributes come from NameToViewMap; md5sums are computed for
# non-fastq files via make_md5sum.
# NOTE(review): partial numbered dump — the fastqs/line_counter
# initializations, several call-argument lines, and the loop over fastq
# extensions are missing; the final f.close() is also not visible.
264 def make_submission_ini(host, apidata, library_result_map, paired=True):
265 #attributes = get_filename_attribute_map(paired)
266 view_map = NameToViewMap(host, apidata)
268 candidate_fastq_src = {}
270 for lib_id, result_dir in library_result_map:
271 order_by = ['order_by=files', 'view', 'replicate', 'cell',
272 'readType', 'mapAlgorithm', 'insertLength', 'md5sum' ]
273 inifile = ['[config]']
274 inifile += [" ".join(order_by)]
# ini filename mirrors the directory name: <dir>/<dir>.ini
277 result_ini = os.path.join(result_dir, result_dir+'.ini')
280 submission_files = os.listdir(result_dir)
282 fastq_attributes = {}
283 for f in submission_files:
284 attributes = view_map.find_attributes(f, lib_id)
285 if attributes is None:
286 raise ValueError("Unrecognized file: %s" % (f,))
287 attributes['md5sum'] = "None"
289 ext = attributes["extension"]
# view None means "recognized but deliberately not submitted".
290 if attributes['view'] is None:
292 elif attributes.get("type", None) == 'fastq':
# Collect fastqs per extension; they get one combined ini section.
293 fastqs.setdefault(ext, set()).add(f)
294 fastq_attributes[ext] = attributes
296 md5sum = make_md5sum(os.path.join(result_dir,f))
297 if md5sum is not None:
298 attributes['md5sum']=md5sum
300 make_submission_section(line_counter,
307 # add in fastqs on a single line.
309 for extension, fastq_files in fastqs.items():
311 make_submission_section(line_counter,
313 fastq_attributes[extension])
318 f = open(result_ini,'w')
319 f.write(os.linesep.join(inifile))
# Re-index a library's lane_set by flowcell ID. Builds (flowcell, lane)
# pairs; the missing trailing line (likely "return dict(result)") converts
# them to a dict — later lanes for the same flowcell would win.
# NOTE(review): the result initialization and return lines are missing from
# this partial dump.
322 def make_lane_dict(lib_db, lib_id):
324 Convert the lane_set in a lib_db to a dictionary
325 indexed by flowcell ID
328 for lane in lib_db[lib_id]['lane_set']:
329 result.append((lane['flowcell'], lane))
# Create a ddf file for every result directory that has a submission ini,
# collecting condor DAG fragments from each make_ddf call, then write the
# combined submission.dagman (refusing to overwrite unless force=True).
# NOTE(review): partial numbered dump — the dag_fragment initialization, the
# extend of per-ddf fragments, and the f.close() are missing from this view.
333 def make_all_ddfs(library_result_map, daf_name, make_condor=True, force=False):
335 for lib_id, result_dir in library_result_map:
336 ininame = result_dir+'.ini'
337 inipathname = os.path.join(result_dir, ininame)
338 if os.path.exists(inipathname):
340 make_ddf(ininame, daf_name, True, make_condor, result_dir)
343 if make_condor and len(dag_fragment) > 0:
344 dag_filename = 'submission.dagman'
# Don't clobber an existing dagman unless the user forced it.
345 if not force and os.path.exists(dag_filename):
346 logging.warn("%s exists, please delete" % (dag_filename,))
348 f = open(dag_filename,'w')
349 f.write( os.linesep.join(dag_fragment))
350 f.write( os.linesep )
# Build one ddf file from a submission ini, plus (optionally) the condor
# archive/upload scripts and the DAG fragment tying them together.
# Returns the dag fragments (return line not visible in this dump).
# NOTE(review): partial numbered dump — the chdir into outdir, dag_fragments
# initialization, output.close(), and the restore-cwd/return lines are
# missing from this view.
354 def make_ddf(ininame, daf_name, guess_ddf=False, make_condor=False, outdir=None):
356 Make ddf files, and bonus condor file
# When outdir is given the function (in missing lines) appears to chdir
# there so relative names in the ini resolve — TODO confirm.
360 if outdir is not None:
365 ddf_name = make_ddf_name(ininame)
367 output = open(ddf_name,'w')
369 file_list = read_ddf_ini(ininame, output)
371 "Read config {0}, found files: {1}".format(
372 ininame, ", ".join(file_list)))
# The archive tarball must include the daf and the ddf itself.
374 file_list.append(daf_name)
375 if ddf_name is not None:
376 file_list.append(ddf_name)
379 archive_condor = make_condor_archive_script(ininame, file_list)
380 upload_condor = make_condor_upload_script(ininame)
382 dag_fragments.extend(
383 make_dag_fragment(ininame, archive_condor, upload_condor)
# Convert a submission ini into tab-delimited ddf text written to `output`
# (default stdout): a header row from the config section's order_by keys,
# then one row per [lineN] section. Returns the list of files mentioned in
# the sections' "files" values (return line not visible in this dump).
# NOTE(review): partial numbered dump — the file_list initialization, the
# per-section values loop, and the "files" key test are missing.
391 def read_ddf_ini(filename, output=sys.stdout):
393 Read a ini file and dump out a tab delmited text file
# SafeConfigParser is the Python 2 name; ConfigParser in Python 3.
396 config = SafeConfigParser()
397 config.read(filename)
# shlex.split honors quoting in the order_by value.
399 order_by = shlex.split(config.get("config", "order_by"))
401 output.write("\t".join(order_by))
402 output.write(os.linesep)
403 sections = config.sections()
405 for section in sections:
406 if section == "config":
407 # skip the config block
411 v = config.get(section, key)
# "files" values are comma-separated; collect them for the caller.
414 file_list.extend(parse_filelist(v))
416 output.write("\t".join(values))
417 output.write(os.linesep)
# Parse a whitespace-delimited "library_id result_dir" mapping file,
# skipping comment lines starting with '#'. Returns a list of
# (library_id, result_dir) tuples (return line not visible in this dump).
# NOTE(review): partial numbered dump — the results initialization, the
# per-line loop/strip, and the return are missing. As documented, filenames
# with spaces are not supported (line.split()).
421 def read_library_result_map(filename):
423 Read a file that maps library id to result directory.
424 Does not support spaces in filenames.
429 stream = open(filename,'r')
434 if not line.startswith('#') and len(line) > 0 :
435 library_id, result_dir = line.split()
436 results.append((library_id, result_dir))
440 def make_condor_archive_script(ininame, files):
441 script = """Universe = vanilla
443 Executable = /bin/tar
444 arguments = czvhf ../%(archivename)s %(filelist)s
446 Error = compress.err.$(Process).log
447 Output = compress.out.$(Process).log
448 Log = /tmp/submission-compress-%(user)s.log
449 initialdir = %(initialdir)s
450 environment="GZIP=-3"
456 if not os.path.exists(f):
457 raise RuntimeError("Missing %s" % (f,))
459 context = {'archivename': make_submission_name(ininame),
460 'filelist': " ".join(files),
461 'initialdir': os.getcwd(),
462 'user': os.getlogin()}
464 condor_script = make_condor_name(ininame, 'archive')
465 condor_stream = open(condor_script,'w')
466 condor_stream.write(script % context)
467 condor_stream.close()
471 def make_condor_upload_script(ininame):
472 script = """Universe = vanilla
474 Executable = /usr/bin/lftp
475 arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s
477 Error = upload.err.$(Process).log
478 Output = upload.out.$(Process).log
479 Log = /tmp/submission-upload-%(user)s.log
480 initialdir = %(initialdir)s
484 auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))
486 encodeftp = 'encodeftp.cse.ucsc.edu'
487 ftpuser = auth.hosts[encodeftp][0]
488 ftppassword = auth.hosts[encodeftp][2]
489 context = {'archivename': make_submission_name(ininame),
490 'initialdir': os.getcwd(),
491 'user': os.getlogin(),
493 'ftppassword': ftppassword,
494 'ftphost': encodeftp}
496 condor_script = make_condor_name(ininame, 'upload')
497 condor_stream = open(condor_script,'w')
498 condor_stream.write(script % context)
499 condor_stream.close()
500 os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)
# Build the DAGMan lines for one submission: an archive job, an upload job,
# and a PARENT/CHILD edge so upload runs after archive. Condor script paths
# are made absolute against the current directory. Returns the fragment
# list (fragments initialization and return not visible in this dump).
505 def make_dag_fragment(ininame, archive_condor, upload_condor):
507 Make the couple of fragments compress and then upload the data.
509 cur_dir = os.getcwd()
510 archive_condor = os.path.join(cur_dir, archive_condor)
511 upload_condor = os.path.join(cur_dir, upload_condor)
512 job_basename = make_base_name(ininame)
515 fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
516 fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
517 fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
# Fetch one library's metadata dict from the htsworkflow API. The return
# of `contents` is in a missing line of this partial dump.
522 def get_library_info(host, apidata, library_id):
523 url = api.library_url(host, library_id)
524 contents = api.retrieve_info(url, apidata)
# Build one condor queue entry string invoking srf2fastq to turn an srf
# file into fastq(s). For paired data the right-read filename is derived by
# substituting _r1 -> _r2 in the target name (acknowledged hack below).
# Returns the formatted entry (the "arguments=..." template head is in a
# missing line of this partial dump).
528 def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
529 mid=None, force=False):
# Path to the srf2fastq module, executed as a script by condor.
530 py = srf2fastq.__file__
531 args = [ py, srf_file, ]
# Missing line here is likely "if paired:" guarding the left/right pair.
533 args.extend(['--left', target_pathname])
534 # this is ugly. I did it because I was pregenerating the target
535 # names before I tried to figure out what sources could generate
536 # those targets, and everything up to this point had been
537 # one-to-one. So I couldn't figure out how to pair the
539 # With this at least the command will run correctly.
540 # however if we rename the default targets, this'll break
541 # also I think it'll generate it twice.
542 args.extend(['--right',
543 target_pathname.replace('_r1.fastq', '_r2.fastq')])
545 args.extend(['--single', target_pathname ])
546 if flowcell is not None:
547 args.extend(['--flowcell', flowcell])
# mid is the read-split midpoint for paired srf files.
550 args.extend(['-m', str(mid)])
553 args.extend(['--force'])
558 """ % (" ".join(args),)
# Build one condor queue entry string invoking qseq2fastq (-i in, -o out,
# optional -f flowcell). Parallel to condor_srf_to_fastq; the --force
# handling and the template head are in missing lines of this partial dump.
563 def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False):
564 py = qseq2fastq.__file__
565 args = [py, '-i', qseq_file, '-o', target_pathname ]
566 if flowcell is not None:
567 args.extend(['-f', flowcell])
571 """ % (" ".join(args))
# Scan the sequence archive for files belonging to the libraries in
# library_result_map. Builds lib_db: {lib_id: lib_info with a 'lanes' dict
# of (flowcell, lane) -> set of sequence objects}, filtering the full
# flowcell-directory scan down to lanes we actually care about. Returns
# lib_db (return line not visible in this partial dump).
# NOTE(review): the def's trailing parameters, seq_dirs/candidate_lanes
# initializations, and the seq_dirs.add argument tail are missing.
575 def find_archive_sequence_files(host, apidata, sequences_path,
578 Find all the archive sequence files possibly associated with our results.
581 logging.debug("Searching for sequence files in: %s" %(sequences_path,))
585 #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
587 for lib_id, result_dir in library_result_map:
588 lib_info = get_library_info(host, apidata, lib_id)
589 lib_info['lanes'] = {}
590 lib_db[lib_id] = lib_info
# Remember which (flowcell, lane) pairs belong to which library.
592 for lane in lib_info['lane_set']:
593 lane_key = (lane['flowcell'], lane['lane_number'])
594 candidate_lanes[lane_key] = lib_id
595 seq_dirs.add(os.path.join(sequences_path,
598 logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
599 candidate_seq_list = scan_for_sequences(seq_dirs)
601 # at this point we have too many sequences as scan_for_sequences
602 # returns all the sequences in a flowcell directory
603 # so lets filter out the extras
605 for seq in candidate_seq_list:
606 lane_key = (seq.flowcell, seq.lane)
607 lib_id = candidate_lanes.get(lane_key, None)
608 if lib_id is not None:
609 lib_info = lib_db[lib_id]
610 lib_info['lanes'].setdefault(lane_key, set()).add(seq)
# Maps a submission filename to its DCC "view" and attribute dict by
# matching fnmatch patterns (longest/most-specific first in the list) and
# merging per-view attributes with per-library metadata from the API.
# NOTE(review): partial numbered dump throughout this class — pattern-list
# entries, the views-dict head, attribute-dict heads, counter
# initializations, and several return lines are missing; recover the full
# source before modifying behavior.
615 class NameToViewMap(object):
616 """Determine view attributes for a given submission file name
618 def __init__(self, root_url, apidata):
# API endpoint root and auth payload for library metadata lookups.
619 self.root_url = root_url
620 self.apidata = apidata
624 # ma is "map algorithm"
# (pattern, view) pairs; a view of None means "recognized, skip";
# a callable view is invoked with is_paired to pick the name.
629 ('*.splices.bam', 'Splices'),
630 ('*.bam', self._guess_bam_view),
631 ('junctions.bed', 'Junctions'),
632 ('*.jnct', 'Junctions'),
633 ('*unique.bigwig', None),
634 ('*plus.bigwig', 'PlusSignal'),
635 ('*minus.bigwig', 'MinusSignal'),
636 ('*.bigwig', 'Signal'),
641 ('*.?ufflinks-0.9.0?genes.expr', 'GeneDeNovo'),
642 ('*.?ufflinks-0.9.0?transcripts.expr', 'TranscriptDeNovo'),
643 ('*.?ufflinks-0.9.0?transcripts.gtf', 'GeneModel'),
644 ('*.GENCODE-v3c?genes.expr', 'GeneGCV3c'),
645 ('*.GENCODE-v3c?transcript*.expr', 'TranscriptGCV3c'),
646 ('*.GENCODE-v3c?transcript*.gtf', 'TranscriptGencV3c'),
647 ('*.GENCODE-v4?genes.expr', None), #'GeneGCV4'),
648 ('*.GENCODE-v4?transcript*.expr', None), #'TranscriptGCV4'),
649 ('*.GENCODE-v4?transcript*.gtf', None), #'TranscriptGencV4'),
650 ('*_1.75mers.fastq', 'FastqRd1'),
651 ('*_2.75mers.fastq', 'FastqRd2'),
652 ('*_r1.fastq', 'FastqRd1'),
653 ('*_r2.fastq', 'FastqRd2'),
654 ('*.fastq', 'Fastq'),
655 ('*.gtf', 'GeneModel'),
659 ('paired-end-distribution*', 'InsLength'),
660 ('*.stats.txt', 'InsLength'),
# Per-view attribute dicts; fastq views also carry type=fastq which
# make_submission_ini uses to group them.
667 None: {"MapAlgorithm": "NA"},
668 "Paired": {"MapAlgorithm": ma},
669 "Aligns": {"MapAlgorithm": ma},
670 "Single": {"MapAlgorithm": ma},
671 "Splices": {"MapAlgorithm": ma},
672 "Junctions": {"MapAlgorithm": ma},
673 "PlusSignal": {"MapAlgorithm": ma},
674 "MinusSignal": {"MapAlgorithm": ma},
675 "Signal": {"MapAlgorithm": ma},
676 "GeneModel": {"MapAlgorithm": ma},
677 "GeneDeNovo": {"MapAlgorithm": ma},
678 "TranscriptDeNovo": {"MapAlgorithm": ma},
679 "GeneGCV3c": {"MapAlgorithm": ma},
680 "TranscriptGCV3c": {"MapAlgorithm": ma},
681 "TranscriptGencV3c": {"MapAlgorithm": ma},
682 "GeneGCV4": {"MapAlgorithm": ma},
683 "TranscriptGCV4": {"MapAlgorithm": ma},
684 "FastqRd1": {"MapAlgorithm": "NA", "type": "fastq"},
685 "FastqRd2": {"MapAlgorithm": "NA", "type": "fastq"},
686 "Fastq": {"MapAlgorithm": "NA", "type": "fastq" },
687 "InsLength": {"MapAlgorithm": ma},
689 # view name is one of the attributes
690 for v in self.views.keys():
691 self.views[v]['view'] = v
693 def find_attributes(self, pathname, lib_id):
694 """Looking for the best extension
695 The 'best' is the longest match
698 filename (str): the filename whose extention we are about to examine
# Library metadata is cached per lib_id to avoid repeated API calls.
700 path, filename = os.path.splitext(pathname)
701 if not self.lib_cache.has_key(lib_id):
702 self.lib_cache[lib_id] = get_library_info(self.root_url,
703 self.apidata, lib_id)
705 lib_info = self.lib_cache[lib_id]
706 if lib_info['cell_line'].lower() == 'unknown':
707 logging.warn("Library %s missing cell_line" % (lib_id,))
709 'cell': lib_info['cell_line'],
710 'replicate': lib_info['replicate'],
712 is_paired = self._is_paired(lib_id, lib_info)
715 attributes.update(self.get_paired_attributes(lib_info))
717 attributes.update(self.get_single_attributes(lib_info))
# First matching pattern wins; list order encodes specificity.
719 for pattern, view in self.patterns:
720 if fnmatch.fnmatch(pathname, pattern):
# Callable views (e.g. _guess_bam_view) resolve by pairedness.
722 view = view(is_paired=is_paired)
724 attributes.update(self.views[view])
725 attributes["extension"] = pattern
729 def _guess_bam_view(self, is_paired=True):
730 """Guess a view name based on library attributes
# Body (likely returning "Paired" vs "Single") is missing from
# this partial dump.
738 def _is_paired(self, lib_id, lib_info):
739 """Determine if a library is paired end"""
# No lanes at all: handled in a missing line (likely an error).
740 if len(lib_info["lane_set"]) == 0:
743 if not self.lib_paired.has_key(lib_id):
747 # check to see if all the flowcells are the same.
748 # otherwise we might need to do something complicated
749 for flowcell in lib_info["lane_set"]:
750 # yes there's also a status code, but this comparison
# Failed flowcells are counted separately and ignored for the
# paired/unpaired vote below.
752 if flowcell["status"].lower() == "failed":
753 # ignore failed flowcell
756 elif flowcell["paired_end"]:
761 logging.debug("Library %s: %d paired, %d single, %d failed" % \
762 (lib_info["library_id"], is_paired, isnot_paired, failed))
# Majority vote; a tie is an unrecoverable ambiguity.
764 if is_paired > isnot_paired:
765 self.lib_paired[lib_id] = True
766 elif is_paired < isnot_paired:
767 self.lib_paired[lib_id] = False
769 raise RuntimeError("Equal number of paired & unpaired lanes."\
770 "Can't guess library paired status")
772 return self.lib_paired[lib_id]
774 def get_paired_attributes(self, lib_info):
# Default insert_size of 200 appears applied in a missing line.
775 if lib_info['insert_size'] is None:
776 errmsg = "Library %s is missing insert_size, assuming 200"
777 logging.warn(errmsg % (lib_info["library_id"],))
780 insert_size = lib_info['insert_size']
781 return {'insertLength': insert_size,
784 def get_single_attributes(self, lib_info):
# Single-ended libraries have no meaningful insert length.
785 return {'insertLength':'ilNA',
# Format one [lineN] ini section: the files= line plus one key=value line
# per attribute. Returns the list of ini lines (return not visible in this
# partial dump). Note attributes iteration order is dict order.
789 def make_submission_section(line_counter, files, attributes):
791 Create a section in the submission ini file
793 inifile = [ "[line%s]" % (line_counter,) ]
794 inifile += ["files=%s" % (",".join(files))]
796 for k,v in attributes.items():
797 inifile += ["%s=%s" % (k,v)]
# Strip directory and extension from a pathname; the return of `name` is in
# a missing line of this partial dump.
801 def make_base_name(pathname):
802 base = os.path.basename(pathname)
803 name, ext = os.path.splitext(base)
# Derive the submission tarball name from the ini name; the return (likely
# name + '.tgz' or similar) is in a missing line of this partial dump.
807 def make_submission_name(ininame):
808 name = make_base_name(ininame)
# Derive the ddf filename from a pathname; the return (likely name + '.ddf')
# is in a missing line of this partial dump.
812 def make_ddf_name(pathname):
813 name = make_base_name(pathname)
# Build a condor script name: <base>[.<run_type>].condor. The `elements`
# initialization (presumably [name]) is in a missing line of this partial
# dump — confirm before relying on the exact output shape.
817 def make_condor_name(pathname, run_type=None):
818 name = make_base_name(pathname)
820 if run_type is not None:
821 elements.append(run_type)
822 elements.append("condor")
823 return ".".join(elements)
# Write a condor submit script: header followed by each body entry. If
# target is a string the file is opened and closed here; a stream is left
# open for the caller. Python 2 only: types.StringTypes covers str/unicode.
# NOTE(review): partial numbered dump — the open/write/close lines between
# the type checks are missing from this view.
826 def make_submit_script(target, header, body_list):
828 write out a text file
830 this was intended for condor submit scripts
833 target (str or stream):
834 if target is a string, we will open and close the file
835 if target is a stream, the caller is responsible.
838 header to write at the beginning of the file
839 body_list (list of strs):
840 a list of blocks to add to the file.
842 if type(target) in types.StringTypes:
847 for entry in body_list:
# Mirror of the opening check: only close what we opened.
849 if type(target) in types.StringTypes:
def parse_filelist(file_string):
    """Split a comma-delimited string of filenames into a list.

    Used for the ``files=`` values in submission ini sections; no quoting
    or whitespace stripping is performed.
    """
    return [entry for entry in file_string.split(",")]
# Raise RuntimeError if any file in the list does not exist on disk. The
# docstring close and the "for f in files:" loop head are in missing lines
# of this partial dump.
856 def validate_filelist(files):
858 Die if a file doesn't exist in a file list
861 if not os.path.exists(f):
862 raise RuntimeError("%s does not exist" % (f,))
# Return the md5sum for a file, preferring a cached <filename>.md5 sidecar
# over recomputing; falls back to running md5sum via make_md5sum_unix. The
# stream close and the final return are in missing lines of this partial
# dump. (os.path.join on a single argument is effectively a no-op here.)
864 def make_md5sum(filename):
865 """Quickly find the md5sum of a file
867 md5_cache = os.path.join(filename+".md5")
869 if os.path.exists(md5_cache):
870 logging.debug("Found md5sum in {0}".format(md5_cache))
871 stream = open(md5_cache,'r')
872 lines = stream.readlines()
873 md5sum = parse_md5sum_line(lines, filename)
875 md5sum = make_md5sum_unix(filename, md5_cache)
# Run the external md5sum command on filename, parse its output, and cache
# the result to md5_cache. Returns the md5sum (return, retcode assignment,
# and the cache-write lines are partly missing from this dump).
# NOTE(review): "stdin, stdout = p.communicate()" is misnamed —
# Popen.communicate returns (stdout, stderr), so `stdin` actually holds
# md5sum's stdout text; the later split works, but the names should be
# fixed when the full source is available.
878 def make_md5sum_unix(filename, md5_cache):
879 cmd = ["md5sum", filename]
880 logging.debug("Running {0}".format(" ".join(cmd)))
881 p = Popen(cmd, stdout=PIPE)
882 stdin, stdout = p.communicate()
884 logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
886 logging.error("Trouble with md5sum for {0}".format(filename))
888 lines = stdin.split(os.linesep)
889 md5sum = parse_md5sum_line(lines, filename)
890 if md5sum is not None:
891 logging.debug("Caching sum in {0}".format(md5_cache))
892 stream = open(md5_cache, "w")
# Parse the first "checksum  filename" line of md5sum output; logs an error
# when the reported filename doesn't match what we asked for. The return
# paths are in missing lines of this partial dump.
# NOTE(review): "disagre" typo lives in a runtime error string — fix it in
# a code change, not here.
897 def parse_md5sum_line(lines, filename):
898 md5sum, md5sum_filename = lines[0].split()
899 if md5sum_filename != filename:
900 errmsg = "MD5sum and I disagre about filename. {0} != {1}"
901 logging.error(errmsg.format(filename, md5sum_filename))
905 if __name__ == "__main__":