from configparser import ConfigParser
from optparse import OptionParser, OptionGroup
from pprint import pprint, pformat
from io import StringIO
import urllib.request, urllib.parse, urllib.error
from zipfile import ZipFile
if 'DJANGO_SETTINGS_MODULE' not in os.environ:
    os.environ['DJANGO_SETTINGS_MODULE'] = 'htsworkflow.settings'
from htsworkflow.util import api
from htsworkflow.util.rdfhelp import \
     get_model, \
     get_serializer, \
     load_into_model, \
     sparql_query, \
     submissionOntology, \
     fromTypedNode
from htsworkflow.submission.daf import \
     UCSCSubmission, \
     MetadataLookupException, \
     get_submission_uri
from htsworkflow.submission.results import ResultMap
from htsworkflow.submission.condorfastq import CondorFastqExtract

logger = logging.getLogger('ucsc_gather')
LFTP = '/usr/bin/lftp'

def main(cmdline=None):
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)
    apidata = api.make_auth_from_opts(opts, parser)

    model = get_model(opts.model, opts.db_path)

    submission_uri = None
    mapper = None
    if opts.name:
        mapper = UCSCSubmission(opts.name, opts.daf, model)
        if opts.library_url is not None:
            mapper.library_url = opts.library_url
        submission_uri = get_submission_uri(opts.name)

    if opts.load_rdf is not None:
        if submission_uri is None:
            parser.error("Please specify the submission name")
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)
    if opts.make_ddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    results = ResultMap()
    for a in args:
        results.add_results_from_file(a)
    if opts.make_tree_from is not None:
        results.make_tree_from(opts.make_tree_from)
    if opts.link_daf:
        if mapper is None:
            parser.error("Specify a submission model")
        if mapper.daf is None:
            parser.error("Please load a daf first")
        mapper.link_daf(results)
    if opts.fastq:
        flowcells = os.path.join(opts.sequence, 'flowcells')
        extractor = CondorFastqExtract(opts.host, flowcells,
                                       force=opts.force)
        extractor.create_scripts(results)
    if opts.scan_submission:
        mapper.scan_submission_dirs(results)
    if opts.make_ddf:
        if not os.path.exists(TAR):
            parser.error("%s does not exist, please specify --tar" % (TAR,))
        if not os.path.exists(LFTP):
            parser.error("%s does not exist, please specify --lftp" % (LFTP,))
        make_all_ddfs(mapper, results, opts.daf, force=opts.force)
    if opts.zip_ddf:
        zip_ddfs(mapper, results, opts.daf)
    if opts.sparql:
        sparql_query(model, opts.sparql)
    if opts.print_rdf:
        writer = get_serializer()
        print(writer.serialize_model_to_string(model))
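
# Example invocation (a sketch; the submission name 'mysub' and daf file
# 'encode.daf' are hypothetical): load the daf and import submission
# metadata, then build the ddf files:
#
#   ucsc_gather --name mysub --daf encode.daf --scan-submission
#   ucsc_gather --name mysub --daf encode.daf --make-ddf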

def make_parser():
    parser = OptionParser()

    model = OptionGroup(parser, 'model')
    model.add_option('--name', help="Set submission name")
    model.add_option('--db-path', default=None,
                     help="set rdf database path")
    model.add_option('--model', default=None,
                     help="Load model database")
    model.add_option('--load-rdf', default=None,
                     help="load rdf statements into model")
    model.add_option('--sparql', default=None, help="execute sparql query")
    model.add_option('--print-rdf', action="store_true", default=False,
                     help="print ending model state")
    model.add_option('--tar', default=TAR,
                     help="override path to tar command")
    model.add_option('--lftp', default=LFTP,
                     help="override path to lftp command")
    parser.add_option_group(model)
    commands = OptionGroup(parser, 'commands')
    commands.add_option('--make-tree-from',
                        help="create directories & link data files",
    commands.add_option('--fastq', default=False, action="store_true",
                        help="generate scripts for making fastq files")
    commands.add_option('--scan-submission', default=False, action="store_true",
                        help="Import metadata for submission into our model")
    commands.add_option('--link-daf', default=False, action="store_true",
                        help="link daf into submission directories")
    commands.add_option('--make-ddf', help='make the ddfs', default=False,
    commands.add_option('--zip-ddf', default=False, action='store_true',
                        help='zip up just the metadata')
    parser.add_option_group(commands)
    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")

    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser

def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
    dag_fragment = []
    for lib_id, result_dir in library_result_map.items():
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir))

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logger.warning("%s exists, please delete" % (dag_filename,))
        else:
            with open(dag_filename, 'w') as f:
                f.write(os.linesep.join(dag_fragment))
                f.write(os.linesep)
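
# Sketch of the overall flow: make_all_ddfs writes one ddf per library and,
# when make_condor is set, a 'submission.dagman' describing the archive and
# upload jobs; that DAG is then run with HTCondor's DAGMan, e.g.:
#
#   condor_submit_dag submission.dagman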

def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """Make ddf files, and a bonus condor file."""
    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>

select ?submitView ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?strain ?controlId ?labExpId ?labVersion ?treatment ?protocol ?readType ?insertLength ?replicate ?mapAlgorithm
WHERE {
  ?file ucscDaf:filename ?files ;
        ucscDaf:md5sum ?md5sum .
  ?submitView ucscDaf:has_file ?file ;
              ucscDaf:view ?dafView ;
              ucscDaf:submission <%(submission)s> .
  ?dafView ucscDaf:name ?view .
  <%(submission)s> submissionOntology:library ?library ;
  OPTIONAL { ?library libraryOntology:antibody ?antibody }
  OPTIONAL { ?library libraryOntology:cell_line ?cell }
  OPTIONAL { <%(submission)s> ucscDaf:control ?control }
  OPTIONAL { <%(submission)s> ucscDaf:controlId ?controlId }
  OPTIONAL { ?library ucscDaf:sex ?sex }
  OPTIONAL { ?library libraryOntology:library_id ?labExpId }
  OPTIONAL { ?library libraryOntology:library_id ?labVersion }
  OPTIONAL { ?library libraryOntology:replicate ?replicate }
  OPTIONAL { ?library libraryOntology:condition_term ?treatment }
  OPTIONAL { ?library ucscDaf:protocol ?protocol }
  OPTIONAL { ?library ucscDaf:readType ?readType }
  OPTIONAL { ?library ucscDaf:strain ?strain }
  OPTIONAL { ?library libraryOntology:insert_size ?insertLength }
  OPTIONAL { ?library ucscDaf:mapAlgorithm ?mapAlgorithm }
}
ORDER BY ?submitView"""
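    # The query returns one row per (submitView, file) pair; 'files' and
    # 'md5sum' therefore repeat per view and are accumulated into lists
    # below, while the remaining attributes are OPTIONAL and may be unbound.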
    dag_fragments = []
    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logger.error("Need name for %s" % (str(submissionNode)))
        return dag_fragments
    ddf_name = make_ddf_name(name)
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile, 'w')
    formatted_query = query_template % {'submission': str(submissionNode.uri)}

    query = RDF.SPARQLQuery(formatted_query)
    results = query.execute(view_map.model)
    # filename goes first
    variables = view_map.get_daf_variables()

    output.write('\t'.join(variables))
    output.write(os.linesep)
    all_views = {}
    all_files = []
    for row in results:
        viewname = fromTypedNode(row['view'])
        current = all_views.setdefault(viewname, {})
        for variable_name in variables:
            value = fromTypedNode(row[variable_name])
            if value is None:
                logger.warning("{0}: {1} was None".format(outfile, variable_name))
            value = str(value)
            if variable_name in ('files', 'md5sum'):
                current.setdefault(variable_name, []).append(value)
            else:
                current[variable_name] = value
    for view in all_views:
        line = []
        for variable_name in variables:
            if variable_name in ('files', 'md5sum'):
                line.append(','.join(all_views[view][variable_name]))
            else:
                line.append(all_views[view][variable_name])
        output.write("\t".join(line))
        output.write(os.linesep)
        all_files.extend(all_views[view]['files'])
283 "Examined {0}, found files: {1}".format(
284 str(submissionNode), ", ".join(all_files)))
    all_files.append(daf_name)
    all_files.append(ddf_name)
    if make_condor:
        archive_condor = make_condor_archive_script(name, all_files, outdir)
        upload_condor = make_condor_upload_script(name, LFTP, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor))

    return dag_fragments

def zip_ddfs(view_map, library_result_map, daf_name):
    """Zip up just the ddf & daf files."""
    rootdir = os.getcwd()
    for lib_id, result_dir in library_result_map.items():
        submissionNode = view_map.get_submission_node(result_dir)
        nameNode = view_map.model.get_target(submissionNode,
                                             submissionOntology['name'])
        name = fromTypedNode(nameNode)
        if name is None:
            logger.error("Need name for %s" % (str(submissionNode)))
            continue

        zip_name = '../{0}.zip'.format(lib_id)
        os.chdir(os.path.join(rootdir, result_dir))
        with ZipFile(zip_name, 'w') as stream:
            stream.write(make_ddf_name(name))
            stream.write(daf_name)
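
# Net effect: each library directory gains a sibling '../<lib_id>.zip'
# holding only that submission's ddf plus the shared daf file.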

def make_condor_archive_script(name, files, outdir=None):
    script = """Universe = vanilla

Executable = %(tar)s
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"

queue
"""
    for f in files:
        pathname = os.path.join(outdir, f)
        if not os.path.exists(pathname):
            raise RuntimeError("Missing %s from %s" % (f, outdir))

    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin(),
               'tar': TAR,
               }
    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    with open(condor_script, 'w') as condor_stream:
        condor_stream.write(script % context)

def make_condor_upload_script(name, lftp, outdir=None):
    script = """Universe = vanilla

Executable = %(lftp)s
arguments = -c put %(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(name),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp,
               'lftp': lftp,
               }
    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    with open(condor_script, 'w') as condor_stream:
        condor_stream.write(script % context)
    os.chmod(condor_script, stat.S_IREAD | stat.S_IWRITE)
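    # The hard-coded netrc file above is expected to carry a standard entry
    # for encodeftp (login and password values here are hypothetical):
    #   machine encodeftp.cse.ucsc.edu login someuser password somepass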

def make_dag_fragment(ininame, archive_condor, upload_condor):
    """Make the pair of DAG fragments that compress and then upload the data."""
    cur_dir = os.getcwd()
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
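    # For ininame 'foo.ini' (hypothetical) this emits a DAG snippet like:
    #   JOB foo_archive <cwd>/foo.archive.condor
    #   JOB foo_upload <cwd>/foo.upload.condor
    #   PARENT foo_archive CHILD foo_upload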

def make_base_name(pathname):
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    return name

def make_submission_name(ininame):
    name = make_base_name(ininame)

def make_ddf_name(pathname):
    name = make_base_name(pathname)

def make_condor_name(pathname, run_type=None):
    name = make_base_name(pathname)
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)
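
# Naming sketch (assuming elements starts as [name], as reconstructed above):
#   make_base_name('mylib.ini')              -> 'mylib'
#   make_condor_name('mylib.ini', 'archive') -> 'mylib.archive.condor'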

def parse_filelist(file_string):
    return file_string.split(",")

def validate_filelist(files):
    """Die if a file doesn't exist in a file list."""
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))
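
# e.g. validate_filelist(parse_filelist("a.bam,b.bam")) raises RuntimeError
# as soon as either file is missing from the current directory.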

if __name__ == "__main__":