Rework ucsc_gather to use RDF models for gathering and storing track metadata.
htsworkflow.git: extra/ucsc_encode_submission/ucsc_gather.py
#!/usr/bin/env python
from ConfigParser import SafeConfigParser
import fnmatch
from glob import glob
import json
import logging
import netrc
from optparse import OptionParser
import os
from pprint import pprint, pformat
import shlex
from StringIO import StringIO
import stat
from subprocess import Popen, PIPE
import sys
import time
import types
import urllib
import urllib2
import urlparse

from htsworkflow.util import api
from htsworkflow.util.rdfhelp import \
     dafTermOntology, \
     fromTypedNode, \
     get_model, \
     get_serializer, \
     load_into_model, \
     submissionOntology
from htsworkflow.submission.daf import DAFMapper, get_submission_uri
from htsworkflow.submission.condorfastq import CondorFastqExtract

logger = logging.getLogger('ucsc_gather')

def main(cmdline=None):
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    apidata = api.make_auth_from_opts(opts, parser)

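    # set up the RDF model that stores the submission metadata, and the
    # DAF-driven mapper that knows how to fill it in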
    model = get_model(opts.load_model)
    mapper = DAFMapper(opts.name, opts.daf, model)
    submission_uri = get_submission_uri(opts.name)
    if opts.load_rdf is not None:
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    if opts.makeddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    if len(args) == 0:
        parser.error("I need at least one library submission-dir input file")

    library_result_map = []
    for a in args:
        library_result_map.extend(read_library_result_map(a))

    if opts.make_tree_from is not None:
        make_tree_from(opts.make_tree_from, library_result_map)

    #if opts.daf is not None:
    #    link_daf(opts.daf, library_result_map)

    if opts.fastq:
        extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
                                       force=opts.force)
        extractor.build_fastqs(library_result_map)

    if opts.scan_submission:
        scan_submission_dirs(mapper, library_result_map)

    if opts.makeddf:
        make_all_ddfs(mapper, library_result_map, force=opts.force)

    if opts.print_rdf:
        writer = get_serializer()
        print writer.serialize_model_to_string(model)


def make_parser():
    parser = OptionParser()

    parser.add_option('--name', help="Set submission name")
    parser.add_option('--load-model', default=None,
      help="Load model database")
    parser.add_option('--load-rdf', default=None,
      help="load rdf statements into model")
    parser.add_option('--print-rdf', action="store_true", default=False,
      help="print ending model state")

    # commands
    parser.add_option('--make-tree-from',
                      help="create directories & link data files",
                      default=None)
    parser.add_option('--fastq', help="generate scripts for making fastq files",
                      default=False, action="store_true")

    parser.add_option('--scan-submission', default=False, action="store_true",
                      help="Import metadata for submission into our model")

    parser.add_option('--makeddf', help='make the ddfs', default=False,
                      action="store_true")

    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")

    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser

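# A typical invocation might look like the following; the submission name,
# daf file, and library map file are hypothetical examples, not taken from
# the source:
#
#   ucsc_gather.py --name mysubmission --daf my.daf \
#       --scan-submission --makeddf library_map.txt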
def make_tree_from(source_path, library_result_map):
    """Create a tree using data files from source path.
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logging.info("Making dir {0}".format(lib_path))
            os.mkdir(lib_path)
        source_lib_dir = os.path.join(source_path, lib_path)
        if not os.path.exists(source_lib_dir):
            raise IOError("{0} does not exist".format(source_lib_dir))
        for filename in os.listdir(source_lib_dir):
            source_pathname = os.path.join(source_lib_dir, filename)
            target_pathname = os.path.join(lib_path, filename)
            if not os.path.exists(source_pathname):
                raise IOError("{0} does not exist".format(source_pathname))
            if not os.path.exists(target_pathname):
                os.symlink(source_pathname, target_pathname)
                logging.info(
                    'LINK {0} to {1}'.format(source_pathname, target_pathname))

def link_daf(daf_path, library_result_map):
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" % (result_dir,))
        submission_daf = os.path.join(result_dir, base_daf)
        if not os.path.exists(submission_daf):
            os.link(daf_path, submission_daf)

def scan_submission_dirs(view_map, library_result_map):
    """Look through our submission directories and collect needed information
    """
    for lib_id, result_dir in library_result_map:
        view_map.import_submission_dir(result_dir, lib_id)


def make_all_ddfs(view_map, library_result_map, make_condor=True, force=False):
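    """Make a ddf for each library result directory, collecting any condor
    DAG fragments into a single submission.dagman file.
    """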
    dag_fragment = []
    for lib_id, result_dir in library_result_map:
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, make_condor, result_dir)
        )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logging.warn("%s exists, please delete" % (dag_filename,))
        else:
            f = open(dag_filename, 'w')
            f.write(os.linesep.join(dag_fragment))
            f.write(os.linesep)
            f.close()

def make_ddf(view_map, submissionNode, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file
    """
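    # the ddf is a tab separated table; 'filename' comes first, followed by
    # the variables declared in the daf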
    dag_fragments = []
    curdir = os.getcwd()
    if outdir is not None:
        os.chdir(outdir)
    output = sys.stdout

    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logging.error("Need name for %s" % (str(submissionNode),))
        os.chdir(curdir)
        return []

    ddf_name = name + '.ddf'
    # output = open(ddf_name, 'w')

    # filename goes first
    variables = ['filename']
    variables.extend(view_map.get_daf_variables())
    output.write('\t'.join(variables))
    output.write(os.linesep)

    submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])
    file_list = []
    for viewNode in submission_views:
        record = []
        for variable_name in variables:
            varNode = dafTermOntology[variable_name]
            values = [fromTypedNode(v) for v in list(view_map.model.get_targets(viewNode, varNode))]
            if variable_name == 'filename':
                file_list.extend(values)
            if len(values) == 0:
                attribute = "#None#"
            elif len(values) == 1:
                attribute = values[0]
            else:
                attribute = ",".join(values)
            record.append(attribute)
        output.write('\t'.join(record))
        output.write(os.linesep)

    logging.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(file_list)))

    #file_list.append(daf_name)
    #if ddf_name is not None:
    #    file_list.append(ddf_name)
    #
    #if make_condor:
    #    archive_condor = make_condor_archive_script(ininame, file_list)
    #    upload_condor = make_condor_upload_script(ininame)
    #
    #    dag_fragments.extend(
    #        make_dag_fragment(ininame, archive_condor, upload_condor)
    #    )

    os.chdir(curdir)

    return dag_fragments

def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.

    For example:
      10000 result/foo/bar
    """
    stream = open(filename, 'r')

    results = []
    for line in stream:
        line = line.rstrip()
        if not line.startswith('#') and len(line) > 0:
            library_id, result_dir = line.split()
            results.append((library_id, result_dir))
    return results

def make_condor_archive_script(ininame, files):
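    """Write a condor submit file that tars the listed files into
    ../<submission name>.tgz
    """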
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue
"""
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("Missing %s" % (f,))

    context = {'archivename': make_submission_name(ininame),
               'filelist': " ".join(files),
               'initialdir': os.getcwd(),
               'user': os.getlogin()}

    condor_script = make_condor_name(ininame, 'archive')
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    return condor_script

def make_condor_upload_script(ininame):
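    """Write a condor submit file that uploads the submission archive
    to encodeftp.cse.ucsc.edu with lftp.
    """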
    script = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.err.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(ininame),
               'initialdir': os.getcwd(),
               'user': os.getlogin(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp}

    condor_script = make_condor_name(ininame, 'upload')
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
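    # the submit file embeds the ftp password, so make it owner
    # read/write only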
    os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)

    return condor_script

def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the pair of DAG fragments that compress and then upload the data.
    """
    cur_dir = os.getcwd()
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))

    return fragments

def get_library_info(host, apidata, library_id):
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
    return contents


def make_submission_section(line_counter, files, attributes):
    """
    Create a section in the submission ini file
    """
    inifile = ["[line%s]" % (line_counter,)]
    inifile += ["files=%s" % (",".join(files))]

    for k, v in attributes.items():
        inifile += ["%s=%s" % (k, v)]
    return inifile

def make_base_name(pathname):
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    return name


def make_submission_name(ininame):
    name = make_base_name(ininame)
    return name + ".tgz"


def make_ddf_name(pathname):
    name = make_base_name(pathname)
    return name + ".ddf"


def make_condor_name(pathname, run_type=None):
    name = make_base_name(pathname)
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)


def parse_filelist(file_string):
    return file_string.split(",")


def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))

def make_md5sum(filename):
    """Quickly find the md5sum of a file
    """
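    # sums are cached next to the data file in <filename>.md5, in the same
    # "<hash>  <filename>" format that the md5sum tool writes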
    md5_cache = filename + ".md5"
    if os.path.exists(md5_cache):
        logging.debug("Found md5sum in {0}".format(md5_cache))
        stream = open(md5_cache, 'r')
        lines = stream.readlines()
        md5sum = parse_md5sum_line(lines, filename)
    else:
        md5sum = make_md5sum_unix(filename, md5_cache)
    return md5sum

def make_md5sum_unix(filename, md5_cache):
    cmd = ["md5sum", filename]
    logging.debug("Running {0}".format(" ".join(cmd)))
    p = Popen(cmd, stdout=PIPE)
    stdout, stderr = p.communicate()
    retcode = p.wait()
    logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
    if retcode != 0:
        logging.error("Trouble with md5sum for {0}".format(filename))
        return None
    lines = stdout.split(os.linesep)
    md5sum = parse_md5sum_line(lines, filename)
    if md5sum is not None:
        logging.debug("Caching sum in {0}".format(md5_cache))
        stream = open(md5_cache, "w")
        stream.write(stdout)
        stream.close()
    return md5sum

def parse_md5sum_line(lines, filename):
    md5sum, md5sum_filename = lines[0].split()
    if md5sum_filename != filename:
        errmsg = "md5sum and I disagree about the filename. {0} != {1}"
        logging.error(errmsg.format(filename, md5sum_filename))
        return None
    return md5sum

if __name__ == "__main__":
    main()