[htsworkflow.git] / scripts / make-library-tree
#!/usr/bin/env python

from ConfigParser import SafeConfigParser
import json
import logging
import os
from optparse import OptionParser
import stat
import re
import shelve
import urllib
import urllib2
import urlparse

eland_re = re.compile(r's_(?P<lane>\d)(?P<read>_\d)?_eland_')
raw_seq_re = re.compile(r'woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+')
qseq_re = re.compile(r'woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+_l[\d]_r[\d].tar.bz2')
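
# Illustrative filename shapes these patterns are expected to match
# (hypothetical examples inferred from the expressions above, not real run names):
#   raw_seq_re  woldlab_090321_HWI-EAS229_0100_42BW9AAXX_4.srf
#   qseq_re     woldlab_090321_HWI-EAS229_0100_42BW9AAXX_l1_r1.tar.bz2
#   eland_re    s_4_1_eland_result.txt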

class SequenceFile(object):
    def __init__(self, filetype, path, flowcell, lane, read=None, pf=None, cycle=None):
        self.filetype = filetype
        self.path = path
        self.flowcell = flowcell
        self.lane = lane
        self.read = read
        self.pf = pf
        self.cycle = cycle

    def __hash__(self):
        return hash(self.key())

    def key(self):
        return (self.flowcell, self.lane)

    def unicode(self):
        return unicode(self.path)

    def __repr__(self):
        return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)

    def make_target_name(self, root):
        """
        Create the target name we should link this sequence file to
        """
        path, basename = os.path.split(self.path)
        # Eland file names aren't unique, so we prefix them with the
        # flowcell name; and because there were different eland files for
        # different length analyses, we also include the cycle length.
        if self.filetype == 'eland':
            template = "%(flowcell)s_%(cycle)s_%(eland)s"
            basename = template % { 'flowcell': self.flowcell,
                                    'cycle': self.cycle,
                                    'eland': basename }
        # else:
        # all the other file types have names that include flowcell/lane
        # information and thus are unique, so we don't have to do anything
        return os.path.join(root, basename)
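
    # Illustrative only (hypothetical paths): an eland file such as
    #   /flowcells/42BW9AAXX/C1-36/s_4_eland_result.txt
    # would get the target name <root>/42BW9AAXX_C1-36_s_4_eland_result.txt,
    # while srf/qseq/fastq files keep their original basenames.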

def parse_srf(path, filename):
    basename, ext = os.path.splitext(filename)
    records = basename.split('_')
    flowcell = records[4]
    lane = int(records[5][0])
    fullpath = os.path.join(path, filename)
    return SequenceFile('srf', fullpath, flowcell, lane)

def parse_qseq(path, filename):
    basename, ext = os.path.splitext(filename)
    records = basename.split('_')
    fullpath = os.path.join(path, filename)
    flowcell = records[4]
    lane = int(records[5][1])
    read = int(records[6][1])
    return SequenceFile('qseq', fullpath, flowcell, lane, read)
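
# Illustrative only: for a hypothetical tarball named
#   woldlab_090321_HWI-EAS229_0100_42BW9AAXX_l1_r2.tar.bz2
# parse_qseq() would record flowcell='42BW9AAXX', lane=1, read=2.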

def parse_fastq(path, filename):
    basename, ext = os.path.splitext(filename)
    records = basename.split('_')
    fullpath = os.path.join(path, filename)
    flowcell = records[4]
    lane = int(records[5][1])
    read = int(records[6][1])
    if records[-1] == 'pass':
        pf = True
    else:
        pf = False
    return SequenceFile('fastq', fullpath, flowcell, lane, read, pf=pf)

def parse_eland(path, filename, eland_match):
    fullpath = os.path.join(path, filename)
    rest, cycle = os.path.split(path)
    rest, flowcell = os.path.split(rest)
    lane = eland_match.group('lane')
    read = eland_match.group('read')
    return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=cycle)

def scan_for_sequences(dirs):
    sequences = []
    for d in dirs:
        logging.info("Scanning %s for sequences" % (d,))
        for path, dirnames, filenames in os.walk(d):
            for f in filenames:
                seq = None
                # find sequence files
                if raw_seq_re.match(f):
                    if f.endswith('.md5'):
                        continue
                    elif f.endswith('.srf') or f.endswith('.srf.bz2'):
                        seq = parse_srf(path, f)
                    elif qseq_re.match(f):
                        seq = parse_qseq(path, f)
                    elif f.endswith('fastq') or f.endswith('.fastq.bz2'):
                        seq = parse_fastq(path, f)
                eland_match = eland_re.match(f)
                if eland_match:
                    if f.endswith('.md5'):
                        continue
                    seq = parse_eland(path, f, eland_match)
                if seq:
                    sequences.append(seq)
                    logging.debug("Found sequence at %s" % (f,))

    return sequences
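
# Illustrative only: scan_for_sequences(['/example/flowcells']) returns a list of
# SequenceFile objects for every srf, qseq, fastq, and eland file found under
# that directory tree.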

def retrieve_info(url, apidata):
    """
    Return a dictionary from the HTSworkflow API
    """
    try:
        apipayload = urllib.urlencode(apidata)
        web = urllib2.urlopen(url, apipayload)
    except urllib2.HTTPError, e:
        if e.code == 404:
            logging.info("%s was not found" % (url,))
            return None
        else:
            errmsg = 'HTTPError: %d %s' % (e.code, e.msg)
            raise IOError(errmsg)

    contents = web.read()
    headers = web.info()

    return json.loads(contents)
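
# Illustrative only (hypothetical host and credentials):
#   retrieve_info('http://localhost/experiments/config/42BW9AAXX/json',
#                 {'apiid': '0', 'apikey': 'example-key'})
# returns the decoded JSON flowcell description, or None on a 404.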

def build_flowcell_db(fcdb_filename, sequences, baseurl, apiid, apikey):
    """
    Compare our flowcell database with our list of sequences and return
    a fully populated database
    """
    fcdb = shelve.open(fcdb_filename)
    libdb = {}
    apidata = {'apiid': apiid, 'apikey': apikey}
    for seq in sequences:
        flowcell = seq.flowcell
        flowcell_info = None

        # get info about flowcell from server or shelf
        if not fcdb.has_key(flowcell):
            url = urlparse.urljoin(baseurl, 'experiments/config/%s/json' % (flowcell,))
            flowcell_info = retrieve_info(url, apidata)
            if flowcell_info is not None:
                fcdb[flowcell] = flowcell_info
        else:
            flowcell_info = fcdb[flowcell]

        # make library id db
        if flowcell_info is not None:
            seq_library_id = flowcell_info['lane_set'][unicode(seq.lane)]['library_id']
            libdb.setdefault(seq_library_id, []).append(seq)

    fcdb.sync()
    return fcdb, libdb
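
# Illustrative only: build_flowcell_db returns the (still open) shelf of flowcell
# descriptions plus a plain dict mapping each library_id to its list of
# SequenceFile objects.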

def carefully_make_hardlink(source, destination, dry_run=False):
    """
    Make a hard link, failing if a different link already exists

    If the destination already exists and is the same file as the source,
    do nothing. If the destination exists and is a different file,
    raise an IOError.

    Return 0 if nothing was updated, 1 if a link was made.
    """
    logging.debug("CHECKING: %s -> %s", source, destination)

    if not os.path.exists(source):
        logging.warning("%s doesn't exist", source)
        return 0

    if os.path.exists(destination):
        if os.path.samefile(source, destination):
            logging.debug('SAME: %s -> %s' % (source, destination))
            return 0
        else:
            raise IOError('%s and %s are different files' % \
                           (source, destination))
    logging.debug('Linking: %s -> %s' % (source, destination))

    # by this point we would actually modify the filesystem
    if dry_run: return 1

    os.link(source, destination)
    os.chmod(destination,
             stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH )
    return 1
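
# Illustrative only (hypothetical paths):
#   carefully_make_hardlink('/example/a.srf', '/example/libraries/12345/a.srf')
# returns 1 after creating a read-only hard link, 0 if the link already exists,
# and raises IOError if the destination is a different file.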

def make_library_links(root, library_db, dry_run=False):
    """
    Make a tree of hard links to sequence files, organized by library id

    root is the root of the library tree
    library_db is a dictionary of SequenceFiles organized by library id
    """
    count = 0
    root = os.path.abspath(root)
    for lib_id, sequences in library_db.items():
        target_dir = os.path.join(root, lib_id)
        if not os.path.exists(target_dir):
            logging.info("mkdir %s" % (target_dir,))
            if not dry_run:
                os.mkdir(target_dir)

        for s in sequences:
            count += carefully_make_hardlink(s.path,
                                             s.make_target_name(target_dir),
                                             dry_run=dry_run)
    return count
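
# Illustrative only: the resulting tree looks like
#   <library_tree>/<library_id>/<hard links to the matching sequence files>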

def configure_logging(opts):
    # setup logging
    level = logging.WARN
    if opts.verbose:
        level = logging.INFO
    if opts.debug:
        level = logging.DEBUG
    logging.basicConfig(level=level)

def configure_opts(opts):
    """
    Load in options from config file
    """
    SECTION_NAME = 'sequence_archive'
    ARCHIVE_OPT = 'sequence_archive'
    CACHE_OPT = 'cache'
    HOST_OPT = 'host'
    APIID_OPT = 'apiid'
    APIKEY_OPT = 'apikey'

    # figure out what config file to read
    config_path = [os.path.expanduser('~/.htsworkflow.ini'),
                   '/etc/htsworkflow.ini']
    if opts.config is not None:
        config_path = [opts.config]
    # parse options from config file
    config_file = SafeConfigParser()
    config_file.read(config_path)

    # load defaults from config file if not overridden by the command line
    if opts.cache is None:
        if config_file.has_option(SECTION_NAME, CACHE_OPT):
            opts.cache = config_file.get(SECTION_NAME, CACHE_OPT)
        else:
            opts.cache = os.path.expanduser('~/.flowcelldb.shelve')

    if opts.sequence_archive is None and \
       config_file.has_option(SECTION_NAME, ARCHIVE_OPT):
        opts.sequence_archive = config_file.get(SECTION_NAME, ARCHIVE_OPT)

    if opts.sequence_archive is not None:
        opts.sequence_archive = os.path.abspath(opts.sequence_archive)
        opts.library_tree = os.path.join(opts.sequence_archive, 'libraries')
        opts.flowcells = os.path.join(opts.sequence_archive, 'flowcells')
        opts.srfs = os.path.join(opts.sequence_archive, 'srfs')

    if opts.host is None and config_file.has_option(SECTION_NAME, HOST_OPT):
        opts.host = config_file.get(SECTION_NAME, HOST_OPT)

    if opts.apiid is None and config_file.has_option(SECTION_NAME, APIID_OPT):
        opts.apiid = config_file.get(SECTION_NAME, APIID_OPT)

    if opts.apikey is None and config_file.has_option(SECTION_NAME, APIKEY_OPT):
        opts.apikey = config_file.get(SECTION_NAME, APIKEY_OPT)

    return opts
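
# Illustrative only: a minimal ~/.htsworkflow.ini supplying these defaults
# (all values are placeholders):
#
#   [sequence_archive]
#   sequence_archive = /example/sequence_archive
#   cache = /example/flowcelldb.shelve
#   host = http://localhost/
#   apiid = 0
#   apikey = example-key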

def make_parser():
    """
    Make parser
    """
    parser = OptionParser()
    parser.add_option('-c', '--config', default=None,
                      help='path to a configuration file containing a '
                           'sequence archive section')
    parser.add_option('--cache', default=None,
                      help="default flowcell cache")

    parser.add_option('--host', default=None,
                      help="specify http://host for querying flowcell information")
    parser.add_option('--apiid', default=None,
                      help="API ID to use when retrieving information")
    parser.add_option("--apikey", default=None,
                      help="API key to use when retrieving information")

    parser.add_option('-a', '--sequence-archive', default=None,
                      help='path to where the sequence archive lives')

    parser.add_option('-v', '--verbose', action='store_true', default=False,
                      help='be more verbose')
    parser.add_option('-d', '--debug', action='store_true', default=False,
                      help='report everything')

    parser.add_option("--dry-run", dest="dry_run", action="store_true",
                      default=False,
                      help="Don't modify the filesystem")
    return parser
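
# Illustrative invocation (hypothetical paths and credentials):
#   make-library-tree -a /example/sequence_archive --host http://localhost/ \
#       --apiid 0 --apikey example-key --dry-run -v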

def main(cmdline=None):
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    configure_logging(opts)
    opts = configure_opts(opts)

    # complain if critical things are missing
    if opts.cache is None:
        parser.error('Need location of htsworkflow frontend database')

    if opts.sequence_archive is None:
        parser.error('Need the root path for the sequence archive')

    seq_dirs = [ opts.flowcells, opts.srfs ]
    if len(args) > 0:
        seq_dirs = [os.path.abspath(f) for f in args]

    seqs = scan_for_sequences(seq_dirs)
    fcdb, libdb = build_flowcell_db(opts.cache, seqs, opts.host, opts.apiid, opts.apikey)
    updates = make_library_links(opts.library_tree, libdb, dry_run=opts.dry_run)

    logging.warn("%s flowcells in database" % (len(fcdb),))
    logging.warn("found %s sequence files" % (len(seqs),))
    logging.warn("%s libraries being checked" % (len(libdb),))
    logging.warn("%s sequence files were linked" % (updates,))

    return 0

if __name__ == "__main__":
    main()