allow running ucsc_gather --fastq without specifying an analysis name or model
[htsworkflow.git] / extra/ucsc_encode_submission/ucsc_gather.py
#!/usr/bin/env python
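"""Gather files and metadata for an ENCODE submission to UCSC.

Reads library-to-result-directory map files and, depending on the options
given, builds Condor scripts for fastq extraction, links data files into
submission directories, imports submission metadata into an RDF model,
and writes ddf files plus Condor archive/upload jobs for delivery.
"""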
from ConfigParser import SafeConfigParser
import fnmatch
from glob import glob
import json
import logging
import netrc
from optparse import OptionParser
import os
from pprint import pprint, pformat
import shlex
from StringIO import StringIO
import stat
from subprocess import Popen, PIPE
import sys
import time
import types
import urllib
import urllib2
import urlparse

from htsworkflow.util import api
from htsworkflow.util.rdfhelp import \
     dafTermOntology, \
     fromTypedNode, \
     get_model, \
     get_serializer, \
     load_into_model, \
     submissionOntology
from htsworkflow.submission.daf import DAFMapper, get_submission_uri
from htsworkflow.submission.condorfastq import CondorFastqExtract

logger = logging.getLogger('ucsc_gather')

def main(cmdline=None):
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    apidata = api.make_auth_from_opts(opts, parser)

    model = get_model(opts.load_model)
    # The submission name and mapper are only needed by the metadata
    # commands; --fastq can run without them.
    mapper = None
    submission_uri = None
    if opts.name:
        mapper = DAFMapper(opts.name, opts.daf, model)
        submission_uri = get_submission_uri(opts.name)

    if opts.load_rdf is not None:
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    if opts.makeddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    if len(args) == 0:
        parser.error("I need at least one library submission-dir input file")

    library_result_map = []
    for a in args:
        library_result_map.extend(read_library_result_map(a))

    if opts.make_tree_from is not None:
        make_tree_from(opts.make_tree_from, library_result_map)

    #if opts.daf is not None:
    #    link_daf(opts.daf, library_result_map)

    if opts.fastq:
        extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
                                       force=opts.force)
        extractor.build_fastqs(library_result_map)

    if opts.scan_submission:
        if mapper is None:
            parser.error("--scan-submission requires --name")
        scan_submission_dirs(mapper, library_result_map)

    if opts.makeddf:
        if mapper is None:
            parser.error("--makeddf requires --name")
        make_all_ddfs(mapper, library_result_map, force=opts.force)

    if opts.print_rdf:
        writer = get_serializer()
        print writer.serialize_model_to_string(model)

def make_parser():
    parser = OptionParser()

    parser.add_option('--name', help="set submission name")
    parser.add_option('--load-model', default=None,
                      help="load model database")
    parser.add_option('--load-rdf', default=None,
                      help="load rdf statements into model")
    parser.add_option('--print-rdf', action="store_true", default=False,
                      help="print ending model state")

    # commands
    parser.add_option('--make-tree-from',
                      help="create directories & link data files",
                      default=None)
    parser.add_option('--fastq', default=False, action="store_true",
                      help="generate scripts for making fastq files")
    parser.add_option('--scan-submission', default=False, action="store_true",
                      help="import metadata for submission into our model")
    parser.add_option('--makeddf', default=False, action="store_true",
                      help="make the ddf files")
    parser.add_option('--daf', default=None, help="specify daf name")
    parser.add_option('--force', default=False, action="store_true",
                      help="force regenerating fastqs")

    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help="verbose logging")
    parser.add_option('--debug', default=False, action="store_true",
                      help="debug logging")

    api.add_auth_options(parser)

    return parser

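# Example invocations (host and file names are illustrative):
#
#   Build fastq extraction scripts without a submission name or model:
#     ucsc_gather.py --fastq --host localhost library_map.txt
#
#   Scan submission directories and write ddfs for a named submission:
#     ucsc_gather.py --name mysubmission --daf my.daf \
#         --scan-submission --makeddf library_map.txt
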
def make_tree_from(source_path, library_result_map):
    """Create a tree using data files from source path.
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logging.info("Making dir {0}".format(lib_path))
            os.mkdir(lib_path)
        source_lib_dir = os.path.join(source_path, lib_path)
        if not os.path.exists(source_lib_dir):
            # nothing to link for this library
            continue
        for filename in os.listdir(source_lib_dir):
            source_pathname = os.path.join(source_lib_dir, filename)
            target_pathname = os.path.join(lib_path, filename)
            if not os.path.exists(source_pathname):
                raise IOError("{0} does not exist".format(source_pathname))
            if not os.path.exists(target_pathname):
                os.symlink(source_pathname, target_pathname)
                logging.info(
                    'LINK {0} to {1}'.format(source_pathname, target_pathname))

def link_daf(daf_path, library_result_map):
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" % (result_dir,))
        submission_daf = os.path.join(result_dir, base_daf)
        if not os.path.exists(submission_daf):
            os.link(daf_path, submission_daf)

def scan_submission_dirs(view_map, library_result_map):
    """Look through our submission directories and collect needed information.
    """
    for lib_id, result_dir in library_result_map:
        view_map.import_submission_dir(result_dir, lib_id)

def make_all_ddfs(view_map, library_result_map, make_condor=True, force=False):
    dag_fragment = []
    for lib_id, result_dir in library_result_map:
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, make_condor, result_dir)
        )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logging.warn("%s exists, please delete" % (dag_filename,))
        else:
            f = open(dag_filename, 'w')
            f.write(os.linesep.join(dag_fragment))
            f.write(os.linesep)
            f.close()

def make_ddf(view_map, submissionNode, make_condor=False, outdir=None):
    """
    Make a ddf file, and the bonus condor file
    """
    dag_fragments = []
    curdir = os.getcwd()
    if outdir is not None:
        os.chdir(outdir)

    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logging.error("Need name for %s" % (str(submissionNode)))
        os.chdir(curdir)
        return []

    ddf_name = name + '.ddf'
    output = sys.stdout
    # output = open(ddf_name,'w')

    # filename goes first
    variables = ['filename']
    variables.extend(view_map.get_daf_variables())
    output.write('\t'.join(variables))
    output.write(os.linesep)

    submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])
    file_list = []
    for viewNode in submission_views:
        record = []
        for variable_name in variables:
            varNode = dafTermOntology[variable_name]
            values = [fromTypedNode(v) for v in list(view_map.model.get_targets(viewNode, varNode))]
            if variable_name == 'filename':
                file_list.extend(values)
            if len(values) == 0:
                attribute = "#None#"
            elif len(values) == 1:
                attribute = values[0]
            else:
                attribute = ",".join(values)
            record.append(attribute)
        output.write('\t'.join(record))
        output.write(os.linesep)

    logging.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(file_list)))

    #file_list.append(daf_name)
    #if ddf_name is not None:
    #    file_list.append(ddf_name)
    #
    #if make_condor:
    #    archive_condor = make_condor_archive_script(ininame, file_list)
    #    upload_condor = make_condor_upload_script(ininame)
    #
    #    dag_fragments.extend(
    #        make_dag_fragment(ininame, archive_condor, upload_condor)
    #    )

    os.chdir(curdir)

    return dag_fragments

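# The ddf is tab-separated with a 'filename' column first, followed by the
# daf-defined variables; e.g. if get_daf_variables() returned ('view',
# 'cell') (names illustrative), the header row would be:
#   filename<TAB>view<TAB>cell
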
def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.

    For example:
      10000 result/foo/bar
    """
    stream = open(filename, 'r')

    results = []
    for line in stream:
        line = line.rstrip()
        if not line.startswith('#') and len(line) > 0:
            library_id, result_dir = line.split()
            results.append((library_id, result_dir))
    stream.close()
    return results

def make_condor_archive_script(ininame, files):
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue
"""
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("Missing %s" % (f,))

    context = {'archivename': make_submission_name(ininame),
               'filelist': " ".join(files),
               'initialdir': os.getcwd(),
               'user': os.getlogin()}

    condor_script = make_condor_name(ininame, 'archive')
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    return condor_script

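# A sketch of what the archive script does, assuming an ininame of
# "foo.ini": make_condor_name() yields "foo.archive.condor", and that job
# runs "tar czvhf ../foo.tgz <files>" from the current directory.
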
def make_condor_upload_script(ininame):
    script = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.err.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    # FTP credentials come from a hard-coded, site-specific netrc file.
    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(ininame),
               'initialdir': os.getcwd(),
               'user': os.getlogin(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp}

    condor_script = make_condor_name(ininame, 'upload')
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    # The script embeds the ftp password, so restrict it to owner read/write.
    os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)

    return condor_script

def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the pair of DAG entries that compress and then upload the data.
    """
    cur_dir = os.getcwd()
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))

    return fragments

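# For example, ininame "foo.ini" with scripts in /work (path illustrative)
# produces:
#   JOB foo_archive /work/foo.archive.condor
#   JOB foo_upload /work/foo.upload.condor
#   PARENT foo_archive CHILD foo_upload
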
def get_library_info(host, apidata, library_id):
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
    return contents

def make_submission_section(line_counter, files, attributes):
    """
    Create a section in the submission ini file
    """
    inifile = ["[line%s]" % (line_counter,)]
    inifile += ["files=%s" % (",".join(files))]

    for k, v in attributes.items():
        inifile += ["%s=%s" % (k, v)]
    return inifile

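# For example (attribute names are illustrative, not a fixed schema):
#   make_submission_section(1, ['foo.fastq'], {'cell': 'GM12878'})
#   -> ['[line1]', 'files=foo.fastq', 'cell=GM12878']
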
def make_base_name(pathname):
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    return name


def make_submission_name(ininame):
    name = make_base_name(ininame)
    return name + ".tgz"


def make_ddf_name(pathname):
    name = make_base_name(pathname)
    return name + ".ddf"


def make_condor_name(pathname, run_type=None):
    name = make_base_name(pathname)
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)

def parse_filelist(file_string):
    return file_string.split(",")


def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))

def make_md5sum(filename):
    """Quickly find the md5sum of a file, using a cache file if present.
    """
    md5_cache = filename + ".md5"
    if os.path.exists(md5_cache):
        logging.debug("Found md5sum in {0}".format(md5_cache))
        stream = open(md5_cache, 'r')
        lines = stream.readlines()
        stream.close()
        md5sum = parse_md5sum_line(lines, filename)
    else:
        md5sum = make_md5sum_unix(filename, md5_cache)
    return md5sum

def make_md5sum_unix(filename, md5_cache):
    cmd = ["md5sum", filename]
    logging.debug("Running {0}".format(" ".join(cmd)))
    p = Popen(cmd, stdout=PIPE)
    # communicate() returns (stdout, stderr)
    stdout, stderr = p.communicate()
    retcode = p.wait()
    logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
    if retcode != 0:
        logging.error("Trouble with md5sum for {0}".format(filename))
        return None
    lines = stdout.split(os.linesep)
    md5sum = parse_md5sum_line(lines, filename)
    if md5sum is not None:
        logging.debug("Caching sum in {0}".format(md5_cache))
        stream = open(md5_cache, "w")
        stream.write(stdout)
        stream.close()
    return md5sum

def parse_md5sum_line(lines, filename):
    md5sum, md5sum_filename = lines[0].split()
    if md5sum_filename != filename:
        errmsg = "md5sum and I disagree about the filename. {0} != {1}"
        logging.error(errmsg.format(filename, md5sum_filename))
        return None
    return md5sum
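
# A cached .md5 file holds raw md5sum output, e.g. (hash shown is the md5
# of an empty file; the filename is whatever was summed):
#   d41d8cd98f00b204e9800998ecf8427e  example.fastq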

if __name__ == "__main__":
    main()