from ConfigParser import SafeConfigParser
import logging
import netrc
from optparse import OptionParser
import os
from pprint import pprint, pformat
from StringIO import StringIO
import stat
from subprocess import Popen, PIPE
import sys

from htsworkflow.util import api
from htsworkflow.util.rdfhelp import \
     dafTermOntology, \
     fromTypedNode, \
     get_model, \
     get_serializer, \
     load_into_model, \
     submissionOntology
from htsworkflow.submission.daf import DAFMapper, get_submission_uri
from htsworkflow.submission.condorfastq import CondorFastqExtract

logger = logging.getLogger('ucsc_gather')

def main(cmdline=None):
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    apidata = api.make_auth_from_opts(opts, parser)

    model = get_model(opts.load_model)

    mapper = DAFMapper(opts.name, opts.daf, model)
    submission_uri = get_submission_uri(opts.name)

    if opts.load_rdf is not None:
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    if opts.makeddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    if len(args) == 0:
        parser.error("I need at least one library submission-dir input file")

    library_result_map = []
    for a in args:
        library_result_map.extend(read_library_result_map(a))

    if opts.make_tree_from is not None:
        make_tree_from(opts.make_tree_from, library_result_map)

    #if opts.daf is not None:
    #    link_daf(opts.daf, library_result_map)

    if opts.fastq:
        extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
                                       force=opts.force)
        extractor.build_fastqs(library_result_map)

    if opts.scan_submission:
        scan_submission_dirs(mapper, library_result_map)

    if opts.makeddf:
        make_all_ddfs(mapper, library_result_map, force=opts.force)

    if opts.print_rdf:
        writer = get_serializer()
        print writer.serialize_model_to_string(model)

def make_parser():
    parser = OptionParser()

    parser.add_option('--name', help="Set submission name")
    parser.add_option('--load-model', default=None,
                      help="Load model database")
    parser.add_option('--load-rdf', default=None,
                      help="load rdf statements into model")
    parser.add_option('--print-rdf', action="store_true", default=False,
                      help="print ending model state")
    # opts.sequence is used by the --fastq code path above; the original
    # help text for this option is not shown, so this is a placeholder.
    parser.add_option('--sequence', default=None,
                      help="path to the sequence archive")
    parser.add_option('--make-tree-from',
                      help="create directories & link data files",
                      default=None)
    parser.add_option('--fastq', help="generate scripts for making fastq files",
                      default=False, action="store_true")
    parser.add_option('--scan-submission', default=False, action="store_true",
                      help="Import metadata for submission into our model")
    parser.add_option('--makeddf', help='make the ddfs', default=False,
                      action="store_true")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")

    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser

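# An illustrative invocation of this script (the submission name, daf file,
# and library map path are hypothetical):
#
#   ucsc_gather.py --name woldlab_submission --daf encode.daf \
#       --scan-submission --makeddf library_map.txt
#
# Positional arguments are library map files, parsed by
# read_library_result_map() below.
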
def make_tree_from(source_path, library_result_map):
    """Create a tree using data files from source path.
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logging.info("Making dir {0}".format(lib_path))
            os.mkdir(lib_path)
        source_lib_dir = os.path.join(source_path, lib_path)
        if os.path.exists(source_lib_dir):
            for filename in os.listdir(source_lib_dir):
                source_pathname = os.path.join(source_lib_dir, filename)
                target_pathname = os.path.join(lib_path, filename)
                if not os.path.exists(source_pathname):
                    raise IOError("{0} does not exist".format(source_pathname))
                if not os.path.exists(target_pathname):
                    os.symlink(source_pathname, target_pathname)
                    logging.info(
                        'LINK {0} to {1}'.format(source_pathname, target_pathname))

def link_daf(daf_path, library_result_map):
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" % (result_dir,))
        submission_daf = os.path.join(result_dir, base_daf)
        if not os.path.exists(submission_daf):
            if not os.path.exists(daf_path):
                raise RuntimeError("Couldn't find daf: %s" % (daf_path,))
            os.link(daf_path, submission_daf)

def scan_submission_dirs(view_map, library_result_map):
    """Look through our submission directories and collect needed information.
    """
    for lib_id, result_dir in library_result_map:
        view_map.import_submission_dir(result_dir, lib_id)

def make_all_ddfs(view_map, library_result_map, make_condor=True, force=False):
    dag_fragment = []
    for lib_id, result_dir in library_result_map:
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, make_condor, result_dir)
        )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logging.warn("%s exists, please delete" % (dag_filename,))
        else:
            f = open(dag_filename, 'w')
            f.write(os.linesep.join(dag_fragment))
            f.write(os.linesep)
            f.close()

def make_ddf(view_map, submissionNode, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file
    """
    dag_fragments = []
    curdir = os.getcwd()
    if outdir is not None:
        os.chdir(outdir)

    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logging.error("Need name for %s" % (str(submissionNode)))
        os.chdir(curdir)
        return dag_fragments

    ddf_name = name + '.ddf'
    output = sys.stdout
    # output = open(ddf_name,'w')

    # filename goes first
    variables = ['filename']
    variables.extend(view_map.get_daf_variables())
    output.write('\t'.join(variables))
    output.write(os.linesep)

    submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])
    file_list = []
    for viewNode in submission_views:
        record = []
        for variable_name in variables:
            varNode = dafTermOntology[variable_name]
            values = [fromTypedNode(v) for v in list(view_map.model.get_targets(viewNode, varNode))]
            if variable_name == 'filename':
                file_list.extend(values)
            if len(values) == 0:
                # no value recorded in the model for this variable
                attribute = ''
            elif len(values) == 1:
                attribute = values[0]
            else:
                attribute = ",".join(values)
            record.append(attribute)
        output.write('\t'.join(record))
        output.write(os.linesep)

    logging.debug(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(file_list)))

    #file_list.append(daf_name)
    #if ddf_name is not None:
    #    file_list.append(ddf_name)
    #
    #if make_condor:
    #    archive_condor = make_condor_archive_script(ininame, file_list)
    #    upload_condor = make_condor_upload_script(ininame)
    #
    #    dag_fragments.extend(
    #        make_dag_fragment(ininame, archive_condor, upload_condor)
    #    )

    os.chdir(curdir)
    return dag_fragments

def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.
    """
    stream = open(filename, 'r')

    results = []
    for line in stream:
        line = line.rstrip()
        if not line.startswith('#') and len(line) > 0:
            library_id, result_dir = line.split()
            results.append((library_id, result_dir))
    return results

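# Sketch of the library map format consumed above (the id and directory are
# hypothetical): blank lines and '#' comments are skipped, and each remaining
# line is whitespace-split into (library_id, result_dir).
#
#   # library_id   result_dir
#   11011          results/11011_submission
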
def make_condor_archive_script(ininame, files):
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"

queue
"""
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("Missing %s" % (f,))

    context = {'archivename': make_submission_name(ininame),
               'filelist': " ".join(files),
               'initialdir': os.getcwd(),
               'user': os.getlogin()}

    condor_script = make_condor_name(ininame, 'archive')
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()

    return condor_script

def make_condor_upload_script(ininame):
    script = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.err.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(ininame),
               'initialdir': os.getcwd(),
               'user': os.getlogin(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp}

    condor_script = make_condor_name(ininame, 'upload')
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)

    return condor_script

def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the dagman fragments that first compress and then upload the data.
    """
    cur_dir = os.getcwd()
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))

    return fragments

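# For a hypothetical ininame of 'foo.ini', the fragment above would read
# roughly:
#
#   JOB foo_archive /current/dir/foo.archive.condor
#   JOB foo_upload /current/dir/foo.upload.condor
#   PARENT foo_archive CHILD foo_upload
#
# i.e. a DAGMan snippet that runs the archive job before the upload job.
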
def get_library_info(host, apidata, library_id):
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
    return contents

def make_submission_section(line_counter, files, attributes):
    """
    Create a section in the submission ini file
    """
    inifile = ["[line%s]" % (line_counter,)]
    inifile += ["files=%s" % (",".join(files))]

    for k, v in attributes.items():
        inifile += ["%s=%s" % (k, v)]

    return inifile

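# Example of a generated section, returned as a list of strings (the line
# counter, file names, and attribute are hypothetical):
#
#   [line1]
#   files=foo.bam,foo.bai
#   cell=GM12878
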
def make_base_name(pathname):
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    return name

def make_submission_name(ininame):
    name = make_base_name(ininame)
    # assumed .tgz, matching the gzipped tar built by make_condor_archive_script
    return name + ".tgz"

def make_ddf_name(pathname):
    name = make_base_name(pathname)
    return name + ".ddf"

def make_condor_name(pathname, run_type=None):
    name = make_base_name(pathname)
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)

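# How the name helpers above relate, for a hypothetical ininame of 'foo.ini':
#   make_base_name('foo.ini')              -> 'foo'
#   make_submission_name('foo.ini')        -> 'foo.tgz'
#   make_ddf_name('foo.ini')               -> 'foo.ddf'
#   make_condor_name('foo.ini', 'archive') -> 'foo.archive.condor'
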
def parse_filelist(file_string):
    return file_string.split(",")

def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))

def make_md5sum(filename):
    """Quickly find the md5sum of a file
    """
    md5_cache = filename + ".md5"
    if os.path.exists(md5_cache):
        logging.debug("Found md5sum in {0}".format(md5_cache))
        stream = open(md5_cache, 'r')
        lines = stream.readlines()
        md5sum = parse_md5sum_line(lines, filename)
    else:
        md5sum = make_md5sum_unix(filename, md5_cache)
    return md5sum

def make_md5sum_unix(filename, md5_cache):
    cmd = ["md5sum", filename]
    logging.debug("Running {0}".format(" ".join(cmd)))
    p = Popen(cmd, stdout=PIPE)
    stdout, stderr = p.communicate()
    retcode = p.wait()
    logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
    if retcode != 0:
        logging.error("Trouble with md5sum for {0}".format(filename))
        return None
    lines = stdout.split(os.linesep)
    md5sum = parse_md5sum_line(lines, filename)
    if md5sum is not None:
        logging.debug("Caching sum in {0}".format(md5_cache))
        stream = open(md5_cache, "w")
        stream.write(md5sum)
        stream.write(os.linesep)
        stream.close()
    return md5sum

def parse_md5sum_line(lines, filename):
    md5sum, md5sum_filename = lines[0].split()
    if md5sum_filename != filename:
        errmsg = "MD5sum and I disagree about filename. {0} != {1}"
        logging.error(errmsg.format(filename, md5sum_filename))
        return None
    return md5sum

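# parse_md5sum_line() expects standard md5sum output, e.g. (the hash shown is
# the md5 of an empty file, the filename is hypothetical):
#
#   d41d8cd98f00b204e9800998ecf8427e  some_submission_file.bam
#
# The first whitespace-separated field is the checksum, the second the filename.
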
if __name__ == "__main__":
    main()