2 from ConfigParser import SafeConfigParser
8 from optparse import OptionParser, OptionGroup
10 from pprint import pprint, pformat
12 from StringIO import StringIO
14 from subprocess import Popen, PIPE
22 from htsworkflow.util import api
23 from htsworkflow.util.rdfhelp import \
30 from htsworkflow.submission.daf import \
32 MetadataLookupException, \
34 from htsworkflow.submission.condorfastq import CondorFastqExtract
# Module-level logger for this tool.
logger = logging.getLogger('ucsc_gather')
def main(cmdline=None):
    """Command-line entry point for gathering a UCSC ENCODE submission.

    cmdline: argument list to parse (None presumably falls through to
    sys.argv via optparse).

    NOTE(review): the visible source is missing interleaved lines
    (conditional guards, a loop header, a call continuation); the comments
    below flag each gap rather than guess its content.
    """
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)
    # Log-level selection -- presumably guarded by opts.debug / opts.verbose
    # (the if/elif/else lines are not visible in this view).
    logging.basicConfig(level = logging.DEBUG )
    logging.basicConfig(level = logging.INFO )
    logging.basicConfig(level = logging.WARNING )
    # Credentials for the htsworkflow API, built from the options that
    # api.add_auth_options() registered in make_parser().
    apidata = api.make_auth_from_opts(opts, parser)
    # RDF model holding submission metadata.
    model = get_model(opts.load_model)
    mapper = DAFMapper(opts.name, opts.daf, model)
    submission_uri = get_submission_uri(opts.name)
    if opts.library_url is not None:
        mapper.library_url = opts.library_url
    if opts.load_rdf is not None:
        # Seed the model with previously serialized statements (turtle).
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)
    if opts.make_ddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")
    # NOTE(review): the condition guarding this error (no input files
    # given?) is not visible in this view.
    parser.error("I need at least one library submission-dir input file")
    # (library id, result directory) pairs accumulated from the input files.
    library_result_map = []
    # NOTE(review): the loop header over the input args is not visible here.
    library_result_map.extend(read_library_result_map(a))
    if opts.make_tree_from is not None:
        make_tree_from(opts.make_tree_from, library_result_map)
    # NOTE(review): guard (opts.link_daf?) not visible in this view.
    link_daf(opts.daf, library_result_map)
    # NOTE(review): guard (opts.fastq?) and the continuation of this
    # constructor call's argument list are not visible in this view.
    extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
    extractor.build_fastqs(library_result_map)
    if opts.scan_submission:
        scan_submission_dirs(mapper, library_result_map)
    # NOTE(review): guard (opts.make_ddf?) not visible in this view.
    make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)
    # NOTE(review): guard (opts.print_rdf?) not visible in this view.
    writer = get_serializer()
    print writer.serialize_model_to_string(model)
    # NOTE(review): this is the body of make_parser() -- the `def` line and
    # the closing `return parser` are not visible in this view.
    parser = OptionParser()

    # Options controlling the RDF model state.
    model = OptionGroup(parser, 'model')
    model.add_option('--name', help="Set submission name")
    model.add_option('--load-model', default=None,
                     help="Load model database")
    model.add_option('--load-rdf', default=None,
                     help="load rdf statements into model")
    model.add_option('--print-rdf', action="store_true", default=False,
                     help="print ending model state")
    parser.add_option_group(model)

    # Options selecting which action(s) main() performs.
    commands = OptionGroup(parser, 'commands')
    # NOTE(review): the closing of this add_option call (default/metavar
    # line) is not visible in this view.
    commands.add_option('--make-tree-from',
                        help="create directories & link data files",
    commands.add_option('--fastq', default=False, action="store_true",
                        help="generate scripts for making fastq files")
    commands.add_option('--scan-submission', default=False, action="store_true",
                        help="Import metadata for submission into our model")
    commands.add_option('--link-daf', default=False, action="store_true",
                        help="link daf into submission directories")
    # NOTE(review): the closing of this add_option call (presumably
    # action="store_true") is not visible in this view.
    commands.add_option('--make-ddf', help='make the ddfs', default=False,
    parser.add_option_group(commands)

    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    # Adds the authentication options make_auth_from_opts() reads in main().
    api.add_auth_options(parser)
def make_tree_from(source_path, library_result_map):
    """Create a tree using data files from source path.

    For each (lib_id, lib_path) pair, symlink every file found under
    source_path/lib_path into lib_path.
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logging.info("Making dir {0}".format(lib_path))
            # NOTE(review): the os.mkdir(lib_path) call is not visible in
            # this view.

        source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
        if os.path.exists(source_lib_dir):
            for filename in os.listdir(source_lib_dir):
                source_pathname = os.path.join(source_lib_dir, filename)
                target_pathname = os.path.join(lib_path, filename)
                if not os.path.exists(source_pathname):
                    raise IOError("{0} does not exist".format(source_pathname))
                if not os.path.exists(target_pathname):
                    os.symlink(source_pathname, target_pathname)
                    # NOTE(review): the logging call that this continuation
                    # line belongs to is not visible in this view.
                    'LINK {0} to {1}'.format(source_pathname, target_pathname))
def link_daf(daf_path, library_result_map):
    """Hard-link the daf file into every library result directory.

    daf_path -- path to the daf file to link.
    library_result_map -- iterable of (lib_id, result_dir) pairs.

    Raises RuntimeError if the daf or a target directory is missing.
    Directories that already contain the link are left untouched.
    """
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    daf_filename = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" %(result_dir,))

        link_target = os.path.join(result_dir, daf_filename)
        if os.path.exists(link_target):
            # Already linked on a previous run; nothing to do.
            continue
        # Re-verify the source right before linking.
        if not os.path.exists(daf_path):
            raise RuntimeError("Couldn't find daf: %s" %(daf_path,))
        os.link(daf_path, link_target)
def scan_submission_dirs(view_map, library_result_map):
    """Look through our submission directories and collect needed information.
    """
    for lib_id, result_dir in library_result_map:
        logging.info("Importing %s from %s" % (lib_id, result_dir))
        # NOTE(review): the `try:` that opens the handler below is not
        # visible in this view.
        view_map.import_submission_dir(result_dir, lib_id)
        except MetadataLookupException, e:
            # Missing metadata is logged and skipped rather than fatal.
            logging.error("Skipping %s: %s" % (lib_id, str(e)))
def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
    """Write a ddf for every submission dir; optionally emit a dagman file.

    NOTE(review): the initialization of the dag fragment accumulator and the
    tail of this function (file close, return) are not visible in this view.
    """
    for lib_id, result_dir in library_result_map:
        submissionNode = view_map.get_submission_node(result_dir)
        make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            # Refuse to clobber an existing dagman unless force was given.
            logging.warn("%s exists, please delete" % (dag_filename,))
            # NOTE(review): presumably followed by a return or else branch
            # not visible in this view.
        f = open(dag_filename,'w')
        f.write( os.linesep.join(dag_fragment))
        f.write( os.linesep )
        # NOTE(review): f.close() is not visible in this view.
def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file
    """
    # Submission name from the RDF model; used in all output filenames.
    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    # NOTE(review): the guard (name is None?) for this error is not visible.
    logging.error("Need name for %s" % (str(submissionNode)))

    ddf_name = name + '.ddf'
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile,'w')
    # NOTE(review): an else branch (stdout output?) is not visible here.

    # filename goes first
    variables = ['filename']
    variables.extend(view_map.get_daf_variables())
    # Tab-separated header row of the ddf.
    output.write('\t'.join(variables))
    output.write(os.linesep)

    nameTerm = dafTermOntology['name']

    submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])

    for viewNode in submission_views:
        # NOTE(review): the initialization of `record` is not visible here.
        for variable_name in variables:
            varNode = dafTermOntology[variable_name]
            values = list(view_map.model.get_targets(viewNode, varNode))

            if variable_name == 'view':
                # 'view' values are nodes; dereference to their name term.
                nameNode = view_map.model.get_target(values[0], nameTerm)
                values = [fromTypedNode(nameNode)]
            # NOTE(review): the else header for this branch is not visible.
            values = [ fromTypedNode(v) for v in values ]

            if variable_name == 'filename':
                file_list.extend(values)

            # NOTE(review): the len(values) == 0 branch is not visible here.
            elif len(values) == 1:
                attribute = values[0]
            # NOTE(review): else header not visible; multiple values are
            # comma-joined into one ddf cell.
                attribute = ",".join(values)

            record.append(attribute)
        output.write('\t'.join(record))
        output.write(os.linesep)

    # NOTE(review): the logging call these continuation lines belong to is
    # not visible in this view.
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(file_list)))

    # The daf and the ddf itself ride along in the archive.
    file_list.append(daf_name)
    file_list.append(ddf_name)

    # NOTE(review): the guard (make_condor?) for this section is not visible.
    print name, file_list
    archive_condor = make_condor_archive_script(name, file_list, outdir)
    upload_condor = make_condor_upload_script(name, outdir)

    # NOTE(review): the close of this call and the function's return are
    # not visible in this view.
    dag_fragments.extend(
        make_dag_fragment(name, archive_condor, upload_condor)
def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.
    """
    stream = open(filename,'r')
    # NOTE(review): the initialization of `results` and the loop over the
    # stream's lines are not visible in this view.
    if not line.startswith('#') and len(line) > 0 :
        # Each non-comment line is "<library_id> <result_dir>".
        library_id, result_dir = line.split()
        results.append((library_id, result_dir))
    # NOTE(review): `return results` is not visible in this view.
def make_condor_archive_script(name, files, outdir=None):
    """Write a condor submit file that tars `files` into the archive.

    NOTE(review): parts of the submit-file template (blank lines, queue
    statement, closing quotes), the loop header over `files`, and the
    return statement are not visible in this view.
    """
    script = """Universe = vanilla
Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s
Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
    # Sanity-check every file before queueing the archive job.
    # NOTE(review): the `for f in files:` header is not visible here.
    pathname = os.path.join(outdir, f)
    if not os.path.exists(pathname):
        raise RuntimeError("Missing %s" % (f,))

    # Values interpolated into the %(...)s slots of the template above.
    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.getcwd(),
               'user': os.getlogin()}

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    condor_stream = open(condor_script,'w')
    condor_stream.write(script % context)
    condor_stream.close()
    # NOTE(review): the return (presumably condor_script) is not visible.
def make_condor_upload_script(name, outdir=None):
    """Write a condor submit file that lftp-uploads the archive to encodeftp.

    NOTE(review): parts of the submit-file template (blank lines, queue
    statement, closing quotes) and the return statement are not visible in
    this view.
    """
    script = """Universe = vanilla
Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s
Error = upload.err.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s
    # FTP credentials come from a hard-coded user's netrc file.
    # NOTE(review): fragile -- consider making this path configurable.
    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    # Values interpolated into the %(...)s slots of the template above.
    context = {'archivename': make_submission_name(name),
               'initialdir': outdir,
               'user': os.getlogin(),
               # NOTE(review): the 'ftpuser' entry is not visible in this view.
               'ftppassword': ftppassword,
               'ftphost': encodeftp}

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    condor_stream = open(condor_script,'w')
    condor_stream.write(script % context)
    condor_stream.close()
    # Restrict permissions since the script embeds the ftp password.
    os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)
def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the couple of fragments compress and then upload the data.
    """
    cur_dir = os.getcwd()
    # DAG files need absolute paths to the condor submit files.
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    # NOTE(review): `fragments = []` is not visible in this view.
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    # Upload runs only after the archive job finishes successfully.
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
    # NOTE(review): `return fragments` is not visible in this view.
def get_library_info(host, apidata, library_id):
    # Fetch library metadata from the htsworkflow API.
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
    # NOTE(review): the return statement is not visible in this view.
def make_submission_section(line_counter, files, attributes):
    """
    Create a section in the submission ini file
    """
    inifile = [ "[line%s]" % (line_counter,) ]
    inifile += ["files=%s" % (",".join(files))]
    # One "key=value" line per attribute.
    for k,v in attributes.items():
        inifile += ["%s=%s" % (k,v)]
    # NOTE(review): `return inifile` is not visible in this view.
def make_base_name(pathname):
    # Strip directory and extension: "a/b/name.ext" -> "name".
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    # NOTE(review): `return name` is not visible in this view.
def make_submission_name(ininame):
    name = make_base_name(ininame)
    # NOTE(review): the return (presumably name plus an archive suffix such
    # as ".tgz", matching the tar czvhf archive job) is not visible here.
def make_ddf_name(pathname):
    name = make_base_name(pathname)
    # NOTE(review): the return (presumably name + ".ddf") is not visible
    # in this view.
def make_condor_name(pathname, run_type=None):
    # Build "<base>[.<run_type>].condor" from pathname.
    name = make_base_name(pathname)
    # NOTE(review): `elements = [name]` is not visible in this view.
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)
def parse_filelist(file_string):
    """Return the comma-separated filenames in file_string as a list."""
    filenames = file_string.split(",")
    return filenames
def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    # NOTE(review): the `for f in files:` header is not visible in this view.
    if not os.path.exists(f):
        raise RuntimeError("%s does not exist" % (f,))
def make_md5sum(filename):
    """Quickly find the md5sum of a file

    Uses a cached <filename>.md5 file when present, otherwise computes it
    via make_md5sum_unix (which also writes the cache).
    """
    md5_cache = os.path.join(filename+".md5")

    if os.path.exists(md5_cache):
        logging.debug("Found md5sum in {0}".format(md5_cache))
        stream = open(md5_cache,'r')
        lines = stream.readlines()
        # NOTE(review): stream is never closed in the visible code.
        md5sum = parse_md5sum_line(lines, filename)
    # NOTE(review): the `else:` header is not visible in this view.
        md5sum = make_md5sum_unix(filename, md5_cache)
    # NOTE(review): `return md5sum` is not visible in this view.
def make_md5sum_unix(filename, md5_cache):
    """Run the md5sum binary on filename and cache its output in md5_cache."""
    cmd = ["md5sum", filename]
    logging.debug("Running {0}".format(" ".join(cmd)))
    p = Popen(cmd, stdout=PIPE)
    # NOTE(review): communicate() returns (stdout, stderr); the first name
    # is misleadingly called `stdin` but actually holds md5sum's stdout.
    stdin, stdout = p.communicate()
    # NOTE(review): the assignment of retcode (p.wait()?) is not visible.
    logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
    # NOTE(review): the guard on retcode for this error is not visible.
    logging.error("Trouble with md5sum for {0}".format(filename))

    lines = stdin.split(os.linesep)
    md5sum = parse_md5sum_line(lines, filename)
    if md5sum is not None:
        logging.debug("Caching sum in {0}".format(md5_cache))
        stream = open(md5_cache, "w")
        # NOTE(review): the cache write/close and the function's return
        # value are not visible in this view.
def parse_md5sum_line(lines, filename):
    # md5sum output format: "<hexdigest>  <filename>" on the first line.
    md5sum, md5sum_filename = lines[0].split()
    if md5sum_filename != filename:
        # (Typo "disagre" is part of the runtime string; left untouched.)
        errmsg = "MD5sum and I disagre about filename. {0} != {1}"
        logging.error(errmsg.format(filename, md5sum_filename))
        # NOTE(review): the return statements (None on mismatch, md5sum on
        # success, presumably) are not visible in this view.
# Script entry point. NOTE(review): the guarded call (presumably
# main(sys.argv[1:])) is past the end of this view.
if __name__ == "__main__":