Replace xrange with enumerate.
[htsworkflow.git] / htsworkflow / submission / daf.py
1 """Parse UCSC DAF File
2 """
3 import collections
4 import logging
5 import os
6 from pprint import pformat
7 import re
8 import string
9 from six.moves import StringIO
10 import types
11 from six.moves import urllib
12
13 import RDF
14 from htsworkflow.util.rdfhelp import \
15      blankOrUri, \
16      dafTermOntology, \
17      dump_model, \
18      get_model, \
19      libraryOntology, \
20      owlNS, \
21      rdfNS, \
22      submissionLog, \
23      submissionOntology, \
24      toTypedNode, \
25      fromTypedNode
26 from htsworkflow.util.hashfile import make_md5sum
27
28 LOGGER = logging.getLogger(__name__)
29
# DAF keys whose values are comma-separated lists of variable names
DAF_VARIABLE_NAMES = ("variables", "extraVariables")
# RDF term name under which all DAF variables are recorded
VARIABLES_TERM_NAME = 'variables'
# variables always placed before / after the DAF-declared ones in a DDF
DAF_PRE_VARIABLES = ['files', 'view']
DAF_POST_VARIABLES = [ 'labExpId', 'md5sum']
34
35
class ModelException(RuntimeError):
    """Raised when assumptions about the RDF model turn out to be false."""
39
40
class MetadataLookupException(RuntimeError):
    """Raised when library metadata cannot be retrieved."""
44
45
# STATES for the parse_stream state machine
DAF_HEADER = 1  # reading the top-of-file header attributes
DAF_VIEW = 2  # reading the attributes of a view block
49
50
def parse_into_model(model, subject, filename):
    """Read a DAF file into an RDF model.

    The resulting statements are attached to the provided subject node.
    """
    add_to_model(model, parse(filename), subject)
58
59
def fromstream_into_model(model, subject, daf_stream):
    """Parse a DAF stream and attach its statements to subject in model."""
    add_to_model(model, parse_stream(daf_stream), subject)
65
66
def fromstring_into_model(model, subject, daf_string):
    """Parse a DAF held in a string and load it into an RDF model.

    Statements are attached to the provided subject node.
    """
    add_to_model(model, fromstring(daf_string), subject)
74
75
def parse(filename):
    """Parse a DAF file from disk.

    :param filename: path of the DAF file to read
    :return: dictionary of DAF attributes (see parse_stream)
    """
    # context manager guarantees the file is closed even if parsing raises
    with open(filename, 'r') as stream:
        return parse_stream(stream)
83
84
def fromstring(daf_string):
    """Parse a UCSC DAF held in a string."""
    return parse_stream(StringIO(daf_string))
89
90
def parse_stream(stream):
    """Parse a UCSC DAF read from an open stream.

    Returns a dictionary of DAF attributes; view blocks are collected
    under the 'views' key, indexed by view name.
    """
    strip_comment = re.compile("#.*$")

    state = DAF_HEADER
    attributes = {'views': {}}
    view_name = None
    view_attributes = {}
    for line in stream:
        # throw away trailing comments before scanning
        line = strip_comment.sub("", line)
        name_end = _extract_name_index(line)
        value_start = _consume_whitespace(line, start=name_end)
        value_end = _extract_value_index(line, start=value_start)
        name = line[:name_end]
        value = line[value_start:value_end]

        # yes/no values become booleans
        lowered = value.lower()
        if lowered == 'yes':
            value = True
        elif lowered == 'no':
            value = False

        if not name:
            # a blank line terminates the current view block
            if view_name is not None:
                attributes['views'][view_name] = view_attributes
                view_name = None
                view_attributes = {}
            state = DAF_HEADER
        elif state == DAF_VIEW:
            view_attributes[name] = value
        elif name in DAF_VARIABLE_NAMES:
            attributes[name] = [term.strip() for term in value.split(',')]
        elif name == 'view':
            view_name = value
            view_attributes['view'] = value
            state = DAF_VIEW
        else:
            attributes[name] = value

    # flush the final view block, if any
    if view_name is not None:
        attributes['views'][view_name] = view_attributes

    LOGGER.debug("DAF Attributes" + pformat(attributes))
    return attributes
136
137
138 def _consume_whitespace(line, start=0):
139     """return index of next non whitespace character
140
141     returns length of string if it can't find anything
142     """
143     for i, c in enumerate(line[start:]):
144         if c not in string.whitespace:
145             return i+start
146
147     return len(line)
148
149
150 def _extract_name_index(line, start=0):
151     """Used to find end of word by looking for a whitespace character
152
153     returns length of string if nothing matches
154     """
155     for i, c in enumerate(line[start:]):
156         if c in string.whitespace:
157             return i+start
158
159     return len(line)
160
161
162 def _extract_value_index(line, start=0):
163     """Returns position of last non-whitespace character
164     """
165     shortline = line.rstrip()
166     return len(shortline)
167
168
def convert_to_rdf_statements(attributes, subject):
    """Turn a dictionary of DAF attributes into RDF statements.

    All statements are attached to the provided subject node.
    """
    variables_term = dafTermOntology[VARIABLES_TERM_NAME]
    statements = []
    for daf_key, value in attributes.items():
        if daf_key == 'views':
            statements.extend(
                _views_to_statements(subject, dafTermOntology, value))
        elif daf_key in DAF_VARIABLE_NAMES:
            # each declared variable becomes its own 'variables' statement
            statements.extend(
                RDF.Statement(subject, variables_term, toTypedNode(var))
                for var in value)
        else:
            statements.append(
                RDF.Statement(subject,
                              dafTermOntology[daf_key],
                              toTypedNode(value)))

    return statements
192
193
def _views_to_statements(subject, dafNS, views):
    """Build statements describing each view, linked to the given subject.

    Each view gets its own node under the view namespace derived from the
    subject URI.
    """
    viewNS = get_view_namespace(subject)

    statements = []
    for view_name, view_attributes in views.items():
        viewSubject = viewNS[view_name]
        statements.append(RDF.Statement(subject, dafNS['views'], viewSubject))
        statements.append(
            RDF.Statement(viewSubject, dafNS['name'], toTypedNode(view_name)))
        statements.extend(
            RDF.Statement(viewSubject, dafNS[attr_name], toTypedNode(attr_value))
            for attr_name, attr_value in view_attributes.items())

    return statements
213
214
def add_to_model(model, attributes, subject):
    """Convert DAF attributes to statements and add them to the model."""
    statements = convert_to_rdf_statements(attributes, subject)
    for statement in statements:
        model.add_statement(statement)
218
219
def get_submission_uri(name):
    """Return the submission-log URI for the named submission."""
    submission_node = submissionLog[name]
    return submission_node.uri
222
223
def submission_uri_to_string(submission_uri):
    """Coerce an RDF node, RDF URI, or string into a '/'-terminated string."""
    if isinstance(submission_uri, RDF.Node):
        submission_uri = str(submission_uri.uri)
    elif isinstance(submission_uri, RDF.Uri):
        submission_uri = str(submission_uri)
    # make sure the URI ends with a slash so urljoin treats it as a directory
    return submission_uri if submission_uri[-1] == '/' else submission_uri + '/'
232
233
def get_view_namespace(submission_uri):
    """Return an RDF namespace for views under the given submission URI."""
    base = submission_uri_to_string(submission_uri)
    return RDF.NS(urllib.parse.urljoin(base, 'view/'))
239
240
class UCSCSubmission(object):
    """Build a submission by examining the DAF for what we need to submit
    """
    def __init__(self, name, daf_file=None, model=None):
        """Construct a RDF backed model of a UCSC DAF

        :args:
          name (str): the name of this submission (used to construct DAF url)
          daf_file (str, stream, or None):
             if str, use as filename
             if stream, parse as stream
             if none, don't attempt to load the DAF into our model
          model (RDF.Model or None):
             if None, construct a memory backed model
             otherwise specifies model to use
        """
        if daf_file is None and model is None:
            LOGGER.error("We need a DAF or Model containing a DAF to work")

        self.name = name
        self.submissionSet = get_submission_uri(self.name)
        self.viewNS = get_view_namespace(self.submissionSet)

        if model is not None:
            self.model = model
        else:
            self.model = get_model()

        # NOTE: the old check isinstance(daf_file, collections.Iterable)
        # matched str on Python 3 (strings are iterable), which made the
        # filename branch unreachable; detect streams by their read method.
        if daf_file is None:
            # no DAF supplied; rely on the model already containing one
            self.daf = None
        elif hasattr(daf_file, 'read'):
            # its some kind of stream
            self.daf = daf_file.read()
        else:
            # treat it as a filename; with-block closes it even on error
            with open(daf_file, 'rt') as stream:
                self.daf = stream.read()

        if self.daf is not None:
            fromstring_into_model(self.model, self.submissionSet, self.daf)

        self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/')
        self.__view_map = None

    def _get_daf_name(self):
        """Return the filename for this submission's DAF."""
        return self.name + '.daf'
    daf_name = property(_get_daf_name, doc="construct name for DAF file")

    def add_pattern(self, view_name, filename_pattern):
        """Map a filename regular expression to a view name
        """
        obj = toTypedNode(filename_pattern)
        self.model.add_statement(
            RDF.Statement(self.viewNS[view_name],
                          dafTermOntology['filename_re'],
                          obj))

    def scan_submission_dirs(self, result_map):
        """Examine files in our result directory

        :param result_map: mapping of library id to result directory
        """
        for lib_id, result_dir in result_map.items():
            LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
            try:
                self.import_submission_dir(result_dir, lib_id)
            except MetadataLookupException as e:
                # best-effort: log the library we couldn't look up and go on
                LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))

    def import_submission_dir(self, submission_dir, library_id):
        """Import a submission directory and update our model as needed

        :param submission_dir: directory containing the submission files
        :param library_id: library id used to build the library node URI
        """
        libNode = self.libraryNS[library_id + "/"]

        self._add_library_details_to_model(libNode)

        submission_files = os.listdir(submission_dir)
        for filename in submission_files:
            self.construct_track_attributes(submission_dir, libNode, filename)

    def construct_track_attributes(self, submission_dir, libNode, pathname):
        """Attach submission and view statements for one result file.

        :Args:
        submission_dir (str): directory this file came from
        libNode: RDF node for the library the file belongs to
        pathname (str): filename (possibly with path) to classify by view
        """
        path, filename = os.path.split(pathname)

        LOGGER.debug("Searching for view")
        view = self.find_view(filename)
        if view is None:
            LOGGER.warning("Unrecognized file: {0}".format(pathname))
            return None
        if str(view) == str(libraryOntology['ignore']):
            # view explicitly marked to be skipped
            return None

        submission_name = self.make_submission_name(submission_dir)
        submissionNode = self.get_submission_node(submission_dir)
        submission_uri = str(submissionNode.uri)
        view_name = fromTypedNode(self.model.get_target(view,
                                       dafTermOntology['name']))
        if view_name is None:
            errmsg = 'Could not find view name for {0}'
            LOGGER.warning(errmsg.format(str(view)))
            return

        view_name = str(view_name)
        submissionView = RDF.Node(RDF.Uri(submission_uri + '/' + view_name))

        self.model.add_statement(
            RDF.Statement(self.submissionSet,
                          dafTermOntology['has_submission'],
                          submissionNode))
        LOGGER.debug("Adding statements to {0}".format(str(submissionNode)))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['has_view'],
                                               submissionView))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['name'],
                                               toTypedNode(submission_name)))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          rdfNS['type'],
                          submissionOntology['submission']))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               libraryOntology['library'],
                                               libNode))

        LOGGER.debug("Adding statements to {0}".format(str(submissionView)))
        # add track specific information
        self.model.add_statement(
            RDF.Statement(submissionView, dafTermOntology['view'], view))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['submission'],
                          submissionNode))

        # add file specific information
        self.create_file_attributes(filename, submissionView, submission_uri, submission_dir)

        LOGGER.debug("Done.")

    def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir):
        """Attach filename and md5sum statements for one submitted file."""
        # add file specific information
        LOGGER.debug("Updating file md5sum")
        submission_pathname = os.path.join(submission_dir, filename)
        fileNode = RDF.Node(RDF.Uri("file://" + submission_pathname))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['has_file'],
                          fileNode))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['filename'],
                          filename))

        md5 = make_md5sum(submission_pathname)
        if md5 is None:
            errmsg = "Unable to produce md5sum for {0}"
            LOGGER.warning(errmsg.format(submission_pathname))
        else:
            self.model.add_statement(
                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))

    def _add_library_details_to_model(self, libNode):
        """Load RDFa metadata for a library node into our model.

        Existing statements in the model take precedence over fetched ones.
        """
        parser = RDF.Parser(name='rdfa')
        new_statements = parser.parse_as_stream(libNode.uri)
        for s in new_statements:
            # don't override things we already have in the model
            targets = list(self.model.get_targets(s.subject, s.predicate))
            if len(targets) == 0:
                self.model.append(s)

    def get_daf_variables(self):
        """Returns simple variables names that to include in the ddf
        """
        variables_term = dafTermOntology[VARIABLES_TERM_NAME]
        # start from a copy so the module-level list is never mutated
        # (the previous extend-into-results line was dead code: it was
        # immediately overwritten by this assignment)
        results = DAF_PRE_VARIABLES[:]
        if self.need_replicate() and 'replicate' not in results:
            results.append('replicate')

        for obj in self.model.get_targets(self.submissionSet, variables_term):
            value = str(fromTypedNode(obj))
            if value not in results:
                results.append(value)
        results.extend([v for v in DAF_POST_VARIABLES if v not in results])
        return results

    def make_submission_name(self, submission_dir):
        """Derive the submission name from its directory name.

        :raises RuntimeError: if the normalized directory name is empty
        """
        submission_dir = os.path.normpath(submission_dir)
        submission_dir_name = os.path.split(submission_dir)[1]
        if len(submission_dir_name) == 0:
            raise RuntimeError(
                "Submission dir name too short: {0}".format(submission_dir))
        return submission_dir_name

    def get_submission_node(self, submission_dir):
        """Convert a submission directory name to a submission node
        """
        submission_name = self.make_submission_name(submission_dir)
        return self.submissionSetNS[submission_name]

    def _get_library_attribute(self, libNode, attribute):
        """Look up a library attribute in the model.

        :param attribute: RDF.Node predicate, or a name looked up in
                          libraryOntology
        :return: formatted value(s) or None if the attribute is unknown
        """
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        targets = list(self.model.get_targets(libNode, attribute))
        if len(targets) > 0:
            return self._format_library_attribute(targets)

        # we don't know anything about this attribute
        # (an earlier version had unreachable fallback code here that
        # re-fetched library details; it was disabled by an early return)
        return None

    def _format_library_attribute(self, targets):
        """Return None, a single value, or a list depending on target count."""
        if len(targets) == 0:
            return None
        elif len(targets) == 1:
            return fromTypedNode(targets[0])
        elif len(targets) > 1:
            return [fromTypedNode(t) for t in targets]

    def _search_same_as(self, subject, predicate):
        """Search for targets reachable through owl:sameAs alternate predicates."""
        # look for alternate names
        other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
        for other in other_predicates:
            targets = list(self.model.get_targets(subject, other))
            if len(targets) > 0:
                return targets
        return None

    def find_view(self, filename):
        """Search through potential DAF filename patterns

        :return: the view whose pattern matches filename, or None
        :raises ModelException: if more than one view matches
        """
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = []
        for pattern, view in self.__view_map.items():
            if re.match(pattern, filename):
                results.append(view)

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
            return results[0]
        else:
            return None

    def get_view_name(self, view):
        """Return the name recorded for a view node.

        :raises RuntimeError: if the model doesn't hold exactly one name
        """
        view_term = submissionOntology['view_name']
        names = list(self.model.get_targets(view, view_term))
        if len(names) == 1:
            return fromTypedNode(names[0])
        else:
            msg = "Found wrong number of view names for {0} len = {1}"
            msg = msg.format(str(view), len(names))
            LOGGER.error(msg)
            raise RuntimeError(msg)

    def _get_filename_view_map(self):
        """Query our model for filename patterns

        return a dictionary of regular expression strings to view names
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            LOGGER.debug("Found: %s" % (literal_re,))
            try:
                re.compile(literal_re)
            except re.error:
                # skip invalid patterns instead of registering them;
                # previously they were added anyway and blew up later
                # when find_view tried to match with them
                LOGGER.error("Unable to compile: %s" % (literal_re,))
                continue
            patterns[literal_re] = view_name
        return patterns

    def _get_library_url(self):
        """Return the base URL used to construct library nodes."""
        return str(self.libraryNS[''].uri)

    def _set_library_url(self, value):
        """Change the base URL used to construct library nodes."""
        self.libraryNS = RDF.NS(str(value))

    library_url = property(_get_library_url, _set_library_url)

    def _is_paired(self, libNode):
        """Determine if a library is paired end

        :raises ModelException: if the library has no library type
        :raises MetadataLookupException: if the type is unrecognized
        """
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            raise ModelException(errmsg % (str(libNode),))

        single = ['CSHL (lacking last nt)',
                  'Single End (non-multiplexed)',
                  'Small RNA (non-multiplexed)',]
        paired = ['Barcoded Illumina',
                  'Multiplexing',
                  'Nextera',
                  'Paired End (non-multiplexed)',]
        if library_type in single:
            return False
        elif library_type in paired:
            return True
        else:
            raise MetadataLookupException(
                "Unrecognized library type %s for %s" % \
                (library_type, str(libNode)))

    def need_replicate(self):
        """Return True if any view in the DAF declares hasReplicates."""
        viewTerm = dafTermOntology['views']
        replicateTerm = dafTermOntology['hasReplicates']

        views = self.model.get_targets(self.submissionSet, viewTerm)

        for view in views:
            replicate = self.model.get_target(view, replicateTerm)
            if fromTypedNode(replicate):
                return True

        return False

    def link_daf(self, result_map):
        """Write a copy of our DAF into each result directory.

        :raises RuntimeError: if we have no DAF text, or a result
                              directory doesn't exist
        """
        if self.daf is None or len(self.daf) == 0:
            raise RuntimeError(
                "DAF data does not exist, how can I link to it?")

        base_daf = self.daf_name

        for result_dir in result_map.values():
            if not os.path.exists(result_dir):
                raise RuntimeError(
                    "Couldn't find target directory %s" %(result_dir,))
            submission_daf = os.path.join(result_dir, base_daf)
            if os.path.exists(submission_daf):
                with open(submission_daf, 'r') as stream:
                    previous_daf = stream.read()
                if self.daf != previous_daf:
                    LOGGER.info("Old daf is different, overwriting it.")
            with open(submission_daf, 'w') as stream:
                stream.write(self.daf)
603             stream.close()
604
605
if __name__ == "__main__":
    # small demonstration: parse an example DAF and dump the model
    example_daf = """# Lab and general info
grant             Hardison
lab               Caltech-m
dataType          ChipSeq
variables         cell, antibody,sex,age,strain,control
compositeSuffix   CaltechHistone
assembly          mm9
dafVersion        2.0
validationSettings validateFiles.bam:mismatches=2,bamPercent=99.9;validateFiles.fastq:quick=1000

# Track/view definition
view             FastqRd1
longLabelPrefix  Caltech Fastq Read 1
type             fastq
hasReplicates    yes
required         no

view             Signal
longLabelPrefix  Caltech Histone Signal
type             bigWig
hasReplicates    yes
required         no
"""
    model = get_model()
    example_daf_stream = StringIO(example_daf)
    name = "test_rep"
    # the class is UCSCSubmission; the old DAFMapper name no longer exists
    # and raised NameError here
    mapper = UCSCSubmission(name, daf_file=example_daf_stream, model=model)
    dump_model(model)
635
636