import logging
import netrc
import os
import stat
import sys

from ConfigParser import SafeConfigParser
from optparse import OptionParser
from pprint import pprint, pformat
from StringIO import StringIO
from subprocess import Popen, PIPE

from htsworkflow.util import api
from htsworkflow.util.rdfhelp import \
     get_model, get_serializer, load_into_model, \
     fromTypedNode, dafTermOntology, submissionOntology
from htsworkflow.submission.daf import DAFMapper, get_submission_uri
from htsworkflow.submission.condorfastq import CondorFastqExtract

logger = logging.getLogger('ucsc_gather')

def main(cmdline=None):
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    apidata = api.make_auth_from_opts(opts, parser)

    model = get_model(opts.load_model)
    mapper = DAFMapper(opts.name, opts.daf, model)
    submission_uri = get_submission_uri(opts.name)
    if opts.load_rdf is not None:
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    if opts.makeddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    if len(args) == 0:
        parser.error("I need at least one library submission-dir input file")

    library_result_map = []
    for a in args:
        library_result_map.extend(read_library_result_map(a))

    if opts.make_tree_from is not None:
        make_tree_from(opts.make_tree_from, library_result_map)

    #if opts.daf is not None:
    #    link_daf(opts.daf, library_result_map)

    if opts.fastq:
        # force=opts.force restored here, inferred from the --force option.
        extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
                                       force=opts.force)
        extractor.build_fastqs(library_result_map)

    if opts.scan_submission:
        scan_submission_dirs(mapper, library_result_map)

    if opts.makeddf:
        make_all_ddfs(mapper, library_result_map, force=opts.force)

    if opts.print_rdf:
        writer = get_serializer()
        print writer.serialize_model_to_string(model)

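# Typical invocation (file names hypothetical):
#   python ucsc_gather.py --name mysub --daf mysub.daf \
#       --scan-submission --makeddf library_map.txt
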
def make_parser():
    parser = OptionParser()

    parser.add_option('--name', help="Set submission name")
    parser.add_option('--load-model', default=None,
                      help="Load model database")
    parser.add_option('--load-rdf', default=None,
                      help="load rdf statements into model")
    parser.add_option('--print-rdf', action="store_true", default=False,
                      help="print ending model state")

    # --sequence restored because main() reads opts.sequence; the exact
    # help text is an assumption.
    parser.add_option('--sequence', default=None,
                      help="sequence repository path")
    parser.add_option('--make-tree-from',
                      help="create directories & link data files",
                      default=None)
    parser.add_option('--fastq', help="generate scripts for making fastq files",
                      default=False, action="store_true")

    parser.add_option('--scan-submission', default=False, action="store_true",
                      help="Import metadata for submission into our model")

    parser.add_option('--makeddf', help='make the ddfs', default=False,
                      action="store_true")

    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")

    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser

def make_tree_from(source_path, library_result_map):
    """Create a tree using data files from source path.
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logging.info("Making dir {0}".format(lib_path))
            os.mkdir(lib_path)
        source_lib_dir = os.path.join(source_path, lib_path)
        if os.path.exists(source_lib_dir):
            for filename in os.listdir(source_lib_dir):
                source_pathname = os.path.join(source_lib_dir, filename)
                target_pathname = os.path.join(lib_path, filename)
                if not os.path.exists(source_pathname):
                    raise IOError("{0} does not exist".format(source_pathname))
                if not os.path.exists(target_pathname):
                    os.symlink(source_pathname, target_pathname)
                    logging.info(
                        'LINK {0} to {1}'.format(source_pathname, target_pathname))

def link_daf(daf_path, library_result_map):
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" % (result_dir,))
        submission_daf = os.path.join(result_dir, base_daf)
        if not os.path.exists(submission_daf):
            os.link(daf_path, submission_daf)

def scan_submission_dirs(view_map, library_result_map):
    """Look through our submission directories and collect needed information.
    """
    for lib_id, result_dir in library_result_map:
        view_map.import_submission_dir(result_dir, lib_id)

def make_all_ddfs(view_map, library_result_map, make_condor=True, force=False):
    dag_fragment = []
    for lib_id, result_dir in library_result_map:
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, make_condor, result_dir)
        )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logging.warn("%s exists, please delete" % (dag_filename,))
        else:
            f = open(dag_filename, 'w')
            f.write(os.linesep.join(dag_fragment))
            f.write(os.linesep)
            f.close()

def make_ddf(view_map, submissionNode, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file
    """
    dag_fragments = []
    if outdir is not None:
        # assumption: relative output paths are resolved against outdir
        os.chdir(outdir)

    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logging.error("Need name for %s" % (str(submissionNode)))
        return dag_fragments

    ddf_name = name + '.ddf'
    output = sys.stdout
    # output = open(ddf_name,'w')

    # filename goes first
    variables = ['filename']
    variables.extend(view_map.get_daf_variables())
    output.write('\t'.join(variables))
    output.write(os.linesep)

    submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])
    file_list = []
    for viewNode in submission_views:
        record = []
        for variable_name in variables:
            varNode = dafTermOntology[variable_name]
            values = [fromTypedNode(v) for v in list(view_map.model.get_targets(viewNode, varNode))]
            if variable_name == 'filename':
                file_list.extend(values)
            if len(values) == 0:
                attribute = 'None'  # assumption: sentinel for an unbound variable
            elif len(values) == 1:
                attribute = values[0]
            else:
                attribute = ",".join(values)
            record.append(attribute)
        output.write('\t'.join(record))
        output.write(os.linesep)

    logging.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(file_list)))

    #file_list.append(daf_name)
    #if ddf_name is not None:
    #    file_list.append(ddf_name)

    #if make_condor:
    #    archive_condor = make_condor_archive_script(ininame, file_list)
    #    upload_condor = make_condor_upload_script(ininame)
    #
    #    dag_fragments.extend(
    #        make_dag_fragment(ininame, archive_condor, upload_condor)
    #    )

    return dag_fragments

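# make_ddf emits a tab-separated table whose first column is always
# 'filename'; the remaining columns come from get_daf_variables().
# A sketch with hypothetical DAF variables:
#
#   filename     view    replicate
#   mylib.bam    Signal  1
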
def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.
    """
    stream = open(filename, 'r')
    results = []
    for line in stream:
        line = line.rstrip()
        if not line.startswith('#') and len(line) > 0:
            library_id, result_dir = line.split()
            results.append((library_id, result_dir))
    return results

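# read_library_result_map expects whitespace-separated columns, one
# library per line, with '#' starting comment lines (ids hypothetical):
#
#   # library_id  result_dir
#   12345         12345_GM12878/
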
def make_condor_archive_script(ininame, files):
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"

queue
"""
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("Missing %s" % (f,))

    context = {'archivename': make_submission_name(ininame),
               'filelist': " ".join(files),
               'initialdir': os.getcwd(),
               'user': os.getlogin()}

    condor_script = make_condor_name(ininame, 'archive')
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    return condor_script

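# The generated submit file asks Condor to run /bin/tar czvhf, writing the
# archive one directory above initialdir; e.g. for ininame 'foo.ini'
# (hypothetical) the submit file lands in foo.archive.condor.
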
def make_condor_upload_script(ininame):
    script = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.err.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(ininame),
               'initialdir': os.getcwd(),
               'user': os.getlogin(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp}

    condor_script = make_condor_name(ininame, 'upload')
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)
    return condor_script

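# netrc's auth.hosts maps each machine name to a (login, account, password)
# tuple, hence the [0] and [2] lookups above for the FTP credentials.
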
def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the DAG fragment that compresses and then uploads the data.
    """
    cur_dir = os.getcwd()
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))

    return fragments

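# For job_basename 'foo' (hypothetical), the fragment reads:
#
#   JOB foo_archive /current/dir/foo.archive.condor
#   JOB foo_upload /current/dir/foo.upload.condor
#   PARENT foo_archive CHILD foo_upload
#
# so DAGMan finishes the archive job before starting the upload job.
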
def get_library_info(host, apidata, library_id):
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
    return contents

def make_submission_section(line_counter, files, attributes):
    """
    Create a section in the submission ini file
    """
    inifile = ["[line%s]" % (line_counter,)]
    inifile += ["files=%s" % (",".join(files))]

    for k, v in attributes.items():
        inifile += ["%s=%s" % (k, v)]
    return inifile

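# Sketch of one generated section (attribute names hypothetical):
#
#   [line1]
#   files=chip.bam,input.bam
#   cell=GM12878
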
def make_base_name(pathname):
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    return name

def make_submission_name(ininame):
    name = make_base_name(ininame)
    return name + '.tgz'  # assumption: tar czvhf output gets a .tgz suffix

def make_ddf_name(pathname):
    name = make_base_name(pathname)
    return name + '.ddf'

def make_condor_name(pathname, run_type=None):
    name = make_base_name(pathname)
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)

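# e.g. make_condor_name('/tmp/foo.ini', 'archive') returns 'foo.archive.condor';
# without a run_type it returns 'foo.condor'.
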
def parse_filelist(file_string):
    return file_string.split(",")

def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))

def make_md5sum(filename):
    """Quickly find the md5sum of a file
    """
    md5_cache = filename + ".md5"
    if os.path.exists(md5_cache):
        logging.debug("Found md5sum in {0}".format(md5_cache))
        stream = open(md5_cache, 'r')
        lines = stream.readlines()
        md5sum = parse_md5sum_line(lines, filename)
    else:
        md5sum = make_md5sum_unix(filename, md5_cache)
    return md5sum

def make_md5sum_unix(filename, md5_cache):
    cmd = ["md5sum", filename]
    logging.debug("Running {0}".format(" ".join(cmd)))
    p = Popen(cmd, stdout=PIPE)
    stdout, stderr = p.communicate()
    retcode = p.returncode
    logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
    if retcode != 0:
        logging.error("Trouble with md5sum for {0}".format(filename))
        return None
    lines = stdout.split(os.linesep)
    md5sum = parse_md5sum_line(lines, filename)
    if md5sum is not None:
        logging.debug("Caching sum in {0}".format(md5_cache))
        stream = open(md5_cache, "w")
        # Cache in md5sum's own "checksum  filename" format so the cache
        # can be re-read by parse_md5sum_line (restored write; format assumed).
        stream.write("{0}  {1}{2}".format(md5sum, filename, os.linesep))
        stream.close()
    return md5sum

def parse_md5sum_line(lines, filename):
    md5sum, md5sum_filename = lines[0].split()
    if md5sum_filename != filename:
        errmsg = "MD5sum and I disagree about filename. {0} != {1}"
        logging.error(errmsg.format(filename, md5sum_filename))
        return None
    return md5sum

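# md5sum output looks like "d41d8cd98f00b204e9800998ecf8427e  empty.txt",
# so splitting the first line on whitespace yields (checksum, filename).
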
if __name__ == "__main__":
    main()