1cdf0118f6f990943adec84ab90492be44f360fe
[htsworkflow.git] / encode_submission / ucsc_gather.py
1 #!/usr/bin/env python
2 from ConfigParser import SafeConfigParser
3 import fnmatch
4 from glob import glob
5 import json
6 import logging
7 import netrc
8 from optparse import OptionParser, OptionGroup
9 import os
10 from pprint import pprint, pformat
11 import shlex
12 from StringIO import StringIO
13 import stat
14 import sys
15 import time
16 import types
17 import urllib
18 import urllib2
19 import urlparse
20 from zipfile import ZipFile
21
22 import RDF
23
24 from htsworkflow.util import api
25 from htsworkflow.util.rdfhelp import \
26      dafTermOntology, \
27      fromTypedNode, \
28      get_model, \
29      get_serializer, \
30      load_into_model, \
31      sparql_query, \
32      submissionOntology
33 from htsworkflow.submission.daf import \
34      UCSCSubmission, \
35      MetadataLookupException, \
36      get_submission_uri
37 from htsworkflow.submission.results import ResultMap
38 from htsworkflow.submission.condorfastq import CondorFastqExtract
39
40 logger = logging.getLogger('ucsc_gather')
41
42 def main(cmdline=None):
43     parser = make_parser()
44     opts, args = parser.parse_args(cmdline)
45     submission_uri = None
46
47     if opts.debug:
48         logging.basicConfig(level = logging.DEBUG )
49     elif opts.verbose:
50         logging.basicConfig(level = logging.INFO )
51     else:
52         logging.basicConfig(level = logging.WARNING )
53
54     apidata = api.make_auth_from_opts(opts, parser)
55
56     model = get_model(opts.model, opts.db_path)
57     mapper = None
58     if opts.name:
59         mapper = UCSCSubmssion(opts.name, opts.daf,  model)
60         if opts.library_url is not None:
61             mapper.library_url = opts.library_url
62         submission_uri = get_submission_uri(opts.name)
63
64
65     if opts.load_rdf is not None:
66         if submission_uri is None:
67             parser.error("Please specify the submission name")
68         load_into_model(model, 'turtle', opts.load_rdf, submission_uri)
69
70     if opts.make_ddf and opts.daf is None:
71         parser.error("Please specify your daf when making ddf files")
72
73     results = ResultMap()
74     for a in args:
75         results.add_results_from_file(a)
76
77     if opts.make_tree_from is not None:
78         results.make_tree_from(opts.make_tree_from)
79
80     if opts.link_daf:
81         if mapper is None:
82             parser.error("Specify a submission model")
83         if mapper.daf is None:
84             parser.error("Please load a daf first")
85         mapper.link_daf(results)
86
87     if opts.fastq:
88         extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
89                                        force=opts.force)
90         extractor.create_scripts(results)
91
92     if opts.scan_submission:
93         mapper.scan_submission_dirs(results)
94
95     if opts.make_ddf:
96         make_all_ddfs(mapper, results, opts.daf, force=opts.force)
97
98     if opts.zip_ddf:
99         zip_ddfs(mapper, results, opts.daf)
100
101     if opts.sparql:
102         sparql_query(model, opts.sparql)
103
104     if opts.print_rdf:
105         writer = get_serializer()
106         print writer.serialize_model_to_string(model)
107
108
def make_parser():
    """Build the command line parser for ucsc_gather."""
    parser = OptionParser()

    # options describing / manipulating the RDF model
    model_opts = OptionGroup(parser, 'model')
    model_opts.add_option('--name', help="Set submission name")
    model_opts.add_option('--db-path', default=None,
                          help="set rdf database path")
    model_opts.add_option('--model', default=None,
                          help="Load model database")
    model_opts.add_option('--load-rdf', default=None,
                          help="load rdf statements into model")
    model_opts.add_option('--sparql', default=None,
                          help="execute sparql query")
    model_opts.add_option('--print-rdf', action="store_true", default=False,
                          help="print ending model state")
    parser.add_option_group(model_opts)

    # the actual work-performing commands
    command_opts = OptionGroup(parser, 'commands')
    command_opts.add_option('--make-tree-from', default=None,
                            help="create directories & link data files")
    command_opts.add_option('--fastq', default=False, action="store_true",
                            help="generate scripts for making fastq files")
    command_opts.add_option('--scan-submission', default=False,
                            action="store_true",
                            help="Import metadata for submission into our model")
    command_opts.add_option('--link-daf', default=False, action="store_true",
                            help="link daf into submission directories")
    command_opts.add_option('--make-ddf', default=False, action="store_true",
                            help='make the ddfs')
    command_opts.add_option('--zip-ddf', default=False, action='store_true',
                            help='zip up just the metadata')
    parser.add_option_group(command_opts)

    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")

    # logging verbosity switches
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser
156
157
def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
    """Generate a ddf for every library in the result map.

    When make_condor is set, also collect the condor dag fragments
    produced for each library and write them to 'submission.dagman'
    (unless that file already exists and force is false).
    """
    dag_fragment = []
    for lib_id, result_dir in library_result_map:
        submissionNode = view_map.get_submission_node(result_dir)
        fragment = make_ddf(view_map, submissionNode, daf_name,
                            make_condor, result_dir)
        dag_fragment.extend(fragment)

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if os.path.exists(dag_filename) and not force:
            # refuse to clobber a previously generated dag
            logger.warn("%s exists, please delete" % (dag_filename,))
        else:
            stream = open(dag_filename, 'w')
            stream.write(os.linesep.join(dag_fragment))
            stream.write(os.linesep)
            stream.close()
175
176
def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """Write a UCSC DDF (data definition file) for one submission.

    view_map -- submission object holding the RDF model and the daf
                variable list
    submissionNode -- RDF node identifying the submission to export
    daf_name -- daf filename, added to the archive file list
    make_condor -- also generate condor archive/upload scripts and
                   return dag fragments referencing them
    outdir -- directory to write the ddf into; writes to stdout when None

    Returns a list of condor DAG fragment lines (empty unless
    make_condor is set and rows were found).
    """
    # One result row per (submitView, file); most columns are OPTIONAL
    # attributes of the library or submission.
    # NOTE(review): the ';' terminating "submissionOntology:library
    # ?library" (rather than '.') before the OPTIONAL clauses looks
    # suspicious -- confirm the query parses as intended.
    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>

select ?submitView  ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?strain ?controlId ?labExpId ?labVersion ?treatment ?protocol ?readType ?insertLength ?replicate ?mapAlgorithm
WHERE {
  ?file ucscDaf:filename ?files ;
        ucscDaf:md5sum ?md5sum .
  ?submitView ucscDaf:has_file ?file ;
              ucscDaf:view ?dafView ;
              ucscDaf:submission <%(submission)s> .
  ?dafView ucscDaf:name ?view .
  <%(submission)s> submissionOntology:library ?library ;

  OPTIONAL { ?library libraryOntology:antibody ?antibody }
  OPTIONAL { ?library libraryOntology:cell_line ?cell }
  OPTIONAL { <%(submission)s> ucscDaf:control ?control }
  OPTIONAL { <%(submission)s> ucscDaf:controlId ?controlId }
  OPTIONAL { ?library ucscDaf:sex ?sex }
  OPTIONAL { ?library libraryOntology:library_id ?labExpId }
  OPTIONAL { ?library libraryOntology:library_id ?labVersion }
  OPTIONAL { ?library libraryOntology:replicate ?replicate }
  OPTIONAL { ?library libraryOntology:condition_term ?treatment }
  OPTIONAL { ?library ucscDaf:protocol ?protocol }
  OPTIONAL { ?library ucscDaf:readType ?readType }
  OPTIONAL { ?library ucscDaf:strain ?strain }
  OPTIONAL { ?library libraryOntology:insert_size ?insertLength }
  OPTIONAL { ?library ucscDaf:mapAlgorithm ?mapAlgorithm }
}
ORDER BY  ?submitView"""
    dag_fragments = []

    # The submission's name drives all output filenames.
    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logger.error("Need name for %s" % (str(submissionNode)))
        return []

    ddf_name = make_ddf_name(name)
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile,'w')
    else:
        # 'stdout:' is only used as a label in log messages below
        outfile = 'stdout:'
        output = sys.stdout

    formatted_query = query_template % {'submission': str(submissionNode.uri)}

    query = RDF.SPARQLQuery(formatted_query)
    results = query.execute(view_map.model)

    # filename goes first
    variables = view_map.get_daf_variables()
    # 'controlId',
    # header row: tab separated column names
    output.write('\t'.join(variables))
    output.write(os.linesep)

    # Group result rows by view; 'files' and 'md5sum' accumulate one
    # entry per row, every other variable keeps the last value seen.
    all_views = {}
    all_files = []
    for row in results:
        viewname = fromTypedNode(row['view'])
        current = all_views.setdefault(viewname, {})
        for variable_name in variables:
            # NOTE(review): str() means value can never be None here;
            # the 'is None' test below is dead and only the literal
            # string 'None' comparison fires -- confirm intent.
            value = str(fromTypedNode(row[variable_name]))
            if value is None or value == 'None':
                logger.warn("{0}: {1} was None".format(outfile, variable_name))
            if variable_name in ('files', 'md5sum'):
                current.setdefault(variable_name,[]).append(value)
            else:
                current[variable_name] = value

    # Emit one ddf line per view; list-valued columns joined by commas.
    for view in all_views.keys():
        line = []
        for variable_name in variables:
            if variable_name in ('files', 'md5sum'):
                line.append(','.join(all_views[view][variable_name]))
            else:
                line.append(all_views[view][variable_name])
        output.write("\t".join(line))
        output.write(os.linesep)
        all_files.extend(all_views[view]['files'])

    logger.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(all_files)))

    # The archive contains the data files plus daf and ddf metadata.
    all_files.append(daf_name)
    all_files.append(ddf_name)

    if make_condor:
        archive_condor = make_condor_archive_script(name, all_files, outdir)
        upload_condor = make_condor_upload_script(name, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor)
        )

    return dag_fragments
278
279
def zip_ddfs(view_map, library_result_map, daf_name):
    """zip up just the ddf & daf files

    Creates ../<lib_id>.zip inside each library's result directory,
    containing that library's ddf and the daf.
    """
    rootdir = os.getcwd()
    for lib_id, result_dir in library_result_map:
        submissionNode = view_map.get_submission_node(result_dir)
        nameNode = view_map.model.get_target(submissionNode,
                                             submissionOntology['name'])
        name = fromTypedNode(nameNode)
        if name is None:
            logger.error("Need name for %s" % (str(submissionNode)))
            continue

        zip_name = '../{0}.zip'.format(lib_id)
        # ZipFile.write archives files relative to the current
        # directory, hence the chdir into the result directory.
        os.chdir(os.path.join(rootdir, result_dir))
        try:
            with ZipFile(zip_name, 'w') as stream:
                stream.write(make_ddf_name(name))
                stream.write(daf_name)
        finally:
            # FIX: always restore the working directory -- previously a
            # missing ddf/daf (IOError from ZipFile.write) left the
            # process stranded in result_dir for later iterations and
            # for the caller.
            os.chdir(rootdir)
299
300
def make_condor_archive_script(name, files, outdir=None):
    """Write a condor submit file that tars *files* into ../<name>.tgz.

    Raises RuntimeError if any listed file is missing from outdir.
    Returns the path of the generated submit file.
    """
    template = """Universe = vanilla

Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue
"""
    if outdir is None:
        outdir = os.getcwd()

    # refuse to generate a submit file referencing missing inputs
    for filename in files:
        if not os.path.exists(os.path.join(outdir, filename)):
            raise RuntimeError("Missing %s from %s" % (filename, outdir))

    context = {
        'archivename': make_submission_name(name),
        'filelist': " ".join(files),
        'initialdir': os.path.abspath(outdir),
        'user': os.getlogin(),
    }

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    with open(condor_script, 'w') as condor_stream:
        condor_stream.write(template % context)
    return condor_script
333
334
def make_condor_upload_script(name, outdir=None):
    """Write a condor submit file that lftp-uploads ../<name>.tgz to
    the UCSC encode ftp server.

    Credentials come from ~diane/.netrc; the submit file is chmodded
    owner read/write only since it embeds the ftp password.
    Returns the path of the generated submit file.
    """
    template = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    if outdir is None:
        outdir = os.getcwd()

    # NOTE(review): hard-coded to one user's netrc -- confirm this is
    # intentional for the deployment environment.
    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    login, _account, password = auth.hosts[encodeftp]
    context = {
        'archivename': make_submission_name(name),
        'initialdir': os.path.abspath(outdir),
        'user': os.getlogin(),
        'ftpuser': login,
        'ftppassword': password,
        'ftphost': encodeftp,
    }

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    with open(condor_script, 'w') as condor_stream:
        condor_stream.write(template % context)
    # restrict permissions: the script contains the ftp password
    os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)

    return condor_script
370
371
def make_dag_fragment(ininame, archive_condor, upload_condor):
    """Return DAGMan lines running the archive job, then the upload job.

    The two condor script paths are made absolute against the current
    working directory; job names derive from the ini file's base name.
    """
    job_basename = make_base_name(ininame)
    cwd = os.getcwd()
    archive_path = os.path.join(cwd, archive_condor)
    upload_path = os.path.join(cwd, upload_condor)

    return [
        'JOB %s_archive %s' % (job_basename, archive_path),
        'JOB %s_upload %s' % (job_basename, upload_path),
        'PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename),
    ]
387
388
def make_base_name(pathname):
    """Return the final path component of *pathname*, extension stripped."""
    stem, _extension = os.path.splitext(os.path.basename(pathname))
    return stem
393
394
def make_submission_name(ininame):
    """Return the tar archive name for a submission ini file."""
    return "%s.tgz" % (make_base_name(ininame),)
398
399
def make_ddf_name(pathname):
    """Return the ddf filename derived from *pathname*'s base name."""
    return "%s.ddf" % (make_base_name(pathname),)
403
404
def make_condor_name(pathname, run_type=None):
    """Return a condor submit filename: <base>[.<run_type>].condor."""
    parts = [make_base_name(pathname)]
    if run_type is not None:
        parts.append(run_type)
    parts.append("condor")
    return ".".join(parts)
412
413
def parse_filelist(file_string):
    """Split a comma-delimited string of filenames into a list."""
    separator = ","
    return file_string.split(separator)
416
417
def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    missing = [f for f in files if not os.path.exists(f)]
    if missing:
        # report the first missing path, matching the old behavior
        raise RuntimeError("%s does not exist" % (missing[0],))
425
# Script entry point: run the gather workflow with sys.argv options.
if __name__ == "__main__":
    main()