2 from ConfigParser import SafeConfigParser
8 from optparse import OptionParser, OptionGroup
10 from pprint import pprint, pformat
12 from StringIO import StringIO
20 from zipfile import ZipFile
24 from htsworkflow.util import api
25 from htsworkflow.util.rdfhelp import \
33 from htsworkflow.submission.daf import \
35 MetadataLookupException, \
37 from htsworkflow.submission.condorfastq import CondorFastqExtract
39 logger = logging.getLogger('ucsc_gather')
# NOTE(review): the numeric prefixes below are original line numbers from an
# extraction pass; they are non-contiguous, so statements (if/elif headers,
# loop headers, returns) are missing between many of the following lines.
41 def main(cmdline=None):
# Entry point: parse options, pick a logging level, build the RDF model and
# DAF mapper, then run each requested sub-command over the
# library -> result-directory map.
42     parser = make_parser()
43     opts, args = parser.parse_args(cmdline)
# one of these three levels is selected by --debug/--verbose flags
# (the selecting if/elif lines are missing from this extract)
47         logging.basicConfig(level = logging.DEBUG )
49         logging.basicConfig(level = logging.INFO )
51         logging.basicConfig(level = logging.WARNING )
53     apidata = api.make_auth_from_opts(opts, parser)
55     model = get_model(opts.model, opts.db_path)
57     mapper = DAFMapper(opts.name, opts.daf, model)
58     if opts.library_url is not None:
59         mapper.library_url = opts.library_url
60     submission_uri = get_submission_uri(opts.name)
# --load-rdf requires a submission name so statements land under a known URI
63     if opts.load_rdf is not None:
64         if submission_uri is None:
65             parser.error("Please specify the submission name")
66         load_into_model(model, 'turtle', opts.load_rdf, submission_uri)
68     if opts.make_ddf and opts.daf is None:
69         parser.error("Please specify your daf when making ddf files")
# positional args presumably name library->result-dir map files
# (the loop header over `args` is missing from this extract)
71     library_result_map = []
73         library_result_map.extend(read_library_result_map(a))
75     if opts.make_tree_from is not None:
76         make_tree_from(opts.make_tree_from, library_result_map)
# --link-daf path: a daf filename is mandatory here
80         parser.error("Please specify daf filename with --daf")
81         link_daf(opts.daf, library_result_map)
# --fastq path: generate condor scripts for fastq extraction
84         extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
86         extractor.create_scripts(library_result_map)
88     if opts.scan_submission:
89         scan_submission_dirs(mapper, library_result_map)
# remaining sub-commands: --make-ddf, --zip-ddf, --sparql, --print-rdf
92         make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)
95         zip_ddfs(mapper, library_result_map, opts.daf)
98         sparql_query(model, opts.sparql)
101         writer = get_serializer()
102         print writer.serialize_model_to_string(model)
# NOTE(review): the enclosing "def make_parser():" header (and its return)
# are missing from this extract; these lines build the OptionParser that
# main() consumes.
106     parser = OptionParser()
# "model" group: options controlling the RDF model store
108     model = OptionGroup(parser, 'model')
109     model.add_option('--name', help="Set submission name")
110     model.add_option('--db-path', default=None,
111                      help="set rdf database path")
112     model.add_option('--model', default=None,
113                      help="Load model database")
114     model.add_option('--load-rdf', default=None,
115                      help="load rdf statements into model")
116     model.add_option('--sparql', default=None, help="execute sparql query")
117     model.add_option('--print-rdf', action="store_true", default=False,
118                      help="print ending model state")
119     parser.add_option_group(model)
# "commands" group: one flag per sub-command dispatched in main()
121     commands = OptionGroup(parser, 'commands')
122     commands.add_option('--make-tree-from',
123                         help="create directories & link data files",
125     commands.add_option('--fastq', default=False, action="store_true",
126                         help="generate scripts for making fastq files")
127     commands.add_option('--scan-submission', default=False, action="store_true",
128                         help="Import metadata for submission into our model")
129     commands.add_option('--link-daf', default=False, action="store_true",
130                         help="link daf into submission directories")
131     commands.add_option('--make-ddf', help='make the ddfs', default=False,
133     commands.add_option('--zip-ddf', default=False, action='store_true',
134                         help='zip up just the metadata')
136     parser.add_option_group(commands)
# ungrouped options shared by several sub-commands
138     parser.add_option('--force', default=False, action="store_true",
139                       help="Force regenerating fastqs")
140     parser.add_option('--daf', default=None, help='specify daf name')
141     parser.add_option('--library-url', default=None,
142                       help="specify an alternate source for library information")
144     parser.add_option('--verbose', default=False, action="store_true",
145                       help='verbose logging')
146     parser.add_option('--debug', default=False, action="store_true",
147                       help='debug logging')
# presumably mixes in host/auth options -- verify in htsworkflow.util.api
149     api.add_auth_options(parser)
153 def make_tree_from(source_path, library_result_map):
154     """Create a tree using data files from source path.
# (docstring close, mkdir call, and an else-branch are missing from this
# sampled extract -- original line numbers are non-contiguous)
156     for lib_id, lib_path in library_result_map:
157         if not os.path.exists(lib_path):
158             logger.info("Making dir {0}".format(lib_path))
# mirror every file from the matching source library dir into lib_path
160         source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
161         if os.path.exists(source_lib_dir):
163             for filename in os.listdir(source_lib_dir):
164                 source_pathname = os.path.join(source_lib_dir, filename)
165                 target_pathname = os.path.join(lib_path, filename)
166                 if not os.path.exists(source_pathname):
167                     raise IOError("{0} does not exist".format(source_pathname))
# only symlink once; files already present in the target are left alone
168                 if not os.path.exists(target_pathname):
169                     os.symlink(source_pathname, target_pathname)
171                         'LINK {0} to {1}'.format(source_pathname, target_pathname))
def link_daf(daf_path, library_result_map):
    """Hard-link the daf file into every submission result directory.

    library_result_map is a sequence of (library_id, result_dir) pairs.
    Raises RuntimeError when the daf or a target directory is missing;
    directories that already contain the link are left untouched.
    """
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" %(result_dir,))

        submission_daf = os.path.join(result_dir, base_daf)
        if os.path.exists(submission_daf):
            # already linked on an earlier run; nothing to do
            continue
        if not os.path.exists(daf_path):
            raise RuntimeError("Couldn't find daf: %s" %(daf_path,))
        os.link(daf_path, submission_daf)
190 def scan_submission_dirs(view_map, library_result_map):
191     """Look through our submission directories and collect needed information
# (docstring close and the `try:` header are missing from this sampled
# extract -- the except below pairs with that missing try)
193     for lib_id, result_dir in library_result_map:
194         logger.info("Importing %s from %s" % (lib_id, result_dir))
196             view_map.import_submission_dir(result_dir, lib_id)
# Python 2 except syntax; a library without usable metadata is logged
# and skipped rather than aborting the whole scan
197         except MetadataLookupException, e:
198             logger.error("Skipping %s: %s" % (lib_id, str(e)))
201 def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
# Build one ddf per submission directory, collecting condor DAG fragments,
# then write submission.dagman.  (The dag_fragment accumulator lines are
# missing from this sampled extract.)
203     for lib_id, result_dir in library_result_map:
204         submissionNode = view_map.get_submission_node(result_dir)
206             make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
209     if make_condor and len(dag_fragment) > 0:
210         dag_filename = 'submission.dagman'
# refuse to clobber an existing dagman file unless force=True
211         if not force and os.path.exists(dag_filename):
212             logger.warn("%s exists, please delete" % (dag_filename,))
214             f = open(dag_filename,'w')
215             f.write( os.linesep.join(dag_fragment))
216             f.write( os.linesep )
220 def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
222     Make ddf files, and bonus condor file
# NOTE(review): this extract is missing interior lines (original line
# numbers are non-contiguous); initialisers such as all_views/all_files
# and several branch headers are not visible here.
# SPARQL query pulling every ddf column for one submission out of the model.
224     query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
225 PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
226 PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>
228 select ?submitView  ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?strain ?controlId ?labExpId ?labVersion ?treatment ?protocol ?readType ?insertLength ?replicate ?mapAlgorithm
230   ?file ucscDaf:filename ?files ;
231         ucscDaf:md5sum ?md5sum .
232   ?submitView ucscDaf:has_file ?file ;
233               ucscDaf:view ?dafView ;
234               ucscDaf:submission <%(submission)s> .
235   ?dafView ucscDaf:name ?view .
236   <%(submission)s> submissionOntology:library ?library ;
238   OPTIONAL { ?library libraryOntology:antibody ?antibody }
239   OPTIONAL { ?library libraryOntology:cell_line ?cell }
240   OPTIONAL { <%(submission)s> ucscDaf:control ?control }
241   OPTIONAL { <%(submission)s> ucscDaf:controlId ?controlId }
242   OPTIONAL { ?library ucscDaf:sex ?sex }
243   OPTIONAL { ?library libraryOntology:library_id ?labExpId }
244   OPTIONAL { ?library libraryOntology:library_id ?labVersion }
245   OPTIONAL { ?library libraryOntology:replicate ?replicate }
246   OPTIONAL { ?library libraryOntology:condition_term ?treatment }
247   OPTIONAL { ?library ucscDaf:protocol ?protocol }
248   OPTIONAL { ?library ucscDaf:readType ?readType }
249   OPTIONAL { ?library ucscDaf:strain ?strain }
250   OPTIONAL { ?library libraryOntology:insert_size ?insertLength }
251   OPTIONAL { ?library ucscDaf:mapAlgorithm ?mapAlgorithm }
253 ORDER BY  ?submitView"""
# a submission must carry a name in the model; it seeds every output filename
256     name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
258         logger.error("Need name for %s" % (str(submissionNode)))
261     ddf_name = make_ddf_name(name)
262     if outdir is not None:
263         outfile = os.path.join(outdir, ddf_name)
264         output = open(outfile,'w')
269     formatted_query = query_template % {'submission': str(submissionNode.uri)}
271     query = RDF.SPARQLQuery(formatted_query)
272     results = query.execute(view_map.model)
274     # filename goes first
275     variables = view_map.get_daf_variables()
# tab-separated header row
277     output.write('\t'.join(variables))
278     output.write(os.linesep)
# group result rows by view; files/md5sum become per-view lists, other
# variables keep their last-seen value
283         viewname = fromTypedNode(row['view'])
284         current = all_views.setdefault(viewname, {})
285         for variable_name in variables:
286             value = str(fromTypedNode(row[variable_name]))
# NOTE(review): value was str()-ed above, so it is the string 'None'
# when missing, never None itself -- the first half of this test is dead.
287             if value is None or value == 'None':
288                 logger.warn("{0}: {1} was None".format(outfile, variable_name))
289             if variable_name in ('files', 'md5sum'):
290                 current.setdefault(variable_name,[]).append(value)
292                 current[variable_name] = value
# emit one ddf line per view; list-valued columns are comma-joined
294     for view in all_views.keys():
296         for variable_name in variables:
297             if variable_name in ('files', 'md5sum'):
298                 line.append(','.join(all_views[view][variable_name]))
300                 line.append(all_views[view][variable_name])
301         output.write("\t".join(line))
302         output.write(os.linesep)
303         all_files.extend(all_views[view]['files'])
306         "Examined {0}, found files: {1}".format(
307             str(submissionNode), ", ".join(all_files)))
# the daf and the ddf themselves also go into the archive
309     all_files.append(daf_name)
310     all_files.append(ddf_name)
# optionally emit condor archive/upload submit files plus DAG fragments
313         archive_condor = make_condor_archive_script(name, all_files, outdir)
314         upload_condor = make_condor_upload_script(name, outdir)
316         dag_fragments.extend(
317             make_dag_fragment(name, archive_condor, upload_condor)
323 def zip_ddfs(view_map, library_result_map, daf_name):
324     """zip up just the ddf & daf files
# (docstring close and the missing-name handling lines are absent from this
# sampled extract)
326     rootdir = os.getcwd()
327     for lib_id, result_dir in library_result_map:
328         submissionNode = view_map.get_submission_node(result_dir)
329         nameNode = view_map.model.get_target(submissionNode,
330                                              submissionOntology['name'])
331         name = fromTypedNode(nameNode)
333             logger.error("Need name for %s" % (str(submissionNode)))
# write <lib_id>.zip one level above the result dir; chdir (always joined
# from rootdir, so each iteration starts from a known base) keeps the
# archive member names relative
336         zip_name = '../{0}.zip'.format(lib_id)
337         os.chdir(os.path.join(rootdir, result_dir))
338         with ZipFile(zip_name, 'w') as stream:
339             stream.write(make_ddf_name(name))
340             stream.write(daf_name)
# NOTE(review): no chdir back to rootdir is visible after the loop --
# confirm the missing lines restore the working directory.
344 def read_library_result_map(filename):
346     Read a file that maps library id to result directory.
347     Does not support spaces in filenames.
# (the `results` initialisation, line loop header, and return are missing
# from this sampled extract)
352     stream = open(filename,'r')
# skip comment lines and blanks; each data line is "library_id result_dir"
357         if not line.startswith('#') and len(line) > 0 :
358             library_id, result_dir = line.split()
359             results.append((library_id, result_dir))
363 def make_condor_archive_script(name, files, outdir=None):
# Write a condor submit description that tars `files` into ../<archivename>
# (tar flags: gzip, verbose, follow symlinks).  The template tail and the
# validation-loop header are missing from this sampled extract.
364     script = """Universe = vanilla
366 Executable = /bin/tar
367 arguments = czvhf ../%(archivename)s %(filelist)s
369 Error = compress.out.$(Process).log
370 Output = compress.out.$(Process).log
371 Log = /tmp/submission-compress-%(user)s.log
372 initialdir = %(initialdir)s
373 environment="GZIP=-3"
# every listed file must exist in outdir before we emit the submit file
381         pathname = os.path.join(outdir, f)
382         if not os.path.exists(pathname):
383             raise RuntimeError("Missing %s from %s" % (f,outdir))
385     context = {'archivename': make_submission_name(name),
386                'filelist': " ".join(files),
387                'initialdir': os.path.abspath(outdir),
388                'user': os.getlogin()}
# fill in the template and write it next to the data
390     condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
391     condor_stream = open(condor_script,'w')
392     condor_stream.write(script % context)
393     condor_stream.close()
397 def make_condor_upload_script(name, outdir=None):
# Write a condor submit description that lftp-uploads the archive to the
# UCSC encode ftp host.  The template tail is missing from this sampled
# extract.
398     script = """Universe = vanilla
400 Executable = /usr/bin/lftp
401 arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s
403 Error = upload.out.$(Process).log
404 Output = upload.out.$(Process).log
405 Log = /tmp/submission-upload-%(user)s.log
406 initialdir = %(initialdir)s
# NOTE(review): credentials are read from one specific user's .netrc --
# fragile outside the original deployment.
413     auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))
415     encodeftp = 'encodeftp.cse.ucsc.edu'
416     ftpuser = auth.hosts[encodeftp][0]
417     ftppassword = auth.hosts[encodeftp][2]
418     context = {'archivename': make_submission_name(name),
419                'initialdir': os.path.abspath(outdir),
420                'user': os.getlogin(),
422                'ftppassword': ftppassword,
423                'ftphost': encodeftp}
425     condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
426     condor_stream = open(condor_script,'w')
427     condor_stream.write(script % context)
428     condor_stream.close()
# owner read/write only, since the submit file embeds the ftp password
429     os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)
434 def make_dag_fragment(ininame, archive_condor, upload_condor):
436     Make the couple of fragments compress and then upload the data.
# (the `fragments` initialisation and return are missing from this sampled
# extract)
438     cur_dir = os.getcwd()
# DAGMan wants absolute paths to the submit description files
439     archive_condor = os.path.join(cur_dir, archive_condor)
440     upload_condor = os.path.join(cur_dir, upload_condor)
441     job_basename = make_base_name(ininame)
# archive job first; upload depends on it via PARENT/CHILD
444     fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
445     fragments.append('JOB %s_upload %s' % (job_basename,  upload_condor))
446     fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
451 def get_library_info(host, apidata, library_id):
# Fetch library metadata from the htsworkflow API.  (The line that parses
# and returns `contents` is missing from this sampled extract.)
452     url = api.library_url(host, library_id)
453     contents = api.retrieve_info(url, apidata)
457 def make_base_name(pathname):
# Strip directory and extension from pathname; presumably returns `name`
# (the return line is missing from this sampled extract).
458     base = os.path.basename(pathname)
459     name, ext = os.path.splitext(base)
463 def make_submission_name(ininame):
# Archive filename for a submission; the suffix-appending return line is
# missing from this sampled extract.
464     name = make_base_name(ininame)
468 def make_ddf_name(pathname):
# ddf filename derived from the base name; the suffix-appending return line
# is missing from this sampled extract.
469     name = make_base_name(pathname)
473 def make_condor_name(pathname, run_type=None):
# Build "<base>[.<run_type>].condor".  (The `elements` initialisation line
# is missing from this sampled extract; presumably `elements = [name]`.)
474     name = make_base_name(pathname)
476     if run_type is not None:
477         elements.append(run_type)
478     elements.append("condor")
479     return ".".join(elements)
def parse_filelist(file_string):
    """Split a comma-separated string of filenames into a list.

    No whitespace stripping is performed; an empty string yields [''].
    """
    separator = ","
    return file_string.split(separator)
486 def validate_filelist(files):
488     Die if a file doesn't exist in a file list
# (docstring close and the `for f in files:` header are missing from this
# sampled extract)
491         if not os.path.exists(f):
492             raise RuntimeError("%s does not exist" % (f,))
494 if __name__ == "__main__":