2 from ConfigParser import SafeConfigParser
8 from optparse import OptionParser
10 from pprint import pprint, pformat
12 from StringIO import StringIO
14 from subprocess import Popen, PIPE
22 from htsworkflow.util import api
23 from htsworkflow.submission.condorfastq import CondorFastqExtract
def main(cmdline=None):
    """Entry point: parse options and run the requested submission steps.

    cmdline: optional argument list (defaults to sys.argv when None),
    passed straight to OptionParser.parse_args.
    """
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    # Pick one logging level; the original called basicConfig once per
    # branch — only the first call takes effect, so an if/elif chain is
    # the correct shape (the branch conditions were elided in the listing).
    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    apidata = api.make_auth_from_opts(opts, parser)

    if opts.makeddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    if len(args) == 0:
        parser.error("I need at least one library submission-dir input file")

    library_result_map = []
    for a in args:
        library_result_map.extend(read_library_result_map(a))

    if opts.make_tree_from is not None:
        make_tree_from(opts.make_tree_from, library_result_map)

    if opts.daf is not None:
        link_daf(opts.daf, library_result_map)

    if opts.fastq:
        extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
                                       force=opts.force)
        extractor.build_fastqs(library_result_map)

    if opts.ini:
        make_submission_ini(opts.host, apidata, library_result_map)

    if opts.makeddf:
        make_all_ddfs(library_result_map, opts.daf, force=opts.force)
def make_parser():
    """Build the OptionParser used by main()."""
    parser = OptionParser()

    # commands
    parser.add_option('--make-tree-from',
                      help="create directories & link data files",
                      default=None)
    parser.add_option('--fastq', help="generate scripts for making fastq files",
                      default=False, action="store_true")
    parser.add_option('--ini', help="generate submission ini file", default=False,
                      action="store_true")
    parser.add_option('--makeddf', help='make the ddfs', default=False,
                      action="store_true")

    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")
    # NOTE(review): main() reads opts.sequence; the option definition was
    # elided from this listing — confirm the original help text/default.
    parser.add_option('--sequence', default=None,
                      help='sequence archive directory')

    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser
def make_tree_from(source_path, library_result_map):
    """Create a tree using data files from source path.

    For each (lib_id, lib_path) pair, make lib_path if needed and symlink
    every file found in source_path/lib_path into it.
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logging.info("Making dir {0}".format(lib_path))
            os.mkdir(lib_path)
        source_lib_dir = os.path.join(source_path, lib_path)
        if os.path.exists(source_lib_dir):
            for filename in os.listdir(source_lib_dir):
                source_pathname = os.path.join(source_lib_dir, filename)
                target_pathname = os.path.join(lib_path, filename)
                if not os.path.exists(source_pathname):
                    raise IOError("{0} does not exist".format(source_pathname))
                # only link files we have not already linked
                if not os.path.exists(target_pathname):
                    os.symlink(source_pathname, target_pathname)
                    logging.info(
                        'LINK {0} to {1}'.format(source_pathname, target_pathname))
def link_daf(daf_path, library_result_map):
    """Hard-link the daf file into each library result directory.

    Raises RuntimeError when the daf or a target directory is missing.
    """
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" % (result_dir,))
        submission_daf = os.path.join(result_dir, base_daf)
        # skip directories that already have the daf linked in;
        # (the original re-checked daf_path here, redundant with the
        # entry check above, so it was dropped)
        if not os.path.exists(submission_daf):
            os.link(daf_path, submission_daf)
def make_submission_ini(host, apidata, library_result_map, paired=True):
    """Write a <result_dir>.ini submission description into each result dir.

    Each file in the result directory is classified via NameToViewMap;
    fastq files of the same extension are grouped onto a single section,
    all other recognized files get one section each with an md5sum.
    Files mapping to a None view are skipped.
    """
    view_map = NameToViewMap(host, apidata)

    for lib_id, result_dir in library_result_map:
        order_by = ['order_by=files', 'view', 'replicate', 'cell',
                    'readType', 'mapAlgorithm', 'insertLength', 'md5sum']
        inifile = ['[config]']
        inifile += [" ".join(order_by)]
        inifile += ['']
        line_counter = 1
        result_ini = os.path.join(result_dir, result_dir + '.ini')

        submission_files = os.listdir(result_dir)
        fastqs = {}
        fastq_attributes = {}
        for f in submission_files:
            attributes = view_map.find_attributes(f, lib_id)
            if attributes is None:
                raise ValueError("Unrecognized file: %s" % (f,))
            attributes['md5sum'] = "None"

            ext = attributes["extension"]
            if attributes['view'] is None:
                # files deliberately excluded from the submission
                continue
            elif attributes.get("type", None) == 'fastq':
                # collect fastqs so each extension gets one merged section
                fastqs.setdefault(ext, set()).add(f)
                fastq_attributes[ext] = attributes
            else:
                md5sum = make_md5sum(os.path.join(result_dir, f))
                if md5sum is not None:
                    attributes['md5sum'] = md5sum
                inifile.extend(
                    make_submission_section(line_counter,
                                            [f],
                                            attributes))
                inifile += ['']
                line_counter += 1

        # add in fastqs on a single line.
        for extension, fastq_files in fastqs.items():
            inifile.extend(
                make_submission_section(line_counter,
                                        sorted(fastq_files),
                                        fastq_attributes[extension]))
            inifile += ['']
            line_counter += 1

        # close the handle (the visible original leaked it)
        f = open(result_ini, 'w')
        try:
            f.write(os.linesep.join(inifile))
        finally:
            f.close()
def make_all_ddfs(library_result_map, daf_name, make_condor=True, force=False):
    """Create a ddf (and condor scripts) for every result dir that has an
    ini file, then write a combined submission.dagman.

    Refuses to overwrite an existing dagman unless force is True.
    """
    dag_fragment = []
    for lib_id, result_dir in library_result_map:
        ininame = result_dir + '.ini'
        inipathname = os.path.join(result_dir, ininame)
        if os.path.exists(inipathname):
            dag_fragment.extend(
                make_ddf(ininame, daf_name, True, make_condor, result_dir))

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logging.warn("%s exists, please delete" % (dag_filename,))
        else:
            # close the handle (the visible original leaked it)
            f = open(dag_filename, 'w')
            try:
                f.write(os.linesep.join(dag_fragment))
                f.write(os.linesep)
            finally:
                f.close()
def make_ddf(ininame, daf_name, guess_ddf=False, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file

    Returns a list of dagman fragment lines for the condor jobs created
    (empty when make_condor is False).
    """
    dag_fragments = []
    curdir = os.getcwd()
    if outdir is not None:
        # the ini file and data files are relative to the result directory
        os.chdir(outdir)
    try:
        output = sys.stdout
        ddf_name = None
        if guess_ddf:
            ddf_name = make_ddf_name(ininame)
            output = open(ddf_name, 'w')

        file_list = read_ddf_ini(ininame, output)
        if output is not sys.stdout:
            output.close()
        logging.debug(
            "Read config {0}, found files: {1}".format(
                ininame, ", ".join(file_list)))

        file_list.append(daf_name)
        if ddf_name is not None:
            file_list.append(ddf_name)

        if make_condor:
            archive_condor = make_condor_archive_script(ininame, file_list)
            upload_condor = make_condor_upload_script(ininame)

            dag_fragments.extend(
                make_dag_fragment(ininame, archive_condor, upload_condor))
    finally:
        # always restore the working directory (the original's restore
        # path was elided; try/finally makes it exception safe)
        if outdir is not None:
            os.chdir(curdir)

    return dag_fragments
def read_ddf_ini(filename, output=sys.stdout):
    """
    Read an ini file and dump out a tab delimited text file.

    The [config] section's order_by option defines both the header line
    and the per-section column order. Returns the list of filenames
    collected from every section's 'files' value.
    """
    file_list = []
    config = SafeConfigParser()
    config.read(filename)

    order_by = shlex.split(config.get("config", "order_by"))

    output.write("\t".join(order_by))
    output.write(os.linesep)
    sections = config.sections()
    sections.sort()
    for section in sections:
        if section == "config":
            # skip the config block
            continue
        values = []
        for key in order_by:
            v = config.get(section, key)
            values.append(v)
            if key == 'files':
                file_list.extend(parse_filelist(v))

        output.write("\t".join(values))
        output.write(os.linesep)
    return file_list
def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.

    Blank lines and lines starting with '#' are ignored.
    Returns a list of (library_id, result_dir) tuples.
    """
    results = []
    # use a context manager so the handle is always closed
    # (the visible original never closed the stream)
    with open(filename, 'r') as stream:
        for line in stream:
            line = line.rstrip()
            if not line.startswith('#') and len(line) > 0:
                library_id, result_dir = line.split()
                results.append((library_id, result_dir))
    return results
def make_condor_archive_script(ininame, files):
    """Write a condor submit script that tars up the submission files.

    Validates that every file exists first; returns the name of the
    condor script written into the current directory.
    """
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"

queue
"""
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("Missing %s" % (f,))

    # NOTE(review): os.getlogin() raises OSError when there is no
    # controlling terminal (cron/condor); getpass.getuser() would be
    # more robust — kept as-is to match the upload script.
    context = {'archivename': make_submission_name(ininame),
               'filelist': " ".join(files),
               'initialdir': os.getcwd(),
               'user': os.getlogin()}

    condor_script = make_condor_name(ininame, 'archive')
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    return condor_script
def make_condor_upload_script(ininame):
    """Write a condor submit script that lftp-uploads the archive to the
    UCSC encode ftp server, using credentials from ~diane/.netrc.

    The script embeds the ftp password, so it is chmod'ed owner-only.
    Returns the name of the condor script written.
    """
    script = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.err.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))
    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    # 'ftpuser' is required by the template above but was missing from
    # the visible context dict — added here.
    context = {'archivename': make_submission_name(ininame),
               'initialdir': os.getcwd(),
               'user': os.getlogin(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp}

    condor_script = make_condor_name(ininame, 'upload')
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    os.chmod(condor_script, stat.S_IREAD | stat.S_IWRITE)
    return condor_script
def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the couple of fragments compress and then upload the data.

    Returns dagman lines: two JOB declarations (archive, upload) and a
    PARENT/CHILD edge making the upload depend on the archive.
    """
    cur_dir = os.getcwd()
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))

    return fragments
def get_library_info(host, apidata, library_id):
    """Fetch library metadata from the htsworkflow API and return it."""
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
    return contents
class NameToViewMap(object):
    """Determine view attributes for a given submission file name.

    Filenames are matched against fnmatch patterns (first match wins);
    the matched view selects a dict of submission attributes. Library
    metadata is fetched lazily from the htsworkflow API and cached.
    """
    def __init__(self, root_url, apidata):
        self.root_url = root_url
        self.apidata = apidata

        # caches keyed by lib_id: API metadata and paired-end guesses
        self.lib_cache = {}
        self.lib_paired = {}

        # ma is "map algorithm"
        # NOTE(review): the value of ``ma`` and several pattern entries
        # were elided from this listing — confirm against the original.
        ma = 'TH1014'

        self.patterns = [
            # for 2011 Feb 18 elements submission
            ('final_Cufflinks_genes_*gtf', 'GeneDeNovo'),
            ('final_Cufflinks_transcripts_*gtf', 'TranscriptDeNovo'),
            ('final_exonFPKM-Cufflinks-0.9.3-GENCODE-v3c-*.gtf',
             'ExonsGencV3c'),
            ('final_GENCODE-v3-Cufflinks-0.9.3.genes-*gtf',
             'GeneGencV3c'),
            ('final_GENCODE-v3-Cufflinks-0.9.3.transcripts-*gtf',
             'TranscriptGencV3c'),
            ('final_TSS-Cufflinks-0.9.3-GENCODE-v3c-*.gtf', 'TSS'),
            ('final_junctions-*.bed6+3', 'Junctions'),

            ('*.splices.bam', 'Splices'),
            ('*.bam', self._guess_bam_view),
            ('junctions.bed', 'Junctions'),
            ('*.jnct', 'Junctions'),
            ('*unique.bigwig', None),
            ('*plus.bigwig', 'PlusSignal'),
            ('*minus.bigwig', 'MinusSignal'),
            ('*.bigwig', 'Signal'),

            ('*ufflinks?0.9.3.genes.gtf', 'GeneDeNovo'),
            ('*ufflinks?0.9.3.transcripts.gtf', 'TranscriptDeNovo'),
            ('*GENCODE-v3c.exonFPKM.gtf', 'ExonsGencV3c'),
            ('*GENCODE-v3c.genes.gtf', 'GeneGencV3c'),
            ('*GENCODE-v3c.transcripts.gtf', 'TranscriptGencV3c'),
            ('*GENCODE-v3c.TSS.gtf', 'TSS'),
            ('*.junctions.bed6+3', 'Junctions'),

            ('*.?ufflinks-0.9.0?genes.expr', 'GeneDeNovo'),
            ('*.?ufflinks-0.9.0?transcripts.expr', 'TranscriptDeNovo'),
            ('*.?ufflinks-0.9.0?transcripts.gtf', 'GeneModel'),

            ('*.GENCODE-v3c?genes.expr', 'GeneGCV3c'),
            ('*.GENCODE-v3c?transcript*.expr', 'TranscriptGCV3c'),
            ('*.GENCODE-v3c?transcript*.gtf', 'TranscriptGencV3c'),
            ('*.GENCODE-v4?genes.expr', None),  # 'GeneGCV4'),
            ('*.GENCODE-v4?transcript*.expr', None),  # 'TranscriptGCV4'),
            ('*.GENCODE-v4?transcript*.gtf', None),  # 'TranscriptGencV4'),
            ('*_1.75mers.fastq', 'FastqRd1'),
            ('*_2.75mers.fastq', 'FastqRd2'),
            ('*_r1.fastq', 'FastqRd1'),
            ('*_r2.fastq', 'FastqRd2'),
            ('*.fastq', 'Fastq'),
            ('*.gtf', 'GeneModel'),

            ('paired-end-distribution*', 'InsLength'),
            ('*.stats.txt', 'InsLength'),

            # bookkeeping files are deliberately excluded (view None)
            ('transfer_log', None),
        ]

        self.views = {
            None: {"MapAlgorithm": "NA"},
            "Paired": {"MapAlgorithm": ma},
            "Aligns": {"MapAlgorithm": ma},
            "Single": {"MapAlgorithm": ma},
            "Splices": {"MapAlgorithm": ma},
            "Junctions": {"MapAlgorithm": ma},
            "PlusSignal": {"MapAlgorithm": ma},
            "MinusSignal": {"MapAlgorithm": ma},
            "Signal": {"MapAlgorithm": ma},
            "GeneModel": {"MapAlgorithm": ma},
            "GeneDeNovo": {"MapAlgorithm": ma},
            "TranscriptDeNovo": {"MapAlgorithm": ma},
            "ExonsGencV3c": {"MapAlgorithm": ma},
            "GeneGencV3c": {"MapAlgorithm": ma},
            "TSS": {"MapAlgorithm": ma},
            "GeneGCV3c": {"MapAlgorithm": ma},
            "TranscriptGCV3c": {"MapAlgorithm": ma},
            "TranscriptGencV3c": {"MapAlgorithm": ma},
            "GeneGCV4": {"MapAlgorithm": ma},
            "TranscriptGCV4": {"MapAlgorithm": ma},
            "FastqRd1": {"MapAlgorithm": "NA", "type": "fastq"},
            "FastqRd2": {"MapAlgorithm": "NA", "type": "fastq"},
            "Fastq": {"MapAlgorithm": "NA", "type": "fastq"},
            "InsLength": {"MapAlgorithm": ma},
        }
        # view name is one of the attributes
        for v in list(self.views.keys()):
            self.views[v]['view'] = v

    def find_attributes(self, pathname, lib_id):
        """Looking for the best extension
        The 'best' is the longest match

        Arguments:
          pathname (str): the filename whose extension we are about to examine
          lib_id (str): library id used to look up cell line / replicate

        Returns the attribute dict for the first matching pattern, or
        None when nothing matches.
        """
        path, filename = os.path.splitext(pathname)
        # 'lib_id not in ...' replaces the py2-only has_key()
        if lib_id not in self.lib_cache:
            self.lib_cache[lib_id] = get_library_info(self.root_url,
                                                      self.apidata, lib_id)

        lib_info = self.lib_cache[lib_id]
        if lib_info['cell_line'].lower() == 'unknown':
            logging.warn("Library %s missing cell_line" % (lib_id,))
        attributes = {
            'cell': lib_info['cell_line'],
            'replicate': lib_info['replicate'],
        }
        is_paired = self._is_paired(lib_id, lib_info)
        if is_paired:
            attributes.update(self.get_paired_attributes(lib_info))
        else:
            attributes.update(self.get_single_attributes(lib_info))
        for pattern, view in self.patterns:
            if fnmatch.fnmatch(pathname, pattern):
                if callable(view):
                    # e.g. _guess_bam_view picks the view from pairedness
                    view = view(is_paired=is_paired)
                attributes.update(self.views[view])
                attributes["extension"] = pattern
                return attributes
        return None

    def _guess_bam_view(self, is_paired=True):
        """Guess a view name based on library attributes

        NOTE(review): the original body was elided from this listing;
        both "Paired" and "Single" exist in self.views — confirm mapping.
        """
        if is_paired:
            return "Paired"
        else:
            return "Single"

    def _is_paired(self, lib_id, lib_info):
        """Determine if a library is paired end"""
        # TODO: encode this information in the library type page.
        # NOTE(review): the tuple of single-ended library_type ids was
        # elided from this listing — confirm the ids.
        single = (1, 3, 6)
        if len(lib_info["lane_set"]) == 0:
            # we haven't sequenced anything so guess based on library type
            if lib_info['library_type_id'] in single:
                return False
            else:
                return True

        if lib_id not in self.lib_paired:
            is_paired = 0
            isnot_paired = 0
            failed = 0
            # check to see if all the flowcells are the same.
            # otherwise we might need to do something complicated
            for flowcell in lib_info["lane_set"]:
                # yes there's also a status code, but this comparison
                # is easier to read
                if flowcell["status"].lower() == "failed":
                    # ignore failed flowcell
                    failed += 1
                elif flowcell["paired_end"]:
                    is_paired += 1
                else:
                    isnot_paired += 1

            logging.debug("Library %s: %d paired, %d single, %d failed" % \
                          (lib_info["library_id"], is_paired, isnot_paired, failed))

            if is_paired > isnot_paired:
                self.lib_paired[lib_id] = True
            elif is_paired < isnot_paired:
                self.lib_paired[lib_id] = False
            else:
                raise RuntimeError("Equal number of paired & unpaired lanes."
                                   "Can't guess library paired status")

        return self.lib_paired[lib_id]

    def get_paired_attributes(self, lib_info):
        """Submission attributes for a paired end library."""
        if lib_info['insert_size'] is None:
            errmsg = "Library %s is missing insert_size, assuming 200"
            logging.warn(errmsg % (lib_info["library_id"],))
            insert_size = 200
        else:
            insert_size = lib_info['insert_size']
        # NOTE(review): the readType value was elided — confirm.
        return {'insertLength': insert_size,
                'readType': '2x75'}

    def get_single_attributes(self, lib_info):
        """Submission attributes for a single ended library."""
        # NOTE(review): the readType value was elided — confirm.
        return {'insertLength': 'ilNA',
                'readType': '1x75D'}
def make_submission_section(line_counter, files, attributes):
    """
    Create a section in the submission ini file.

    Returns a list of lines: a [lineN] header, a files= entry joining
    the filenames with commas, and one key=value line per attribute.
    """
    inifile = ["[line%s]" % (line_counter,)]
    inifile += ["files=%s" % (",".join(files))]

    for k, v in attributes.items():
        inifile += ["%s=%s" % (k, v)]
    return inifile
def make_base_name(pathname):
    """Return the basename of pathname with its extension stripped."""
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    return name
def make_submission_name(ininame):
    """Return the archive name for a submission ini file.

    NOTE(review): the suffix line was elided from this listing; '.tgz'
    matches the 'tar czvhf' archive command in make_condor_archive_script
    — confirm.
    """
    name = make_base_name(ininame)
    return name + ".tgz"
def make_ddf_name(pathname):
    """Return the ddf filename corresponding to pathname."""
    name = make_base_name(pathname)
    return name + ".ddf"
def make_condor_name(pathname, run_type=None):
    """Return a condor script name: <base>[.<run_type>].condor"""
    name = make_base_name(pathname)
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)
def parse_filelist(file_string):
    """Split a comma separated string into a list of filenames."""
    pieces = file_string.split(",")
    return pieces
def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))
def make_md5sum(filename):
    """Quickly find the md5sum of a file.

    Uses a <filename>.md5 cache file when present; otherwise shells out
    to md5sum and caches the result. Returns the hex digest string, or
    None when md5sum failed.
    """
    md5_cache = os.path.join(filename + ".md5")
    if os.path.exists(md5_cache):
        logging.debug("Found md5sum in {0}".format(md5_cache))
        # close the cache handle (the visible original leaked it)
        with open(md5_cache, 'r') as stream:
            lines = stream.readlines()
        md5sum = parse_md5sum_line(lines, filename)
    else:
        md5sum = make_md5sum_unix(filename, md5_cache)
    return md5sum
def make_md5sum_unix(filename, md5_cache):
    """Run the md5sum binary on filename and cache its output.

    Returns the hex digest string, or None when md5sum exits nonzero.
    """
    cmd = ["md5sum", filename]
    logging.debug("Running {0}".format(" ".join(cmd)))
    p = Popen(cmd, stdout=PIPE)
    # communicate() returns (stdout, stderr); the original misnamed the
    # tuple (stdin, stdout), which hid what was being parsed below.
    stdout_data, stderr_data = p.communicate()
    retcode = p.wait()
    logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
    if retcode != 0:
        logging.error("Trouble with md5sum for {0}".format(filename))
        return None
    lines = stdout_data.split(os.linesep)
    md5sum = parse_md5sum_line(lines, filename)
    if md5sum is not None:
        logging.debug("Caching sum in {0}".format(md5_cache))
        with open(md5_cache, "w") as stream:
            stream.write(lines[0])
    return md5sum
def parse_md5sum_line(lines, filename):
    """Extract the digest from md5sum output.

    lines[0] is expected to look like "<digest> <filename>". Returns the
    digest, or None (with an error logged) when the filename in the
    output does not match the file we asked about.
    """
    md5sum, md5sum_filename = lines[0].split()
    if md5sum_filename != filename:
        # fixed typo in the original message ("disagre")
        errmsg = "MD5sum and I disagree about filename. {0} != {1}"
        logging.error(errmsg.format(filename, md5sum_filename))
        return None
    return md5sum
if __name__ == "__main__":
    main()