2 from ConfigParser import SafeConfigParser
8 from optparse import OptionParser, OptionGroup
10 from pprint import pprint, pformat
12 from StringIO import StringIO
14 from subprocess import Popen, PIPE
22 from htsworkflow.util import api
23 from htsworkflow.util.rdfhelp import \
30 from htsworkflow.submission.daf import \
32 MetadataLookupException, \
34 from htsworkflow.submission.condorfastq import CondorFastqExtract
36 logger = logging.getLogger('ucsc_gather')
def main(cmdline=None):
    # Command-line driver for the UCSC submission-gathering workflow:
    # build the option parser, configure logging, load the RDF model,
    # then run whichever sub-commands were requested via options.
    # NOTE(review): this excerpt is missing several lines (the logging
    # level conditionals, argument-count guard, per-argument loop, and
    # option guards); comments below hedge where structure is truncated.
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    # presumably selected by opts.debug / opts.verbose / default --
    # the conditional lines are not visible in this excerpt
    logging.basicConfig(level = logging.DEBUG )
    logging.basicConfig(level = logging.INFO )
    logging.basicConfig(level = logging.WARNING )

    # authentication settings for the htsworkflow API
    apidata = api.make_auth_from_opts(opts, parser)

    # RDF model holding submission metadata
    model = get_model(opts.load_model)
    mapper = DAFMapper(opts.name, opts.daf, model)
    submission_uri = get_submission_uri(opts.name)

    if opts.library_url is not None:
        mapper.library_url = opts.library_url

    if opts.load_rdf is not None:
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    # --make-ddf requires a daf file
    if opts.make_ddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    # presumably guarded by a len(args) check -- guard line not visible
    parser.error("I need at least one library submission-dir input file")

    library_result_map = []
    # presumably inside `for a in args:` -- loop header not visible
    library_result_map.extend(read_library_result_map(a))

    if opts.make_tree_from is not None:
        make_tree_from(opts.make_tree_from, library_result_map)

    # presumably guarded by opts.link_daf -- guard line not visible
    link_daf(opts.daf, library_result_map)

    # presumably guarded by opts.fastq; the CondorFastqExtract call is
    # missing its trailing arguments/closing paren in this excerpt
    extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
    extractor.build_fastqs(library_result_map)

    if opts.scan_submission:
        scan_submission_dirs(mapper, library_result_map)

    # presumably guarded by opts.make_ddf -- guard line not visible
    make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)

    # presumably guarded by opts.print_rdf -- guard line not visible
    writer = get_serializer()
    print writer.serialize_model_to_string(model)
    # Body of make_parser(): builds the OptionParser used by main(),
    # with a 'model' group for RDF-model options, a 'commands' group
    # for workflow actions, standalone options, and the shared
    # htsworkflow auth options.
    # NOTE(review): the enclosing `def make_parser():` header and the
    # final `return parser` are not visible in this excerpt.
    parser = OptionParser()

    model = OptionGroup(parser, 'model')
    model.add_option('--name', help="Set submission name")
    model.add_option('--load-model', default=None,
        help="Load model database")
    model.add_option('--load-rdf', default=None,
        help="load rdf statements into model")
    model.add_option('--print-rdf', action="store_true", default=False,
        help="print ending model state")
    parser.add_option_group(model)

    commands = OptionGroup(parser, 'commands')
    # NOTE(review): the closing argument(s) and paren of this
    # add_option call are missing from the excerpt
    commands.add_option('--make-tree-from',
                        help="create directories & link data files",
    commands.add_option('--fastq', default=False, action="store_true",
                        help="generate scripts for making fastq files")
    commands.add_option('--scan-submission', default=False, action="store_true",
                        help="Import metadata for submission into our model")
    commands.add_option('--link-daf', default=False, action="store_true",
                        help="link daf into submission directories")
    # NOTE(review): the closing argument(s) and paren of this
    # add_option call are missing from the excerpt
    commands.add_option('--make-ddf', help='make the ddfs', default=False,
    parser.add_option_group(commands)

    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    # attach the shared host/username/password auth options
    api.add_auth_options(parser)
def make_tree_from(source_path, library_result_map):
    """Create a tree using data files from source path.

    For each (lib_id, lib_path) pair, symlink every file found under
    source_path/lib_path into lib_path.
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logging.info("Making dir {0}".format(lib_path))
            # NOTE(review): the os.mkdir(lib_path) call presumably
            # here is not visible in this excerpt

        source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
        if os.path.exists(source_lib_dir):
            for filename in os.listdir(source_lib_dir):
                source_pathname = os.path.join(source_lib_dir, filename)
                target_pathname = os.path.join(lib_path, filename)
                # fail loudly on dangling source entries
                if not os.path.exists(source_pathname):
                    raise IOError("{0} does not exist".format(source_pathname))
                if not os.path.exists(target_pathname):
                    os.symlink(source_pathname, target_pathname)
                    # NOTE(review): the logging call wrapping the line
                    # below is not visible in this excerpt
                    'LINK {0} to {1}'.format(source_pathname, target_pathname))
def link_daf(daf_path, library_result_map):
    """Hard-link the daf file into each library result directory.

    daf_path: path to the daf file to distribute.
    library_result_map: iterable of (lib_id, result_dir) pairs.

    Raises RuntimeError if the daf file or any target directory does
    not exist. Already-linked directories are left untouched.
    """
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" %(result_dir,))
        submission_daf = os.path.join(result_dir, base_daf)
        # Only link once per directory. daf_path existence was already
        # verified above, so the original's per-iteration re-check of
        # daf_path was redundant and has been removed.
        if not os.path.exists(submission_daf):
            os.link(daf_path, submission_daf)
def scan_submission_dirs(view_map, library_result_map):
    """Look through our submission directories and collect needed information
    """
    for lib_id, result_dir in library_result_map:
        logging.info("Importing %s from %s" % (lib_id, result_dir))
        # NOTE(review): a `try:` presumably opens here (its matching
        # except follows) but the line is not visible in this excerpt
        view_map.import_submission_dir(result_dir, lib_id)
        # best-effort: a library with missing metadata is skipped, not fatal
        except MetadataLookupException, e:
            logging.error("Skipping %s: %s" % (lib_id, str(e)))
def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
    # Generate a ddf for each library result dir, then (optionally)
    # write a condor DAG chaining the archive/upload jobs together.
    # NOTE(review): the initialization of dag_fragment and the
    # accumulation of make_ddf()'s return value are not visible in
    # this excerpt.
    for lib_id, result_dir in library_result_map:
        submissionNode = view_map.get_submission_node(result_dir)
        make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        # refuse to clobber an existing DAG unless force is set
        if not force and os.path.exists(dag_filename):
            logging.warn("%s exists, please delete" % (dag_filename,))
            # NOTE(review): an else: branch presumably wraps the write
            # below; it is not visible in this excerpt
        f = open(dag_filename,'w')
        f.write( os.linesep.join(dag_fragment))
        f.write( os.linesep )
        # NOTE(review): f.close() is not visible in this excerpt
def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file
    """
    # NOTE(review): this excerpt is missing several interior lines
    # (initialization of file_list / dag_fragments / record, the guard
    # around the "Need name" error, else-branches in the per-variable
    # loop, the logging call wrapping the "Examined ..." format, the
    # guard around the condor-script section, and the final return).
    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    # presumably guarded by `if name is None:` -- guard not visible
    logging.error("Need name for %s" % (str(submissionNode)))

    ddf_name = name + '.ddf'
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile,'w')

    # filename goes first
    variables = ['filename']
    variables.extend(view_map.get_daf_variables())
    output.write('\t'.join(variables))
    output.write(os.linesep)

    nameTerm = dafTermOntology['name']

    submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])

    for viewNode in submission_views:
        for variable_name in variables:
            varNode = dafTermOntology[variable_name]
            values = list(view_map.model.get_targets(viewNode, varNode))

            if variable_name == 'view':
                # 'view' values are resource nodes; resolve to the name
                nameNode = view_map.model.get_target(values[0], nameTerm)
                values = [fromTypedNode(nameNode)]
            values = [ fromTypedNode(v) for v in values ]
            if variable_name == 'filename':
                file_list.extend(values)

            elif len(values) == 1:
                attribute = values[0]
            # presumably inside an else: for multi-valued attributes
            attribute = ",".join(values)
            record.append(attribute)
        output.write('\t'.join(record))
        output.write(os.linesep)

    # NOTE(review): a logging call presumably wraps the format below
    "Examined {0}, found files: {1}".format(
        str(submissionNode), ", ".join(file_list)))

    # daf and ddf ship alongside the data files in the archive
    file_list.append(daf_name)
    file_list.append(ddf_name)

    print name, file_list
    archive_condor = make_condor_archive_script(name, file_list, outdir)
    upload_condor = make_condor_upload_script(name, outdir)

    # NOTE(review): closing paren and `return dag_fragments` not visible
    dag_fragments.extend(
        make_dag_fragment(name, archive_condor, upload_condor)
def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.
    """
    # NOTE(review): the results-list initialization, the per-line loop
    # header (with stripping), and the final `return results` are not
    # visible in this excerpt.
    stream = open(filename,'r')
    # presumably inside `for line in stream:` after stripping the line;
    # '#' lines are comments, blank lines are skipped
    if not line.startswith('#') and len(line) > 0 :
        library_id, result_dir = line.split()
        results.append((library_id, result_dir))
def make_condor_archive_script(name, files, outdir=None):
    # Write a condor submit file that tars the submission's files into
    # ../<archivename> (gzip level 3) from within outdir.
    # NOTE(review): interior lines of the template string (including
    # its closing quotes and the `queue` command), the `for f in files:`
    # loop header around the existence check, and the trailing return
    # (presumably the condor script path) are not visible in this
    # excerpt. No comments are interleaved below because the template
    # literal is unterminated here.
    script = """Universe = vanilla
Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s
Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
    pathname = os.path.join(outdir, f)
    if not os.path.exists(pathname):
        raise RuntimeError("Missing %s" % (f,))

    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.getcwd(),
               'user': os.getlogin()}

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    condor_stream = open(condor_script,'w')
    condor_stream.write(script % context)
    condor_stream.close()
def make_condor_upload_script(name, outdir=None):
    # Write a condor submit file that lftp-uploads the tarred
    # submission to encodeftp.cse.ucsc.edu, reading credentials from
    # ~diane/.netrc (note: hard-coded home directory).
    # NOTE(review): interior lines of the template string (closing
    # quotes and the `queue` command), the 'ftpuser' entry in the
    # context dict, and the trailing return are not visible in this
    # excerpt. No comments are interleaved below because the template
    # literal is unterminated here.
    script = """Universe = vanilla
Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s
Error = upload.err.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s
    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(name),
               'initialdir': outdir,
               'user': os.getlogin(),
               'ftppassword': ftppassword,
               'ftphost': encodeftp}

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    condor_stream = open(condor_script,'w')
    condor_stream.write(script % context)
    condor_stream.close()
    os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)
def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the couple of fragments compress and then upload the data.

    Returns a list of DAGMan statements declaring the archive and
    upload jobs and ordering archive before upload. (Callers extend
    their own list with this return value.)
    """
    cur_dir = os.getcwd()
    # DAGMan wants absolute paths to the submit files
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    # Bug fix: the list initializer and the return statement were
    # missing, so fragments was unbound and nothing was returned.
    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))

    return fragments
def get_library_info(host, apidata, library_id):
    """Fetch library metadata for library_id from the htsworkflow host.

    host: base URL of the htsworkflow server.
    apidata: auth parameters produced by api.make_auth_from_opts.

    Returns whatever api.retrieve_info parses from the response.
    """
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
    # Bug fix: contents was computed but never returned, so callers
    # always received None.
    return contents
def make_submission_section(line_counter, files, attributes):
    """
    Create a section in the submission ini file

    line_counter: integer used to name the [lineN] section.
    files: iterable of filenames, joined into a files= entry.
    attributes: dict of extra key=value entries for the section.

    Returns the section as a list of ini-format lines.
    """
    inifile = [ "[line%s]" % (line_counter,) ]
    inifile += ["files=%s" % (",".join(files))]

    for k,v in attributes.items():
        inifile += ["%s=%s" % (k,v)]

    # Bug fix: the assembled section was never returned.
    return inifile
def make_base_name(pathname):
    """Return pathname stripped of its directory and final extension."""
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    # Bug fix: the computed name was never returned (callers such as
    # make_submission_name use the return value).
    return name
def make_submission_name(ininame):
    """Return the archive filename for a submission: <base>.tgz."""
    name = make_base_name(ininame)
    # Bug fix: the return was missing in this excerpt. '.tgz' matches
    # the gzipped tar built by make_condor_archive_script
    # (tar czvhf ../%(archivename)s) -- TODO confirm extension.
    return name + ".tgz"
def make_ddf_name(pathname):
    """Return the ddf filename for pathname: <base>.ddf."""
    name = make_base_name(pathname)
    # Bug fix: the return was missing; mirrors the name + '.ddf'
    # construction used inside make_ddf().
    return name + ".ddf"
def make_condor_name(pathname, run_type=None):
    """Return a condor submit-file name: <base>[.<run_type>].condor.

    run_type: optional tag (e.g. 'archive', 'upload') inserted before
    the 'condor' suffix, as used by the make_condor_*_script helpers.
    """
    name = make_base_name(pathname)
    # Bug fix: restored the list initializer that was missing from the
    # excerpt; without it the append calls raised NameError.
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)
def parse_filelist(file_string):
    """Return the entries of a comma-separated file list as a list."""
    separator = ","
    parts = file_string.split(separator)
    return parts
def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list

    Raises RuntimeError naming the first missing file; returns None
    when every file exists.
    """
    # Restored the loop header (the body clearly iterates over `files`
    # binding each entry to `f`) and the docstring delimiters, both
    # missing from the excerpt.
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))
def make_md5sum(filename):
    """Quickly find the md5sum of a file

    Uses a cached sum from <filename>.md5 when present; otherwise
    computes (and caches) it via the md5sum utility.
    """
    # NOTE(review): the try/except presumably wrapping the cache read,
    # the else-branch structure, and the final `return md5sum` are not
    # visible in this excerpt.
    md5_cache = os.path.join(filename+".md5")
    if os.path.exists(md5_cache):
        logging.debug("Found md5sum in {0}".format(md5_cache))
        stream = open(md5_cache,'r')
        lines = stream.readlines()
        md5sum = parse_md5sum_line(lines, filename)
    # presumably inside an else: (or except) branch -- recompute
    md5sum = make_md5sum_unix(filename, md5_cache)
def make_md5sum_unix(filename, md5_cache):
    # Run the md5sum utility on filename and cache the parsed digest
    # in md5_cache.
    # NOTE(review): the retcode assignment (presumably p.wait()), the
    # `if retcode != 0:` guard with its error return, the cache write
    # and close, and the final return are not visible in this excerpt.
    cmd = ["md5sum", filename]
    logging.debug("Running {0}".format(" ".join(cmd)))
    p = Popen(cmd, stdout=PIPE)
    # NOTE(review): communicate() returns (stdout_data, stderr_data);
    # the names below are misleading -- `stdin` actually holds the
    # process's stdout text used further down.
    stdin, stdout = p.communicate()
    logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
    # presumably guarded by `if retcode != 0:` -- guard not visible
    logging.error("Trouble with md5sum for {0}".format(filename))
    lines = stdin.split(os.linesep)
    md5sum = parse_md5sum_line(lines, filename)
    if md5sum is not None:
        logging.debug("Caching sum in {0}".format(md5_cache))
        stream = open(md5_cache, "w")
def parse_md5sum_line(lines, filename):
    """Extract the md5 digest from md5sum output for filename.

    lines: output lines from the md5sum utility (only the first is
    examined; its format is '<digest>  <filename>').

    Returns the hex digest string, or None when the reported filename
    does not match the requested one.
    """
    md5sum, md5sum_filename = lines[0].split()
    if md5sum_filename != filename:
        # Bug fixes: the message had a typo ("disagre") and both
        # return statements were missing from the excerpt.
        errmsg = "MD5sum and I disagree about filename. {0} != {1}"
        logging.error(errmsg.format(filename, md5sum_filename))
        return None
    return md5sum
476 if __name__ == "__main__":