From 61ce2870651f2f3d7d63d3f7efce2d4bd84cc92f Mon Sep 17 00:00:00 2001
From: Diane Trout
Date: Fri, 4 Nov 2011 16:05:26 -0700
Subject: [PATCH] Use a logger initialized to the module name much more consistently.

e.g. LOGGER = logging.getLogger(__name__)

(Used uppercase since LOGGER ends up being a module level variable)
---
 extra/ucsc_encode_submission/encode_find.py |  14 +-
 extra/ucsc_encode_submission/ucsc_gather.py |  16 +-
 htsworkflow/automation/copier.py            |  96 +++++++++---------
 htsworkflow/automation/runner.py            |  64 ++++++------
 htsworkflow/automation/spoolwatcher.py      |  50 +++++-----
 htsworkflow/frontend/inventory/models.py    |  87 ++++++++--------
 htsworkflow/frontend/samples/views.py       |   4 +-
 htsworkflow/pipelines/bustard.py            |  20 ++--
 htsworkflow/pipelines/configure_run.py      |  84 ++++++++--------
 htsworkflow/pipelines/eland.py              |  24 ++---
 htsworkflow/pipelines/firecrest.py          |  15 +--
 htsworkflow/pipelines/genome_mapper.py      |  31 +++---
 htsworkflow/pipelines/gerald.py             |  12 ++-
 htsworkflow/pipelines/ipar.py               |  11 ++-
 htsworkflow/pipelines/retrieve_config.py    |  28 +++---
 htsworkflow/pipelines/runfolder.py          | 104 ++++++++++----------
 htsworkflow/pipelines/sequences.py          |  32 +++---
 htsworkflow/pipelines/srf.py                |  22 +++--
 htsworkflow/pipelines/srf2fastq.py          |  24 ++---
 htsworkflow/pipelines/summary.py            |  23 ++---
 htsworkflow/submission/condorfastq.py       |  96 +++++++++---------
 htsworkflow/submission/daf.py               |   4 +-
 htsworkflow/util/api.py                     |  13 +--
 htsworkflow/util/queuecommands.py           |  33 +++----
 htsworkflow/util/test/test_queuecommands.py |   5 +-
 htsworkflow/version.py                      |  10 +-
 scripts/htsw-gerald2bed                     |   8 +-
 scripts/htsw-runfolder                      |  25 ++---
 scripts/htsw-update-archive                 |  50 +++++-----
 scripts/rerun_eland.py                      |  56 ++++++-----
 30 files changed, 551 insertions(+), 510 deletions(-)

diff --git a/extra/ucsc_encode_submission/encode_find.py b/extra/ucsc_encode_submission/encode_find.py
index a56bf1d..fdf7542 100644
--- a/extra/ucsc_encode_submission/encode_find.py
+++ b/extra/ucsc_encode_submission/encode_find.py
@@ -196,7 +196,7 @@ def load_my_submissions(model, limit=None, cookie=None):
             update_submission_detail(model, subUrn, status, last_mod,
                                      cookie=cookie)
-            logging.info("Processed {0}".format(subUrn))
+            LOGGER.info("Processed {0}".format(subUrn))
@@ -268,7 +268,7 @@ def update_submission_detail(model, subUrn, status, recent_update, cookie):
     if len(status_nodes) == 0:
         # has no status node, add one
-        logging.info("Adding status node to {0}".format(subUrn))
+        LOGGER.info("Adding status node to {0}".format(subUrn))
         status_node = create_status_node(subUrn, recent_update)
         add_stmt(model, subUrn, HasStatusN, status_node)
         add_stmt(model, status_node, rdfsNS['type'], StatusN)
@@ -277,7 +277,7 @@
         update_ddf(model, subUrn, status_node, cookie=cookie)
         update_daf(model, subUrn, status_node, cookie=cookie)
     else:
-        logging.info("Found {0} status blanks".format(len(status_nodes)))
+        LOGGER.info("Found {0} status blanks".format(len(status_nodes)))
         for status_statement in status_nodes:
             status_node = status_statement.object
             last_modified_query = RDF.Statement(status_node,
@@ -298,7 +298,7 @@ def update_daf(model, submission_url, status_node, cookie):
     status_is_daf = RDF.Statement(status_node, TYPE_N, dafTermOntology[''])
     if not model.contains_statement(status_is_daf):
-        logging.info('Adding daf to {0}, {1}'.format(submission_url,
+        LOGGER.info('Adding daf to {0}, {1}'.format(submission_url,
                                                      status_node))
         daf_text = get_url_as_text(download_daf_uri, 'GET', cookie)
daf.fromstring_into_model(model, status_node, daf_text) @@ -310,7 +310,7 @@ def update_ddf(model, subUrn, statusNode, cookie): status_is_ddf = RDF.Statement(statusNode, TYPE_N, DDF_NS['']) if not model.contains_statement(status_is_ddf): - logging.info('Adding ddf to {0}, {1}'.format(subUrn, statusNode)) + LOGGER.info('Adding ddf to {0}, {1}'.format(subUrn, statusNode)) ddf_text = get_url_as_text(download_ddf_url, 'GET', cookie) add_ddf_statements(model, statusNode, ddf_text) model.add_statement(status_is_ddf) @@ -382,7 +382,7 @@ def load_library_detail(model, libraryUrn): elif len(results) == 1: pass # Assuming that a loaded dataset has one record else: - logging.warning("Many dates for {0}".format(libraryUrn)) + LOGGER.warning("Many dates for {0}".format(libraryUrn)) def get_library_id(name): @@ -450,7 +450,7 @@ def login(cookie=None): 'POST', headers=headers, body=urllib.urlencode(credentials)) - logging.debug("Login to {0}, status {1}".format(LOGIN_URL, + LOGGER.debug("Login to {0}, status {1}".format(LOGIN_URL, response['status'])) cookie = response.get('set-cookie', None) diff --git a/extra/ucsc_encode_submission/ucsc_gather.py b/extra/ucsc_encode_submission/ucsc_gather.py index 1e6b6a6..fd8db12 100755 --- a/extra/ucsc_encode_submission/ucsc_gather.py +++ b/extra/ucsc_encode_submission/ucsc_gather.py @@ -146,7 +146,7 @@ def make_tree_from(source_path, library_result_map): """ for lib_id, lib_path in library_result_map: if not os.path.exists(lib_path): - logging.info("Making dir {0}".format(lib_path)) + logger.info("Making dir {0}".format(lib_path)) os.mkdir(lib_path) source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path)) if os.path.exists(source_lib_dir): @@ -158,7 +158,7 @@ def make_tree_from(source_path, library_result_map): raise IOError("{0} does not exist".format(source_pathname)) if not os.path.exists(target_pathname): os.symlink(source_pathname, target_pathname) - logging.info( + logger.info( 'LINK {0} to {1}'.format(source_pathname, target_pathname)) @@ -182,11 +182,11 @@ def scan_submission_dirs(view_map, library_result_map): """Look through our submission directories and collect needed information """ for lib_id, result_dir in library_result_map: - logging.info("Importing %s from %s" % (lib_id, result_dir)) + logger.info("Importing %s from %s" % (lib_id, result_dir)) try: view_map.import_submission_dir(result_dir, lib_id) except MetadataLookupException, e: - logging.error("Skipping %s: %s" % (lib_id, str(e))) + logger.error("Skipping %s: %s" % (lib_id, str(e))) def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False): dag_fragment = [] @@ -199,7 +199,7 @@ def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, forc if make_condor and len(dag_fragment) > 0: dag_filename = 'submission.dagman' if not force and os.path.exists(dag_filename): - logging.warn("%s exists, please delete" % (dag_filename,)) + logger.warn("%s exists, please delete" % (dag_filename,)) else: f = open(dag_filename,'w') f.write( os.linesep.join(dag_fragment)) @@ -245,7 +245,7 @@ ORDER BY ?submitView""" name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name'])) if name is None: - logging.error("Need name for %s" % (str(submissionNode))) + logger.error("Need name for %s" % (str(submissionNode))) return [] ddf_name = name + '.ddf' @@ -275,7 +275,7 @@ ORDER BY ?submitView""" for variable_name in variables: value = str(fromTypedNode(row[variable_name])) if value is None or value == 'None': - logging.warn("{0}: {1} 
was None".format(outfile, variable_name)) + logger.warn("{0}: {1} was None".format(outfile, variable_name)) if variable_name in ('files', 'md5sum'): current.setdefault(variable_name,[]).append(value) else: @@ -292,7 +292,7 @@ ORDER BY ?submitView""" output.write(os.linesep) all_files.extend(all_views[view]['files']) - logging.info( + logger.info( "Examined {0}, found files: {1}".format( str(submissionNode), ", ".join(all_files))) diff --git a/htsworkflow/automation/copier.py b/htsworkflow/automation/copier.py index 0b1256e..1dc719d 100644 --- a/htsworkflow/automation/copier.py +++ b/htsworkflow/automation/copier.py @@ -14,7 +14,9 @@ import urlparse from benderjab import rpc from htsworkflow.automation.solexa import is_runfolder - + +LOGGER = logging.getLogger(__name__) + class rsync(object): def __init__(self, sources, dest, pwfile): self.cmd = ['/usr/bin/rsync', ] @@ -29,21 +31,21 @@ class rsync(object): """ Get a directory listing for all our sources """ - logging.debug("searching for entries in: %s" % (self.source_base_list,)) + LOGGER.debug("searching for entries in: %s" % (self.source_base_list,)) entries = [] for source in self.source_base_list: - logging.debug("Scanning %s" % (source,)) + LOGGER.debug("Scanning %s" % (source,)) args = copy.copy(self.cmd) args.append(source) - logging.debug("Rsync cmd:" + " ".join(args)) + LOGGER.debug("Rsync cmd:" + " ".join(args)) short_process = subprocess.Popen(args, stdout=subprocess.PIPE) exit_code = short_process.wait() stdout = short_process.stdout # We made sure source ends in a / earlier cur_list = [ source+subdir for subdir in self.list_filter(stdout)] entries.extend(cur_list) - logging.debug(u"Found the following: %s" % (unicode(entries))) + LOGGER.debug(u"Found the following: %s" % (unicode(entries))) return entries def list_filter(self, lines): @@ -52,10 +54,10 @@ class rsync(object): """ dirs_to_copy = [] direntries = [ x[0:42].split() + [x[43:-1]] for x in lines ] - logging.debug(u'direntries: %s' % (unicode(direntries),)) + LOGGER.debug(u'direntries: %s' % (unicode(direntries),)) for permissions, size, filedate, filetime, filename in direntries: if permissions[0] == 'd': - # hey its a directory, the first step to being something we want to + # hey its a directory, the first step to being something we want to # copy if re.match("[0-9]{6}", filename): # it starts with something that looks like a 6 digit date @@ -67,14 +69,14 @@ class rsync(object): args = copy.copy(self.cmd) # args.append('--dry-run') # Makes testing easier # we want to copy everything - args.append('-rlt') + args.append('-rlt') # from here args.append(urlname) # to here args.append(self.dest_base) - logging.debug("Rsync cmd:" + " ".join(args)) + LOGGER.debug("Rsync cmd:" + " ".join(args)) return subprocess.Popen(args) - + def copy(self, url_list=None): """ copy any interesting looking directories over @@ -83,24 +85,24 @@ class rsync(object): # clean up any lingering non-running processes self.poll() - if url_list is None or len(url_list) == 0: + if url_list is None or len(url_list) == 0: # what's available to copy? 
dirs_to_copy = self.list() else: dirs_to_copy = url_list - - logging.info("dirs to copy %s" % (dirs_to_copy,)) + + LOGGER.info("dirs to copy %s" % (dirs_to_copy,)) # lets start copying started = [] for d in dirs_to_copy: process = self.processes.get(d, None) - + if process is None: # we don't have a process, so make one - logging.info("rsyncing %s" % (d)) + LOGGER.info("rsyncing %s" % (d)) self.processes[d] = self.create_copy_process(d) - started.append(d) + started.append(d) return started def _normalize_rsync_source(self, source): @@ -117,7 +119,7 @@ class rsync(object): def poll(self): """ check currently running processes to see if they're done - + return path roots that have finished. """ for dir_key, proc_value in self.processes.items(): @@ -126,19 +128,19 @@ class rsync(object): # process hasn't finished yet pass elif retcode == 0: - logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode)) + LOGGER.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode)) del self.processes[dir_key] else: - logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode)) - + LOGGER.error("rsync failed for %s, exit code %d" % (dir_key, retcode)) + def __len__(self): """ Return how many active rsync processes we currently have - + Call poll first to close finished processes. """ return len(self.processes) - + def keys(self): """ Return list of current run folder names @@ -149,34 +151,34 @@ class CopierBot(rpc.XmlRpcBot): def __init__(self, section=None, configfile=None): #if configfile is None: # configfile = '~/.htsworkflow' - + super(CopierBot, self).__init__(section, configfile) - + # options for rsync command self.cfg['rsync_password_file'] = None self.cfg['rsync_sources'] = None - self.cfg['rsync_destination'] = None - - # options for reporting we're done + self.cfg['rsync_destination'] = None + + # options for reporting we're done self.cfg['notify_users'] = None self.cfg['notify_runner'] = None - + self.pending = [] self.rsync = None self.notify_users = None self.notify_runner = None - + self.register_function(self.startCopy) self.register_function(self.sequencingFinished) self.eventTasks.append(self.update) - + def _init_rsync(self): """ Initalize rsync class This is only accessible for test purposes. """ - # we can't call any logging function until after start finishes. + # we can't call any LOGGER function until after start finishes. # this got moved to a seperate function from run to help with test code if self.rsync is None: self.rsync = rsync(self.sources, self.destination, self.password) @@ -186,11 +188,11 @@ class CopierBot(rpc.XmlRpcBot): read the config file """ super(CopierBot, self).read_config(section, configfile) - + self.sources = shlex.split(self._check_required_option('rsync_sources')) self.password = self._check_required_option('rsync_password_file') self.destination = self._check_required_option('rsync_destination') - + self.notify_users = self._parse_user_list(self.cfg['notify_users']) try: self.notify_runner = \ @@ -213,37 +215,37 @@ class CopierBot(rpc.XmlRpcBot): start our copy """ # Note, args comes in over the network, so don't trust it. 
- logging.debug("Arguments to startCopy %s" % (unicode(args),)) + LOGGER.debug("Arguments to startCopy %s" % (unicode(args),)) copy_urls = [] for a in args: clean_url = self.validate_url(a) if clean_url is not None: copy_urls.append(clean_url) - logging.info("Validated urls = %s" % (copy_urls,)) + LOGGER.info("Validated urls = %s" % (copy_urls,)) started = self.rsync.copy(copy_urls) - logging.info("copying:" + " ".join(started)+".") + LOGGER.info("copying:" + " ".join(started)+".") return started - + def sequencingFinished(self, runDir, *args): """ - The run was finished, if we're done copying, pass the message on + The run was finished, if we're done copying, pass the message on """ # close any open processes self.rsync.poll() - + # see if we're still copying if is_runfolder(runDir): - logging.info("recevied sequencing finshed for %s" % (runDir)) + LOGGER.info("recevied sequencing finshed for %s" % (runDir)) self.pending.append(runDir) self.startCopy() return "PENDING" else: errmsg = "received bad runfolder name (%s)" % (runDir) - logging.warning(errmsg) + LOGGER.warning(errmsg) # maybe I should use a different error message raise RuntimeError(errmsg) - + def reportSequencingFinished(self, runDir): """ Send the sequencingFinished message to the interested parties @@ -254,8 +256,8 @@ class CopierBot(rpc.XmlRpcBot): if self.notify_runner is not None: for r in self.notify_runner: self.rpc_send(r, (runDir,), 'sequencingFinished') - logging.info("forwarding sequencingFinshed message for %s" % (runDir)) - + LOGGER.info("forwarding sequencingFinshed message for %s" % (runDir)) + def update(self, *args): """ Update our current status. @@ -266,7 +268,7 @@ class CopierBot(rpc.XmlRpcBot): if p not in self.rsync.keys(): self.reportSequencingFinished(p) self.pending.remove(p) - + def _parser(self, msg, who): """ Parse xmpp chat messages @@ -274,7 +276,7 @@ class CopierBot(rpc.XmlRpcBot): help = u"I can [copy], or report current [status]" if re.match(u"help", msg): reply = help - elif re.match("copy", msg): + elif re.match("copy", msg): started = self.startCopy() reply = u"started copying " + ", ".join(started) elif re.match(u"status", msg): @@ -306,7 +308,7 @@ class CopierBot(rpc.XmlRpcBot): def main(args=None): bot = CopierBot() bot.main(args) - + if __name__ == "__main__": sys.exit(main(sys.argv[1:])) diff --git a/htsworkflow/automation/runner.py b/htsworkflow/automation/runner.py index 45d4ffc..7b0c4c9 100644 --- a/htsworkflow/automation/runner.py +++ b/htsworkflow/automation/runner.py @@ -11,6 +11,8 @@ from benderjab import rpc from htsworkflow.pipelines.configure_run import * +LOGGER = logging.getLogger(__name__) + #s_fc = re.compile('FC[0-9]+') s_fc = re.compile('_[0-9a-zA-Z]*$') @@ -23,22 +25,22 @@ def _get_flowcell_from_rundir(run_dir): junk, dirname = os.path.split(run_dir) mo = s_fc.search(dirname) if not mo: - logging.error('RunDir 2 FlowCell error: %s' % (run_dir)) + LOGGER.error('RunDir 2 FlowCell error: %s' % (run_dir)) return None return dirname[mo.start()+1:] - + class Runner(rpc.XmlRpcBot): """ Manage running pipeline jobs. 
- """ + """ def __init__(self, section=None, configfile=None): #if configfile is None: # self.configfile = "~/.htsworkflow" super(Runner, self).__init__(section, configfile) - + self.cfg['notify_users'] = None self.cfg['genome_dir'] = None self.cfg['base_analysis_dir'] = None @@ -47,11 +49,11 @@ class Runner(rpc.XmlRpcBot): self.cfg['notify_postanalysis'] = None self.conf_info_dict = {} - + self.register_function(self.sequencingFinished) #self.eventTasks.append(self.update) - + def read_config(self, section=None, configfile=None): super(Runner, self).read_config(section, configfile) @@ -60,8 +62,8 @@ class Runner(rpc.XmlRpcBot): self.notify_users = self._parse_user_list(self.cfg['notify_users']) #FIXME: process notify_postpipeline cfg - - + + def _parser(self, msg, who): """ Parse xmpp chat messages @@ -88,13 +90,13 @@ class Runner(rpc.XmlRpcBot): else: reply = u"I didn't understand '%s'" %(msg) - logging.debug("reply: " + str(reply)) + LOGGER.debug("reply: " + str(reply)) return reply def getStatusReport(self, fc_num): """ - Returns text status report for flow cell number + Returns text status report for flow cell number """ if fc_num not in self.conf_info_dict: return "No record of a %s run." % (fc_num) @@ -108,18 +110,18 @@ class Runner(rpc.XmlRpcBot): output = status.statusReport() return '\n'.join(output) - - + + def sequencingFinished(self, run_dir): """ Sequenceing (and copying) is finished, time to start pipeline """ - logging.debug("received sequencing finished message") + LOGGER.debug("received sequencing finished message") # Setup config info object ci = ConfigInfo() ci.base_analysis_dir = self.base_analysis_dir - ci.analysis_dir = os.path.join(self.base_analysis_dir, run_dir) + ci.analysis_dir = os.path.join(self.base_analysis_dir, run_dir) # get flowcell from run_dir name flowcell = _get_flowcell_from_rundir(run_dir) @@ -131,22 +133,22 @@ class Runner(rpc.XmlRpcBot): # Launch the job in it's own thread and turn. self.launchJob(run_dir, flowcell, ci) return "started" - - + + def pipelineFinished(self, run_dir): # need to strip off self.watch_dir from rundir I suspect. - logging.info("pipeline finished in" + str(run_dir)) + LOGGER.info("pipeline finished in" + str(run_dir)) #pattern = self.watch_dir #if pattern[-1] != os.path.sep: # pattern += os.path.sep #stripped_run_dir = re.sub(pattern, "", run_dir) - #logging.debug("stripped to " + stripped_run_dir) + #LOGGER.debug("stripped to " + stripped_run_dir) # Notify each user that the run has finished. 
if self.notify_users is not None: for u in self.notify_users: self.send(u, 'Pipeline run %s finished' % (run_dir)) - + #if self.notify_runner is not None: # for r in self.notify_runner: # self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished') @@ -168,39 +170,39 @@ class Runner(rpc.XmlRpcBot): cfg_filepath, self.genome_dir) if status_retrieve_cfg: - logging.info("Runner: Retrieve config: success") + LOGGER.info("Runner: Retrieve config: success") self.reportMsg("Retrieve config (%s): success" % (run_dir)) else: - logging.error("Runner: Retrieve config: failed") + LOGGER.error("Runner: Retrieve config: failed") self.reportMsg("Retrieve config (%s): FAILED" % (run_dir)) - + # configure step if status_retrieve_cfg: status = configure(conf_info) if status: - logging.info("Runner: Configure: success") + LOGGER.info("Runner: Configure: success") self.reportMsg("Configure (%s): success" % (run_dir)) self.reportMsg( os.linesep.join(glob(os.path.join(run_dir,'Data','C*'))) ) else: - logging.error("Runner: Configure: failed") + LOGGER.error("Runner: Configure: failed") self.reportMsg("Configure (%s): FAILED" % (run_dir)) #if successful, continue if status: # Setup status cmdline status monitor #startCmdLineStatusMonitor(ci) - + # running step print 'Running pipeline now!' run_status = run_pipeline(conf_info) if run_status is True: - logging.info('Runner: Pipeline: success') + LOGGER.info('Runner: Pipeline: success') self.reportMsg("Pipeline run (%s): Finished" % (run_dir,)) else: - logging.info('Runner: Pipeline: failed') + LOGGER.info('Runner: Pipeline: failed') self.reportMsg("Pipeline run (%s): FAILED" % (run_dir)) @@ -212,13 +214,13 @@ class Runner(rpc.XmlRpcBot): args=[run_dir, flowcell, conf_info]) t.setDaemon(True) t.start() - - + + def main(args=None): bot = Runner() return bot.main(args) - + if __name__ == "__main__": sys.exit(main(sys.argv[1:])) - + diff --git a/htsworkflow/automation/spoolwatcher.py b/htsworkflow/automation/spoolwatcher.py index af5932e..e226234 100644 --- a/htsworkflow/automation/spoolwatcher.py +++ b/htsworkflow/automation/spoolwatcher.py @@ -17,6 +17,8 @@ IN_UNMOUNT = EventsCodes.ALL_FLAGS['IN_UNMOUNT'] from benderjab import rpc +LOGGER = logging.getLogger(__name__) + class WatcherEvent(object): """ Track information about a file event @@ -27,7 +29,7 @@ class WatcherEvent(object): self.time = time.time() self.event_root = event_root self.complete = False - + def __unicode__(self): if self.complete: complete = "(completed)" @@ -64,7 +66,7 @@ class Handler(pyinotify.ProcessEvent): if not is_runfolder(target): self.log.debug("Skipping %s, not a runfolder" % (target,)) continue - + # grab the previous events for this watch path watch_path_events = self.last_event.setdefault(watch_path, {}) @@ -102,14 +104,14 @@ class Handler(pyinotify.ProcessEvent): class SpoolWatcher(rpc.XmlRpcBot): """ Watch a directory and send a message when another process is done writing. - + This monitors a directory tree using inotify (linux specific) and after some files having been written will send a message after seconds of no file writing. 
- + (Basically when the solexa machine finishes dumping a round of data this'll hopefully send out a message saying hey look theres data available - + """ # these params need to be in the config file # I wonder where I should put the documentation @@ -121,23 +123,23 @@ class SpoolWatcher(rpc.XmlRpcBot): # `notify_timeout` - how often to timeout from notify # `completion_files` - what files indicates we've finished sequencing # defaults to: netcopy_complete.txt - + def __init__(self, section=None, configfile=None): #if configfile is None: # self.configfile = "~/.htsworkflow" super(SpoolWatcher, self).__init__(section, configfile) - + self.cfg['watchdirs'] = None self.cfg['write_timeout'] = 10 self.cfg['notify_users'] = None self.cfg['notify_runner'] = None self.cfg['completion_files'] = 'ImageAnalysis_Netcopy_complete_READ2.txt ImageAnalysis_Netcopy_complete_SINGLEREAD.txt' - + self.watchdirs = [] self.watchdir_url_map = {} self.notify_timeout = 0.001 - self.wm = None + self.wm = None self.notify_users = None self.notify_runner = None self.wdds = [] @@ -147,14 +149,14 @@ class SpoolWatcher(rpc.XmlRpcBot): # keep track of which mount points tie to which watch directories # so maybe we can remount them. self.mounts_to_watches = {} - + self.eventTasks.append(self.process_notify) def read_config(self, section=None, configfile=None): - # Don't give in to the temptation to use logging functions here, + # Don't give in to the temptation to use logging functions here, # need to wait until after we detach in start super(SpoolWatcher, self).read_config(section, configfile) - + self.watchdirs = shlex.split(self._check_required_option('watchdirs')) # see if there's an alternate url that should be used for the watchdir for watchdir in self.watchdirs: @@ -162,7 +164,7 @@ class SpoolWatcher(rpc.XmlRpcBot): self.write_timeout = int(self.cfg['write_timeout']) self.completion_files = shlex.split(self.cfg['completion_files']) - + self.notify_users = self._parse_user_list(self.cfg['notify_users']) try: self.notify_runner = \ @@ -207,7 +209,7 @@ class SpoolWatcher(rpc.XmlRpcBot): self.wdds.append(self.wm.add_watch(w, mask, rec=True, auto_add=True)) def unmount_watch(self, event_path): - # remove backwards so we don't get weirdness from + # remove backwards so we don't get weirdness from # the list getting shorter for i in range(len(self.wdds),0, -1): wdd = self.wdds[i] @@ -223,7 +225,7 @@ class SpoolWatcher(rpc.XmlRpcBot): copy_url = root_copy_url + list_event_dir self.log.debug('Copy url: %s' % (copy_url,)) return copy_url - + def process_notify(self, *args): if self.notifier is None: # nothing to do yet @@ -240,7 +242,7 @@ class SpoolWatcher(rpc.XmlRpcBot): for last_event_dir, last_event_detail in last_events.items(): time_delta = time.time() - last_event_detail.time if time_delta > self.write_timeout: - print "timeout", unicode(last_event_detail) + LOGGER.info("timeout: %s" % (unicode(last_event_detail),)) copy_url = self.make_copy_url(watchdir, last_event_dir) self.startCopy(copy_url) if last_event_detail.complete: @@ -267,7 +269,7 @@ class SpoolWatcher(rpc.XmlRpcBot): help = u"I can send [copy] message, or squencer [finished]" if re.match(u"help", msg): reply = help - elif re.match("copy", msg): + elif re.match("copy", msg): self.startCopy(msg) reply = u"sent copy message" elif re.match(u"finished", msg): @@ -278,9 +280,9 @@ class SpoolWatcher(rpc.XmlRpcBot): else: reply = u"need runfolder name" else: - reply = u"I didn't understand '%s'" %(msg) + reply = u"I didn't understand '%s'" %(msg) return reply - + 
def run(self): """ Start application @@ -290,7 +292,7 @@ class SpoolWatcher(rpc.XmlRpcBot): # after it's initialized. self.add_watch() super(SpoolWatcher, self).run() - + def stop(self): """ shutdown application @@ -299,7 +301,7 @@ class SpoolWatcher(rpc.XmlRpcBot): if self.notifier is not None: self.notifier.stop() super(SpoolWatcher, self).stop() - + def startCopy(self, copy_url=None): self.log.debug("writes seem to have stopped") if self.notify_runner is not None: @@ -308,13 +310,13 @@ class SpoolWatcher(rpc.XmlRpcBot): if self.notify_users is not None: for u in self.notify_users: self.send(u, 'startCopy %s.' % (copy_url,)) - + def sequencingFinished(self, run_dir): # need to strip off self.watchdirs from rundir I suspect. self.log.info("run.completed in " + str(run_dir)) for watch in self.watchdirs: if not run_dir.startswith(watch): - print "%s didn't start with %s" % (run_dir, watch) + LOGGER.info("%s didn't start with %s" % (run_dir, watch)) continue if watch[-1] != os.path.sep: watch += os.path.sep @@ -334,7 +336,7 @@ class SpoolWatcher(rpc.XmlRpcBot): def main(args=None): bot = SpoolWatcher() return bot.main(args) - + if __name__ == "__main__": ret = main(sys.argv[1:]) #sys.exit(ret) diff --git a/htsworkflow/frontend/inventory/models.py b/htsworkflow/frontend/inventory/models.py index dfeb7db..46b37ec 100644 --- a/htsworkflow/frontend/inventory/models.py +++ b/htsworkflow/frontend/inventory/models.py @@ -7,13 +7,14 @@ from htsworkflow.frontend.samples.models import Library from htsworkflow.frontend.experiments.models import FlowCell from htsworkflow.frontend.bcmagic.models import Printer +LOGGER = logging.getLogger(__name__) try: import uuid except ImportError, e: # Some systems are using python 2.4, which doesn't have uuid # this is a stub - logging.warning('Real uuid is not available, initializing fake uuid module') + LOGGER.warning('Real uuid is not available, initializing fake uuid module') class uuid: def uuid1(self): self.hex = None @@ -33,11 +34,11 @@ def _switch_default(sender, instance, **kwargs): """ if instance.default: other_defaults = PrinterTemplate.objects.filter(default=True) - + for other in other_defaults: other.default = False other.save() - + class Vendor(models.Model): name = models.CharField(max_length=256) @@ -48,14 +49,14 @@ class Vendor(models.Model): class Location(models.Model): - + name = models.CharField(max_length=256, unique=True) location_description = models.TextField() - + uuid = models.CharField(max_length=32, blank=True, help_text="Leave blank for automatic UUID generation", editable=False) - + notes = models.TextField(blank=True, null=True) - + def __unicode__(self): if len(self.location_description) > 16: return u"%s: %s" % (self.name, self.location_description[0:16]+u"...") @@ -69,17 +70,17 @@ class ItemInfo(models.Model): model_id = models.CharField(max_length=256, blank=True, null=True) part_number = models.CharField(max_length=256, blank=True, null=True) lot_number = models.CharField(max_length=256, blank=True, null=True) - + url = models.URLField(blank=True, null=True) - + qty_purchased = models.IntegerField(default=1) - + vendor = models.ForeignKey(Vendor) purchase_date = models.DateField(blank=True, null=True) warranty_months = models.IntegerField(blank=True, null=True) - + notes = models.TextField(blank=True, null=True) - + def __unicode__(self): name = u'' if self.model_id: @@ -88,64 +89,64 @@ class ItemInfo(models.Model): name += u"part:%s " % (self.part_number) if self.lot_number: name += u"lot:%s " % (self.lot_number) - + return 
u"%s: %s" % (name, self.purchase_date) - + class Meta: verbose_name_plural = "Item Info" class ItemType(models.Model): - + name = models.CharField(max_length=64, unique=True) description = models.TextField(blank=True, null=True) - + def __unicode__(self): return u"%s" % (self.name) class ItemStatus(models.Model): name = models.CharField(max_length=64, unique=True) notes = models.TextField(blank=True, null=True) - + def __unicode__(self): return self.name - + class Meta: verbose_name_plural = "Item Status" class Item(models.Model): - + item_type = models.ForeignKey(ItemType) - + #Automatically assigned uuid; used for barcode if one is not provided in # barcode_id uuid = models.CharField(max_length=32, blank=True, help_text="Leave blank for automatic UUID generation", unique=True, editable=False) - + # field for existing barcodes; used instead of uuid if provided barcode_id = models.CharField(max_length=256, blank=True, null=True) force_use_uuid = models.BooleanField(default=False) - + item_info = models.ForeignKey(ItemInfo) - + location = models.ForeignKey(Location) - + status = models.ForeignKey(ItemStatus, blank=True, null=True) - + creation_date = models.DateTimeField(auto_now_add=True) modified_date = models.DateTimeField(auto_now=True) - + notes = models.TextField(blank=True, null=True) - + def __unicode__(self): if self.barcode_id is None or len(self.barcode_id) == 0: return u"invu|%s" % (self.uuid) else: return u"invb|%s" % (self.barcode_id) - + def get_absolute_url(self): return '/inventory/%s/' % (self.uuid) - + pre_save.connect(_assign_uuid, sender=Item) @@ -155,11 +156,11 @@ class PrinterTemplate(models.Model): """ item_type = models.ForeignKey(ItemType) printer = models.ForeignKey(Printer) - + default = models.BooleanField() - + template = models.TextField() - + def __unicode__(self): if self.default: return u'%s %s' % (self.item_type.name, self.printer.name) @@ -170,30 +171,30 @@ pre_save.connect(_switch_default, sender=PrinterTemplate) class LongTermStorage(models.Model): - + flowcell = models.ForeignKey(FlowCell) libraries = models.ManyToManyField(Library) storage_devices = models.ManyToManyField(Item) - + creation_date = models.DateTimeField(auto_now_add=True) modified_date = models.DateTimeField(auto_now=True) - + def __unicode__(self): return u"%s: %s" % (str(self.flowcell), ', '.join([ str(s) for s in self.storage_devices.iterator() ])) - + class Meta: verbose_name_plural = "Long Term Storage" - + class ReagentBase(models.Model): - + reagent = models.ManyToManyField(Item) - + creation_date = models.DateTimeField(auto_now_add=True) modified_date = models.DateTimeField(auto_now=True) - + class Meta: abstract = True @@ -203,16 +204,16 @@ class ReagentFlowcell(ReagentBase): Links reagents and flowcells """ flowcell = models.ForeignKey(FlowCell) - + def __unicode__(self): return u"%s: %s" % (str(self.flowcell), ', '.join([ str(s) for s in self.reagent.iterator() ])) - + class ReagentLibrary(ReagentBase): """ Links libraries and flowcells """ library = models.ForeignKey(Library) - + def __unicode__(self): return u"%s: %s" % (str(self.library), ', '.join([ str(s) for s in self.reagent.iterator() ])) diff --git a/htsworkflow/frontend/samples/views.py b/htsworkflow/frontend/samples/views.py index 4703cd3..7152237 100644 --- a/htsworkflow/frontend/samples/views.py +++ b/htsworkflow/frontend/samples/views.py @@ -38,6 +38,8 @@ SAMPLES_CONTEXT_DEFAULTS = { 'bcmagic': BarcodeMagicForm() } +LOGGER = logging.getLogger(__name__) + def count_lanes(lane_set): single = 0 paired = 1 @@ -316,7 
+318,7 @@ def _summary_stats(flowcell_id, lane_id): #except Exception, e: # summary_list.append("Summary report needs to be updated.") - # logging.error("Exception: " + str(e)) + # LOGGER.error("Exception: " + str(e)) return (summary_list, err_list) diff --git a/htsworkflow/pipelines/bustard.py b/htsworkflow/pipelines/bustard.py index 49c1f16..5ebde39 100644 --- a/htsworkflow/pipelines/bustard.py +++ b/htsworkflow/pipelines/bustard.py @@ -1,8 +1,8 @@ """ Extract configuration from Illumina Bustard Directory. -This includes the version number, run date, bustard executable parameters, and -phasing estimates. +This includes the version number, run date, bustard executable parameters, and +phasing estimates. """ from copy import copy from datetime import date @@ -18,6 +18,8 @@ from htsworkflow.pipelines.runfolder import \ VERSION_RE, \ EUROPEAN_STRPTIME +LOGGER = logging.getLogger(__name__) + # make epydoc happy __docformat__ = "restructuredtext en" @@ -95,7 +97,7 @@ class CrosstalkMatrix(object): self.base['A'] = [ float(v) for v in data[:4] ] self.base['C'] = [ float(v) for v in data[4:8] ] self.base['G'] = [ float(v) for v in data[8:12] ] - self.base['T'] = [ float(v) for v in data[12:16] ] + self.base['T'] = [ float(v) for v in data[12:16] ] else: raise RuntimeError("matrix file %s is unusual" % (pathname,)) def get_elements(self): @@ -171,7 +173,7 @@ def crosstalk_matrix_from_bustard_config(bustard_path, bustard_config_tree): auto_lane_fragment = "" else: auto_lane_fragment = "_%d" % ( matrix_auto_lane,) - + for matrix_name in ['s%s_02_matrix.txt' % (auto_lane_fragment,), 's%s_1_matrix.txt' % (auto_lane_fragment,), ]: @@ -228,7 +230,7 @@ class Bustard(object): ElementTree.dump(self.get_elements()) def get_elements(self): - root = ElementTree.Element('Bustard', + root = ElementTree.Element('Bustard', {'version': str(Bustard.XML_VERSION)}) version = ElementTree.SubElement(root, Bustard.SOFTWARE_VERSION) version.text = self.version @@ -248,7 +250,7 @@ class Bustard(object): # add crosstalk matrix if it exists if self.crosstalk is not None: root.append(self.crosstalk.get_elements()) - + # add bustard config if it exists if self.bustard_config is not None: root.append(self.bustard_config) @@ -259,7 +261,7 @@ class Bustard(object): raise ValueError('Expected "Bustard" SubElements') xml_version = int(tree.attrib.get('version', 0)) if xml_version > Bustard.XML_VERSION: - logging.warn('Bustard XML tree is a higher version than this class') + LOGGER.warn('Bustard XML tree is a higher version than this class') for element in list(tree): if element.tag == Bustard.SOFTWARE_VERSION: self.version = element.text @@ -277,7 +279,7 @@ class Bustard(object): self.bustard_config = element else: raise ValueError("Unrecognized tag: %s" % (element.tag,)) - + def bustard(pathname): """ Construct a Bustard object by analyzing an Illumina Bustard directory. 
@@ -349,7 +351,7 @@ def main(cmdline): for bustard_dir in args: print u'analyzing bustard directory: ' + unicode(bustard_dir) bustard_object = bustard(bustard_dir) - bustard_object.dump() + bustard_object.dump() bustard_object2 = Bustard(xml=bustard_object.get_elements()) print ('-------------------------------------') diff --git a/htsworkflow/pipelines/configure_run.py b/htsworkflow/pipelines/configure_run.py index 4d5cf86..83c7569 100644 --- a/htsworkflow/pipelines/configure_run.py +++ b/htsworkflow/pipelines/configure_run.py @@ -16,8 +16,10 @@ from htsworkflow.pipelines.run_status import GARunStatus from pyinotify import WatchManager, ThreadedNotifier from pyinotify import EventsCodes, ProcessEvent +LOGGER = logging.getLogger(__name__) + class ConfigInfo: - + def __init__(self): #run_path = firecrest analysis directory to run analysis from self.run_path = None @@ -70,12 +72,12 @@ class RunEvent(ProcessEvent): self._ci = conf_info ProcessEvent.__init__(self) - + def process_IN_CREATE(self, event): fullpath = os.path.join(event.path, event.name) if s_finished.search(fullpath): - logging.info("File Found: %s" % (fullpath)) + LOGGER.info("File Found: %s" % (fullpath)) if s_firecrest_finished.search(fullpath): self.run_status_dict['firecrest'] = True @@ -99,7 +101,7 @@ class RunEvent(ProcessEvent): self._ci.status.updateBustard(event.name) elif s_firecrest_all.search(fullpath): self._ci.status.updateFirecrest(event.name) - + #print "Create: %s" % (os.path.join(event.path, event.name)) def process_IN_DELETE(self, event): @@ -209,19 +211,19 @@ def config_stdout_handler(line, conf_info): # Detect invalid command-line arguments elif s_invalid_cmdline.search(line): - logging.error("Invalid commandline options!") + LOGGER.error("Invalid commandline options!") # Detect starting of configuration elif s_start.search(line): - logging.info('START: Configuring pipeline') + LOGGER.info('START: Configuring pipeline') # Detect it made it past invalid arguments elif s_gerald.search(line): - logging.info('Running make now') + LOGGER.info('Running make now') # Detect that make files have been generated (based on output) elif s_generating.search(line): - logging.info('Make files generted') + LOGGER.info('Make files generted') return True # Capture run directory @@ -238,7 +240,7 @@ def config_stdout_handler(line, conf_info): conf_info.bustard_path = firecrest_bustard conf_info.run_path = firecrest - + #Standard output handling else: print 'Sequence line:', line @@ -248,7 +250,7 @@ def config_stdout_handler(line, conf_info): # Log all other output for debugging purposes else: - logging.warning('CONF:?: %s' % (line)) + LOGGER.warning('CONF:?: %s' % (line)) return False @@ -271,29 +273,29 @@ def config_stderr_handler(line, conf_info): # Detect invalid species directory error if s_species_dir_err.search(line): - logging.error(line) + LOGGER.error(line) return RUN_ABORT # Detect goat_pipeline.py traceback elif s_goat_traceb.search(line): - logging.error("Goat config script died, traceback in debug output") + LOGGER.error("Goat config script died, traceback in debug output") return RUN_ABORT # Detect indication of successful configuration (from stderr; odd, but ok) elif s_stderr_taskcomplete.search(line): - logging.info('Configure step successful (from: stderr)') + LOGGER.info('Configure step successful (from: stderr)') return True # Detect missing cycles elif s_missing_cycles.search(line): # Only display error once if not SUPPRESS_MISSING_CYCLES: - logging.error("Missing cycles detected; Not all cycles copied?") - 
logging.debug("CONF:STDERR:MISSING_CYCLES: %s" % (line)) + LOGGER.error("Missing cycles detected; Not all cycles copied?") + LOGGER.debug("CONF:STDERR:MISSING_CYCLES: %s" % (line)) SUPPRESS_MISSING_CYCLES = True return RUN_ABORT - + # Log all other output as debug output else: - logging.debug('CONF:STDERR:?: %s' % (line)) + LOGGER.debug('CONF:STDERR:?: %s' % (line)) # Neutral (not failure; nor success) return False @@ -334,19 +336,19 @@ def pipeline_stderr_handler(line, conf_info): if pl_stderr_ignore(line): pass elif s_make_error.search(line): - logging.error("make error detected; run failed") + LOGGER.error("make error detected; run failed") return RUN_FAILED elif s_no_gnuplot.search(line): - logging.error("gnuplot not found") + LOGGER.error("gnuplot not found") return RUN_FAILED elif s_no_convert.search(line): - logging.error("imagemagick's convert command not found") + LOGGER.error("imagemagick's convert command not found") return RUN_FAILED elif s_no_ghostscript.search(line): - logging.error("ghostscript not found") + LOGGER.error("ghostscript not found") return RUN_FAILED else: - logging.debug('PIPE:STDERR:?: %s' % (line)) + LOGGER.debug('PIPE:STDERR:?: %s' % (line)) return False @@ -368,7 +370,7 @@ def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir): options = getCombinedOptions() if options.url is None: - logging.error("%s or %s missing base_host_url option" % \ + LOGGER.error("%s or %s missing base_host_url option" % \ (CONFIG_USER, CONFIG_SYSTEM)) return False @@ -376,16 +378,16 @@ def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir): saveConfigFile(flowcell, options.url, cfg_filepath) conf_info.config_filepath = cfg_filepath except FlowCellNotFound, e: - logging.error(e) + LOGGER.error(e) return False except WebError404, e: - logging.error(e) + LOGGER.error(e) return False except IOError, e: - logging.error(e) + LOGGER.error(e) return False except Exception, e: - logging.error(e) + LOGGER.error(e) return False f = open(cfg_filepath, 'r') @@ -395,14 +397,14 @@ def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir): genome_dict = getAvailableGenomes(genome_dir) mapper_dict = constructMapperDict(genome_dict) - logging.debug(data) + LOGGER.debug(data) f = open(cfg_filepath, 'w') f.write(data % (mapper_dict)) f.close() - + return True - + def configure(conf_info): @@ -448,7 +450,7 @@ def configure(conf_info): fout = open(stdout_filepath, 'w') ferr = open(stderr_filepath, 'w') - + pipe = subprocess.Popen(['goat_pipeline.py', '--GERALD=%s' % (conf_info.config_filepath), '--make', @@ -462,12 +464,12 @@ def configure(conf_info): # Clean up fout.close() ferr.close() - - + + ################## # Process stdout fout = open(stdout_filepath, 'r') - + stdout_line = fout.readline() complete = False @@ -482,9 +484,9 @@ def configure(conf_info): #error_code = pipe.wait() if error_code: - logging.error('Recieved error_code: %s' % (error_code)) + LOGGER.error('Recieved error_code: %s' % (error_code)) else: - logging.info('We are go for launch!') + LOGGER.info('We are go for launch!') #Process stderr ferr = open(stderr_filepath, 'r') @@ -518,9 +520,9 @@ def configure(conf_info): # we didn't retrieve the path info, log it. 
if status is True: if conf_info.bustard_path is None or conf_info.run_path is None: - logging.error("Failed to retrieve run_path") + LOGGER.error("Failed to retrieve run_path") return False - + return status @@ -530,7 +532,7 @@ def run_pipeline(conf_info): """ # Fail if the run_path doesn't actually exist if not os.path.exists(conf_info.run_path): - logging.error('Run path does not exist: %s' \ + LOGGER.error('Run path does not exist: %s' \ % (conf_info.run_path)) return False @@ -550,8 +552,8 @@ def run_pipeline(conf_info): wdd = wm.add_watch(conf_info.run_path, mask, rec=True) # Log pipeline starting - logging.info('STARTING PIPELINE @ %s' % (time.ctime())) - + LOGGER.info('STARTING PIPELINE @ %s' % (time.ctime())) + # Start the pipeline (and hide!) #pipe = subprocess.Popen(['make', # '-j8', diff --git a/htsworkflow/pipelines/eland.py b/htsworkflow/pipelines/eland.py index bd478aa..c3aa327 100644 --- a/htsworkflow/pipelines/eland.py +++ b/htsworkflow/pipelines/eland.py @@ -13,6 +13,8 @@ from htsworkflow.pipelines.runfolder import ElementTree, LANE_LIST from htsworkflow.util.ethelp import indent, flatten from htsworkflow.util.opener import autoopen +LOGGER = logging.getLogger(__name__) + SAMPLE_NAME = 'SampleName' LANE_ID = 'LaneID' END = 'End' @@ -131,7 +133,7 @@ class ElandLane(ResultLane): if os.stat(self.pathname)[stat.ST_SIZE] == 0: raise RuntimeError("Eland isn't done, try again later.") - logging.info("summarizing results for %s" % (self.pathname)) + LOGGER.info("summarizing results for %s" % (self.pathname)) stream = autoopen(self.pathname, 'r') if self.eland_type == ELAND_SINGLE: @@ -427,7 +429,7 @@ class ElandLane(ResultLane): elif tag == READS.lower(): self._reads = int(element.text) else: - logging.warn("ElandLane unrecognized tag %s" % (element.tag,)) + LOGGER.warn("ElandLane unrecognized tag %s" % (element.tag,)) class SequenceLane(ResultLane): XML_VERSION=1 @@ -471,7 +473,7 @@ class SequenceLane(ResultLane): self._guess_sequence_type(self.pathname) - logging.info("summarizing results for %s" % (self.pathname)) + LOGGER.info("summarizing results for %s" % (self.pathname)) lines = 0 f = open(self.pathname) for l in f.xreadlines(): @@ -521,7 +523,7 @@ class SequenceLane(ResultLane): elif tag == SequenceLane.SEQUENCE_TYPE.lower(): self.sequence_type = lookup_sequence_type.get(element.text, None) else: - logging.warn("SequenceLane unrecognized tag %s" % (element.tag,)) + LOGGER.warn("SequenceLane unrecognized tag %s" % (element.tag,)) class ELAND(object): """ @@ -580,10 +582,10 @@ def check_for_eland_file(basedir, pattern, lane_id, end): full_lane_id = "%d_%d" % ( lane_id, end ) basename = pattern % (full_lane_id,) - logging.info("Eland pattern: %s" %(basename,)) + LOGGER.info("Eland pattern: %s" %(basename,)) pathname = os.path.join(basedir, basename) if os.path.exists(pathname): - logging.info('found eland file in %s' % (pathname,)) + LOGGER.info('found eland file in %s' % (pathname,)) return pathname else: return None @@ -594,7 +596,7 @@ def update_result_with_eland(gerald, results, lane_id, end, pathname, genome_map # but I needed to persist the sample_name/lane_id for # runfolder summary_report path, name = os.path.split(pathname) - logging.info("Adding eland file %s" %(name,)) + LOGGER.info("Adding eland file %s" %(name,)) # split_name = name.split('_') # lane_id = int(split_name[1]) @@ -674,14 +676,14 @@ def eland(gerald_dir, gerald=None, genome_maps=None): update_result_with_sequence(gerald, e.results, lane_id, end, pathname) break else: - logging.debug("No eland file 
found in %s for lane %s and end %s" %(basedir, lane_id, end)) + LOGGER.debug("No eland file found in %s for lane %s and end %s" %(basedir, lane_id, end)) continue return e def build_genome_fasta_map(genome_dir): # build fasta to fasta file map - logging.info("Building genome map") + LOGGER.info("Building genome map") genome = genome_dir.split(os.path.sep)[-1] fasta_map = {} for vld_file in glob(os.path.join(genome_dir, '*.vld')): @@ -717,9 +719,9 @@ def main(cmdline=None): from optparse import OptionParser parser = OptionParser("%prog: +") opts, args = parser.parse_args(cmdline) - logging.basicConfig(level=logging.DEBUG) + LOGGER.basicConfig(level=logging.DEBUG) for a in args: - logging.info("Starting scan of %s" % (a,)) + LOGGER.info("Starting scan of %s" % (a,)) e = eland(a) print e.get_elements() diff --git a/htsworkflow/pipelines/firecrest.py b/htsworkflow/pipelines/firecrest.py index fe5d01a..7d92050 100644 --- a/htsworkflow/pipelines/firecrest.py +++ b/htsworkflow/pipelines/firecrest.py @@ -1,17 +1,18 @@ """ Extract information about the Firecrest run -Firecrest +Firecrest class holding the properties we found -firecrest +firecrest Firecrest factory function initalized from a directory name -fromxml +fromxml Firecrest factory function initalized from an xml dump from the Firecrest object. """ from datetime import date from glob import glob +import logging import os import re import time @@ -21,6 +22,8 @@ from htsworkflow.pipelines.runfolder import \ VERSION_RE, \ EUROPEAN_STRPTIME +LOGGER = logging.getLogger(__name__) + __docformat__ = "restructuredtext en" class Firecrest(object): @@ -45,7 +48,7 @@ class Firecrest(object): if xml is not None: self.set_elements(xml) - + def _get_time(self): return time.mktime(self.date.timetuple()) time = property(_get_time, doc='return run time as seconds since epoch') @@ -80,7 +83,7 @@ class Firecrest(object): raise ValueError('Expected "Firecrest" SubElements') xml_version = int(tree.attrib.get('version', 0)) if xml_version > Firecrest.XML_VERSION: - logging.warn('Firecrest XML tree is a higher version than this class') + LOGGER.warn('Firecrest XML tree is a higher version than this class') for element in list(tree): if element.tag == Firecrest.SOFTWARE_VERSION: self.version = element.text @@ -121,7 +124,7 @@ def firecrest(pathname): f.user = groups[3] bustard_pattern = os.path.join(pathname, 'Bustard*') - # should I parse this deeper than just stashing the + # should I parse this deeper than just stashing the # contents of the matrix file? matrix_pathname = os.path.join(pathname, 'Matrix', 's_matrix.txt') if os.path.exists(matrix_pathname): diff --git a/htsworkflow/pipelines/genome_mapper.py b/htsworkflow/pipelines/genome_mapper.py index 5ae788e..fb16d7f 100644 --- a/htsworkflow/pipelines/genome_mapper.py +++ b/htsworkflow/pipelines/genome_mapper.py @@ -8,6 +8,7 @@ import logging from htsworkflow.util.alphanum import alphanum +LOGGER = logging.getLogger(__name__) class DuplicateGenome(Exception): pass @@ -22,7 +23,7 @@ def getAvailableGenomes(genome_base_dir): """ raises IOError (on genome_base_dir not found) raises DuplicateGenome on duplicate genomes found. - + returns a double dictionary (i.e. 
d[species][build] = path) """ @@ -52,8 +53,8 @@ def getAvailableGenomes(genome_base_dir): try: species, build = line.split('|') except: - logging.warning('Skipping: Invalid metafile (%s) line: %s' \ - % (metafile, line)) + LOGGER.warning('Skipping: Invalid metafile (%s) line: %s' \ + % (metafile, line)) continue build_dict = d.setdefault(species, {}) @@ -64,12 +65,12 @@ def getAvailableGenomes(genome_base_dir): build_dict[build] = genome_dir return d - + class constructMapperDict(object): """ Emulate a dictionary to map genome|build names to paths. - + It uses the dictionary generated by getAvailableGenomes. """ def __init__(self, genome_dict): @@ -80,19 +81,19 @@ class constructMapperDict(object): Return the best match for key """ elements = re.split("\|", key) - + if len(elements) == 1: # we just the species name # get the set of builds builds = self.genome_dict[elements[0]] - + # sort build names the way humans would keys = builds.keys() keys.sort(cmp=alphanum) - + # return the path from the 'last' build name return builds[keys[-1]] - + elif len(elements) == 2: # we have species, and build name return self.genome_dict[elements[0]][elements[1]] @@ -104,21 +105,21 @@ class constructMapperDict(object): return self[key] except KeyError, e: return default - + def keys(self): keys = [] for species in self.genome_dict.keys(): for build in self.genome_dict[species]: keys.append([species+'|'+build]) return keys - + def values(self): values = [] for species in self.genome_dict.keys(): for build in self.genome_dict[species]: values.append(self.genome_dict[species][build]) return values - + def items(self): items = [] for species in self.genome_dict.keys(): @@ -127,7 +128,7 @@ class constructMapperDict(object): value = self.genome_dict[species][build] items.append((key, value)) return items - + if __name__ == '__main__': if len(sys.argv) != 2: @@ -139,5 +140,5 @@ if __name__ == '__main__': for k,v in d2.items(): print '%s: %s' % (k,v) - - + + diff --git a/htsworkflow/pipelines/gerald.py b/htsworkflow/pipelines/gerald.py index f62b4f2..06fc94b 100644 --- a/htsworkflow/pipelines/gerald.py +++ b/htsworkflow/pipelines/gerald.py @@ -16,6 +16,8 @@ from htsworkflow.pipelines.runfolder import \ VERSION_RE from htsworkflow.util.ethelp import indent, flatten +LOGGER = logging.getLogger(__name__) + class Gerald(object): """ Capture meaning out of the GERALD directory @@ -205,7 +207,7 @@ class Gerald(object): raise ValueError('exptected GERALD') xml_version = int(tree.attrib.get('version', 0)) if xml_version > Gerald.XML_VERSION: - logging.warn('XML tree is a higher version than this class') + LOGGER.warn('XML tree is a higher version than this class') self.eland_results = ELAND() for element in list(tree): tag = element.tag.lower() @@ -216,23 +218,23 @@ class Gerald(object): elif tag == ELAND.ELAND.lower(): self.eland_results = ELAND(xml=element) else: - logging.warn("Unrecognized tag %s" % (element.tag,)) + LOGGER.warn("Unrecognized tag %s" % (element.tag,)) def gerald(pathname): g = Gerald() g.pathname = os.path.expanduser(pathname) path, name = os.path.split(g.pathname) - logging.info("Parsing gerald config.xml") + LOGGER.info("Parsing gerald config.xml") config_pathname = os.path.join(g.pathname, 'config.xml') g.tree = ElementTree.parse(config_pathname).getroot() # parse Summary.htm file summary_pathname = os.path.join(g.pathname, 'Summary.xml') if os.path.exists(summary_pathname): - logging.info("Parsing Summary.xml") + LOGGER.info("Parsing Summary.xml") else: summary_pathname = os.path.join(g.pathname, 
'Summary.htm') - logging.info("Parsing Summary.htm") + LOGGER.info("Parsing Summary.htm") g.summary = Summary(summary_pathname) # parse eland files g.eland_results = eland(g.pathname, g) diff --git a/htsworkflow/pipelines/ipar.py b/htsworkflow/pipelines/ipar.py index b7d5a54..e17db4b 100644 --- a/htsworkflow/pipelines/ipar.py +++ b/htsworkflow/pipelines/ipar.py @@ -1,7 +1,7 @@ """ Extract information about the IPAR run -IPAR +IPAR class holding the properties we found ipar IPAR factory function initalized from a directory name @@ -24,6 +24,7 @@ from htsworkflow.pipelines.runfolder import \ VERSION_RE, \ EUROPEAN_STRPTIME +LOGGER = logging.getLogger(__name__) SOFTWARE_NAMES = ('IPAR_1.01', 'IPAR_1.3', 'Intensities') class Tiles(object): @@ -163,7 +164,7 @@ class IPAR(object): raise ValueError('Expected "IPAR" SubElements') xml_version = int(tree.attrib.get('version', 0)) if xml_version > IPAR.XML_VERSION: - logging.warn('IPAR XML tree is a higher version than this class') + LOGGER.warn('IPAR XML tree is a higher version than this class') for element in list(tree): if element.tag == IPAR.RUN: self.tree = element @@ -184,14 +185,14 @@ def load_ipar_param_tree(paramfile): if run.attrib.has_key('Name') and run.attrib['Name'] in SOFTWARE_NAMES: return run else: - logging.info("No run found") + LOGGER.info("No run found") return None def ipar(pathname): """ Examine the directory at pathname and initalize a IPAR object """ - logging.info("Searching IPAR directory %s" % (pathname,)) + LOGGER.info("Searching IPAR directory %s" % (pathname,)) i = IPAR() i.pathname = pathname @@ -216,7 +217,7 @@ def ipar(pathname): os.path.join(path, '.params')] for paramfile in paramfiles: if os.path.exists(paramfile): - logging.info("Found IPAR Config file at: %s" % ( paramfile, )) + LOGGER.info("Found IPAR Config file at: %s" % ( paramfile, )) i.tree = load_ipar_param_tree(paramfile) mtime_local = os.stat(paramfile)[stat.ST_MTIME] i.time = mtime_local diff --git a/htsworkflow/pipelines/retrieve_config.py b/htsworkflow/pipelines/retrieve_config.py index 2292cb9..42ff263 100644 --- a/htsworkflow/pipelines/retrieve_config.py +++ b/htsworkflow/pipelines/retrieve_config.py @@ -25,6 +25,8 @@ from htsworkflow.pipelines.runfolder import LANE_LIST # JSON dictionaries use strings LANE_LIST_JSON = [ str(l) for l in LANE_LIST ] +LOGGER = logging.getLogger(__name__) + __docformat__ = "restructredtext en" CONFIG_SYSTEM = '/etc/htsworkflow.ini' @@ -48,8 +50,8 @@ def retrieve_flowcell_info(base_host_url, flowcell): web = urllib2.urlopen(url, apipayload) except urllib2.URLError, e: errmsg = 'URLError: %d %s' % (e.code, e.msg) - logging.error(errmsg) - logging.error('opened %s' % (url,)) + LOGGER.error(errmsg) + LOGGER.error('opened %s' % (url,)) raise IOError(errmsg) contents = web.read() @@ -143,10 +145,10 @@ def format_gerald_config(options, flowcell_info, genome_map): lane_prefix = u"".join(lane_numbers) species_path = genome_map.get(species, None) - logging.debug("Looked for genome '%s' got location '%s'" % (species, species_path)) + LOGGER.debug("Looked for genome '%s' got location '%s'" % (species, species_path)) if not is_sequencing and species_path is None: no_genome_msg = "Forcing lanes %s to sequencing as there is no genome for %s" - logging.warning(no_genome_msg % (lane_numbers, species)) + LOGGER.warning(no_genome_msg % (lane_numbers, species)) is_sequencing = True if is_sequencing: @@ -306,25 +308,25 @@ def saveConfigFile(options): retrieves the flowcell eland config file, give the base_host_url (i.e. 
http://sub.domain.edu:port) """ - logging.info('USING OPTIONS:') - logging.info(u' URL: %s' % (options.url,)) - logging.info(u' OUT: %s' % (options.output_filepath,)) - logging.info(u' FC: %s' % (options.flowcell,)) - #logging.info(': %s' % (options.genome_dir,)) - logging.info(u'post_run: %s' % ( unicode(options.post_run),)) + LOGGER.info('USING OPTIONS:') + LOGGER.info(u' URL: %s' % (options.url,)) + LOGGER.info(u' OUT: %s' % (options.output_filepath,)) + LOGGER.info(u' FC: %s' % (options.flowcell,)) + #LOGGER.info(': %s' % (options.genome_dir,)) + LOGGER.info(u'post_run: %s' % ( unicode(options.post_run),)) flowcell_info = retrieve_flowcell_info(options.url, options.flowcell) - logging.debug('genome_dir: %s' % ( options.genome_dir, )) + LOGGER.debug('genome_dir: %s' % ( options.genome_dir, )) available_genomes = getAvailableGenomes(options.genome_dir) genome_map = constructMapperDict(available_genomes) - logging.debug('available genomes: %s' % ( unicode( genome_map.keys() ),)) + LOGGER.debug('available genomes: %s' % ( unicode( genome_map.keys() ),)) #config = format_gerald_config(options, flowcell_info, genome_map) # #if options.output_filepath is not None: # outstream = open(options.output_filepath, 'w') - # logging.info('Writing config file to %s' % (options.output_filepath,)) + # LOGGER.info('Writing config file to %s' % (options.output_filepath,)) #else: # outstream = sys.stdout # diff --git a/htsworkflow/pipelines/runfolder.py b/htsworkflow/pipelines/runfolder.py index e1b474b..568e683 100644 --- a/htsworkflow/pipelines/runfolder.py +++ b/htsworkflow/pipelines/runfolder.py @@ -17,6 +17,8 @@ try: except ImportError, e: from elementtree import ElementTree +LOGGER = logging.getLogger(__name__) + EUROPEAN_STRPTIME = "%d-%m-%Y" EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})" VERSION_RE = "([0-9\.]+)" @@ -68,7 +70,7 @@ class PipelineRun(object): else: flowcell_id = 'unknown' - logging.warning( + LOGGER.warning( "Flowcell id was not found, guessing %s" % ( flowcell_id)) self._flowcell_id = flowcell_id @@ -121,7 +123,7 @@ class PipelineRun(object): elif tag == gerald.Gerald.GERALD.lower(): self.gerald = gerald.Gerald(xml=element) else: - logging.warn('PipelineRun unrecognized tag %s' % (tag,)) + LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,)) def _get_run_name(self): """ @@ -137,14 +139,14 @@ class PipelineRun(object): def save(self, destdir=None): if destdir is None: destdir = '' - logging.info("Saving run report " + self.name) + LOGGER.info("Saving run report " + self.name) xml = self.get_elements() indent(xml) dest_pathname = os.path.join(destdir, self.name) ElementTree.ElementTree(xml).write(dest_pathname) def load(self, filename): - logging.info("Loading run report from " + filename) + LOGGER.info("Loading run report from " + filename) tree = ElementTree.parse(filename).getroot() self.set_elements(tree) @@ -152,7 +154,7 @@ def load_pipeline_run_xml(pathname): """ Load and instantiate a Pipeline run from a run xml file - :Parameters: + :Parameters: - `pathname` : location of an run xml file :Returns: initialized PipelineRun object @@ -176,17 +178,17 @@ def get_runs(runfolder): from htsworkflow.pipelines import gerald def scan_post_image_analysis(runs, runfolder, image_analysis, pathname): - logging.info("Looking for bustard directories in %s" % (pathname,)) + LOGGER.info("Looking for bustard directories in %s" % (pathname,)) bustard_dirs = glob(os.path.join(pathname, "Bustard*")) # RTA BaseCalls looks enough like Bustard. 
bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls"))) for bustard_pathname in bustard_dirs: - logging.info("Found bustard directory %s" % (bustard_pathname,)) + LOGGER.info("Found bustard directory %s" % (bustard_pathname,)) b = bustard.bustard(bustard_pathname) gerald_glob = os.path.join(bustard_pathname, 'GERALD*') - logging.info("Looking for gerald directories in %s" % (pathname,)) + LOGGER.info("Looking for gerald directories in %s" % (pathname,)) for gerald_pathname in glob(gerald_glob): - logging.info("Found gerald directory %s" % (gerald_pathname,)) + LOGGER.info("Found gerald directory %s" % (gerald_pathname,)) try: g = gerald.gerald(gerald_pathname) p = PipelineRun(runfolder) @@ -195,18 +197,18 @@ def get_runs(runfolder): p.gerald = g runs.append(p) except IOError, e: - logging.error("Ignoring " + str(e)) + LOGGER.error("Ignoring " + str(e)) datadir = os.path.join(runfolder, 'Data') - logging.info('Searching for runs in ' + datadir) + LOGGER.info('Searching for runs in ' + datadir) runs = [] # scan for firecrest directories for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")): - logging.info('Found firecrest in ' + datadir) + LOGGER.info('Found firecrest in ' + datadir) image_analysis = firecrest.firecrest(firecrest_pathname) if image_analysis is None: - logging.warn( + LOGGER.warn( "%s is an empty or invalid firecrest directory" % (firecrest_pathname,) ) else: @@ -218,10 +220,10 @@ def get_runs(runfolder): # The Intensities directory from the RTA software looks a lot like IPAR ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities'))) for ipar_pathname in ipar_dirs: - logging.info('Found ipar directories in ' + datadir) + LOGGER.info('Found ipar directories in ' + datadir) image_analysis = ipar.ipar(ipar_pathname) if image_analysis is None: - logging.warn( + LOGGER.warn( "%s is an empty or invalid IPAR directory" % (ipar_pathname,) ) else: @@ -250,19 +252,19 @@ def get_specific_run(gerald_dir): runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..')) - logging.info('--- use-run detected options ---') - logging.info('runfolder: %s' % (runfolder_dir,)) - logging.info('image_dir: %s' % (image_dir,)) - logging.info('bustard_dir: %s' % (bustard_dir,)) - logging.info('gerald_dir: %s' % (gerald_dir,)) + LOGGER.info('--- use-run detected options ---') + LOGGER.info('runfolder: %s' % (runfolder_dir,)) + LOGGER.info('image_dir: %s' % (image_dir,)) + LOGGER.info('bustard_dir: %s' % (bustard_dir,)) + LOGGER.info('gerald_dir: %s' % (gerald_dir,)) # find our processed image dir image_run = None # split into parent, and leaf directory # leaf directory should be an IPAR or firecrest directory data_dir, short_image_dir = os.path.split(image_dir) - logging.info('data_dir: %s' % (data_dir,)) - logging.info('short_iamge_dir: %s' % (short_image_dir,)) + LOGGER.info('data_dir: %s' % (data_dir,)) + LOGGER.info('short_iamge_dir: %s' % (short_image_dir,)) # guess which type of image processing directory we have by looking # in the leaf directory name @@ -273,22 +275,22 @@ def get_specific_run(gerald_dir): elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None: image_run = ipar.ipar(image_dir) - # if we din't find a run, report the error and return + # if we din't find a run, report the error and return if image_run is None: msg = '%s does not contain an image processing step' % (image_dir,) - logging.error(msg) + LOGGER.error(msg) return None # find our base calling base_calling_run = bustard.bustard(bustard_dir) if base_calling_run is None: - 
logging.error('%s does not contain a bustard run' % (bustard_dir,)) + LOGGER.error('%s does not contain a bustard run' % (bustard_dir,)) return None # find alignments gerald_run = gerald.gerald(gerald_dir) if gerald_run is None: - logging.error('%s does not contain a gerald run' % (gerald_dir,)) + LOGGER.error('%s does not contain a gerald run' % (gerald_dir,)) return None p = PipelineRun(runfolder_dir) @@ -296,7 +298,7 @@ def get_specific_run(gerald_dir): p.bustard = base_calling_run p.gerald = gerald_run - logging.info('Constructed PipelineRun from %s' % (gerald_dir,)) + LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,)) return p def extract_run_parameters(runs): @@ -397,7 +399,7 @@ def save_flowcell_reports(data_dir, cycle_dir): cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ] if os.path.exists(status_file): cmd_list.extend(['Status.xml', 'Status.xsl']) - logging.info("Saving reports from " + reports_dir) + LOGGER.info("Saving reports from " + reports_dir) cwd = os.getcwd() os.chdir(data_dir) q = QueueCommands([" ".join(cmd_list)]) @@ -409,10 +411,10 @@ def save_summary_file(gerald_object, cycle_dir): # Copy Summary.htm summary_path = os.path.join(gerald_object.pathname, 'Summary.htm') if os.path.exists(summary_path): - logging.info('Copying %s to %s' % (summary_path, cycle_dir)) + LOGGER.info('Copying %s to %s' % (summary_path, cycle_dir)) shutil.copy(summary_path, cycle_dir) else: - logging.info('Summary file %s was not found' % (summary_path,)) + LOGGER.info('Summary file %s was not found' % (summary_path,)) def save_ivc_plot(bustard_object, cycle_dir): """ @@ -425,15 +427,15 @@ def save_ivc_plot(bustard_object, cycle_dir): plot_target_path = os.path.join(cycle_dir, 'Plots') if os.path.exists(plot_html): - logging.debug("Saving %s" % (plot_html,)) - logging.debug("Saving %s" % (plot_images,)) + LOGGER.debug("Saving %s" % (plot_html,)) + LOGGER.debug("Saving %s" % (plot_images,)) shutil.copy(plot_html, cycle_dir) if not os.path.exists(plot_target_path): os.mkdir(plot_target_path) for plot_file in glob(plot_images): shutil.copy(plot_file, plot_target_path) else: - logging.warning('Missing IVC.html file, not archiving') + LOGGER.warning('Missing IVC.html file, not archiving') def compress_score_files(bustard_object, cycle_dir): @@ -456,10 +458,10 @@ def compress_score_files(bustard_object, cycle_dir): bzip_cmd = [ 'bzip2', '-9', '-c' ] tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2') tar_dest = open(tar_dest_name, 'w') - logging.info("Compressing score files from %s" % (scores_path,)) - logging.info("Running tar: " + " ".join(tar_cmd[:10])) - logging.info("Running bzip2: " + " ".join(bzip_cmd)) - logging.info("Writing to %s" % (tar_dest_name,)) + LOGGER.info("Compressing score files from %s" % (scores_path,)) + LOGGER.info("Running tar: " + " ".join(tar_cmd[:10])) + LOGGER.info("Running bzip2: " + " ".join(bzip_cmd)) + LOGGER.info("Writing to %s" % (tar_dest_name,)) env = {'BZIP': '-9'} tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env, @@ -479,26 +481,26 @@ def compress_eland_results(gerald_object, cycle_dir, num_jobs=1): for eland_lane in lanes_dictionary.values(): source_name = eland_lane.pathname if source_name is None: - logging.info( + LOGGER.info( "Lane ID %s does not have a filename." 
% (eland_lane.lane_id,)) else: path, name = os.path.split(source_name) dest_name = os.path.join(cycle_dir, name) - logging.info("Saving eland file %s to %s" % \ + LOGGER.info("Saving eland file %s to %s" % \ (source_name, dest_name)) if is_compressed(name): - logging.info('Already compressed, Saving to %s' % (dest_name,)) + LOGGER.info('Already compressed, Saving to %s' % (dest_name,)) shutil.copy(source_name, dest_name) else: # not compressed dest_name += '.bz2' args = ['bzip2', '-9', '-c', source_name, '>', dest_name ] bz_commands.append(" ".join(args)) - #logging.info('Running: %s' % ( " ".join(args) )) + #LOGGER.info('Running: %s' % ( " ".join(args) )) #bzip_dest = open(dest_name, 'w') #bzip = subprocess.Popen(args, stdout=bzip_dest) - #logging.info('Saving to %s' % (dest_name, )) + #LOGGER.info('Saving to %s' % (dest_name, )) #bzip.wait() if len(bz_commands) > 0: @@ -520,17 +522,17 @@ def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, r for r in runs: result_dir = os.path.join(output_base_dir, r.flowcell_id) - logging.info("Using %s as result directory" % (result_dir,)) + LOGGER.info("Using %s as result directory" % (result_dir,)) if not os.path.exists(result_dir): os.mkdir(result_dir) # create cycle_dir cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop) - logging.info("Filling in %s" % (cycle,)) + LOGGER.info("Filling in %s" % (cycle,)) cycle_dir = os.path.join(result_dir, cycle) cycle_dir = os.path.abspath(cycle_dir) if os.path.exists(cycle_dir): - logging.error("%s already exists, not overwriting" % (cycle_dir,)) + LOGGER.error("%s already exists, not overwriting" % (cycle_dir,)) continue else: os.mkdir(cycle_dir) @@ -578,24 +580,24 @@ def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, r def rm_list(files, dry_run=True): for f in files: if os.path.exists(f): - logging.info('deleting %s' % (f,)) + LOGGER.info('deleting %s' % (f,)) if not dry_run: if os.path.isdir(f): shutil.rmtree(f) else: os.unlink(f) else: - logging.warn("%s doesn't exist." % (f,)) + LOGGER.warn("%s doesn't exist." % (f,)) def clean_runs(runs, dry_run=True): """ Clean up run folders to optimize for compression. 
""" if dry_run: - logging.info('In dry-run mode') + LOGGER.info('In dry-run mode') for run in runs: - logging.info('Cleaninging %s' % (run.pathname,)) + LOGGER.info('Cleaninging %s' % (run.pathname,)) # rm RunLog*.xml runlogs = glob(os.path.join(run.pathname, 'RunLog*xml')) rm_list(runlogs, dry_run) @@ -611,11 +613,11 @@ def clean_runs(runs, dry_run=True): calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*')) rm_list(calibration_dir, dry_run) # rm Images/L* - logging.info("Cleaning images") + LOGGER.info("Cleaning images") image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*')) rm_list(image_dirs, dry_run) # cd Data/C1-*_Firecrest* - logging.info("Cleaning intermediate files") + LOGGER.info("Cleaning intermediate files") # make clean_intermediate if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')): clean_process = subprocess.Popen(['make', 'clean_intermediate'], diff --git a/htsworkflow/pipelines/sequences.py b/htsworkflow/pipelines/sequences.py index 24cf884..993bcc9 100644 --- a/htsworkflow/pipelines/sequences.py +++ b/htsworkflow/pipelines/sequences.py @@ -5,6 +5,8 @@ import logging import os import re +LOGGER = logging.getLogger(__name__) + eland_re = re.compile('s_(?P\d)(_(?P\d))?_eland_') raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+') qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2') @@ -30,7 +32,7 @@ CREATE TABLE %(table)s ( class SequenceFile(object): """ Simple container class that holds the path to a sequence archive - and basic descriptive information. + and basic descriptive information. """ def __init__(self, filetype, path, flowcell, lane, read=None, pf=None, cycle=None): self.filetype = filetype @@ -87,7 +89,7 @@ class SequenceFile(object): Add this entry to a DB2.0 database. 
""" #FIXME: NEEDS SQL ESCAPING - header_macro = {'table': SEQUENCE_TABLE_NAME } + header_macro = {'table': SEQUENCE_TABLE_NAME } sql_header = "insert into %(table)s (" % header_macro sql_columns = ['filetype','path','flowcell','lane'] sql_middle = ") values (" @@ -125,9 +127,9 @@ def get_flowcell_cycle(path): stop = cycle_match.group('stop') if stop is not None: stop = int(stop) - + return flowcell, start, stop - + def parse_srf(path, filename): flowcell_dir, start, stop = get_flowcell_cycle(path) basename, ext = os.path.splitext(filename) @@ -137,7 +139,7 @@ def parse_srf(path, filename): fullpath = os.path.join(path, filename) if flowcell_dir != flowcell: - logging.warn("flowcell %s found in wrong directory %s" % \ + LOGGER.warn("flowcell %s found in wrong directory %s" % \ (flowcell, path)) return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop) @@ -152,7 +154,7 @@ def parse_qseq(path, filename): read = int(records[6][1]) if flowcell_dir != flowcell: - logging.warn("flowcell %s found in wrong directory %s" % \ + LOGGER.warn("flowcell %s found in wrong directory %s" % \ (flowcell, path)) return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop) @@ -168,9 +170,9 @@ def parse_fastq(path, filename): lane = int(records[5][1]) read = int(records[6][1]) pf = parse_fastq_pf_flag(records) - + if flowcell_dir != flowcell: - logging.warn("flowcell %s found in wrong directory %s" % \ + LOGGER.warn("flowcell %s found in wrong directory %s" % \ (flowcell, path)) return SequenceFile('fastq', fullpath, flowcell, lane, read, pf=pf, cycle=stop) @@ -193,7 +195,7 @@ def parse_fastq_pf_flag(records): (records[-1], os.path.join(path,filename))) return pf - + def parse_eland(path, filename, eland_match=None): if eland_match is None: eland_match = eland_re.match(filename) @@ -208,18 +210,18 @@ def parse_eland(path, filename, eland_match=None): else: read = None return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop) - + def scan_for_sequences(dirs): """ Scan through a list of directories for sequence like files """ sequences = [] for d in dirs: - logging.info("Scanning %s for sequences" % (d,)) + LOGGER.info("Scanning %s for sequences" % (d,)) if not os.path.exists(d): - logging.warn("Flowcell directory %s does not exist" % (d,)) + LOGGER.warn("Flowcell directory %s does not exist" % (d,)) continue - + for path, dirname, filenames in os.walk(d): for f in filenames: seq = None @@ -240,6 +242,6 @@ def scan_for_sequences(dirs): seq = parse_eland(path, f, eland_match) if seq: sequences.append(seq) - logging.debug("Found sequence at %s" % (f,)) - + LOGGER.debug("Found sequence at %s" % (f,)) + return sequences diff --git a/htsworkflow/pipelines/srf.py b/htsworkflow/pipelines/srf.py index cc18198..7f11cbc 100644 --- a/htsworkflow/pipelines/srf.py +++ b/htsworkflow/pipelines/srf.py @@ -4,6 +4,8 @@ import os from htsworkflow.util import queuecommands +LOGGER = logging.getLogger(__name__) + SOLEXA2SRF = 0 ILLUMINA2SRF10 = 1 ILLUMINA2SRF11 = 2 @@ -36,17 +38,17 @@ def pathname_to_run_name(base): def make_srf_commands(run_name, bustard_dir, lanes, site_name, destdir, cmdlevel=ILLUMINA2SRF11): """ make a subprocess-friendly list of command line arguments to run solexa2srf - generates files like: + generates files like: woldlab:080514_HWI-EAS229_0029_20768AAXX:8.srf site run name lane - + run_name - most of the file name (run folder name is a good choice) lanes - list of integers corresponding to which lanes to process site_name - name of your "sequencing site" or "Individual" destdir 
- where to write all the srf files """ # clean up pathname - logging.info("run_name %s" % (run_name,)) + LOGGER.info("run_name %s" % (run_name,)) cmd_list = [] for lane in lanes: @@ -75,7 +77,7 @@ def make_srf_commands(run_name, bustard_dir, lanes, site_name, destdir, cmdlevel else: raise ValueError("Unrecognized run level %d" % (cmdlevel,)) - logging.info("Generated command: " + " ".join(cmd)) + LOGGER.info("Generated command: " + " ".join(cmd)) cmd_list.append(" ".join(cmd)) return cmd_list @@ -112,17 +114,17 @@ def create_qseq_patterns(bustard_dir): def make_qseq_commands(run_name, bustard_dir, lanes, site_name, destdir, cmdlevel=ILLUMINA2SRF11): """ make a subprocess-friendly list of command line arguments to run solexa2srf - generates files like: + generates files like: woldlab:080514_HWI-EAS229_0029_20768AAXX:8.srf site run name lane - + run_name - most of the file name (run folder name is a good choice) lanes - list of integers corresponding to which lanes to process site_name - name of your "sequencing site" or "Individual" destdir - where to write all the srf files """ # clean up pathname - logging.info("run_name %s" % (run_name,)) + LOGGER.info("run_name %s" % (run_name,)) cmd_list = [] for lane in lanes: @@ -139,13 +141,13 @@ def make_qseq_commands(run_name, bustard_dir, lanes, site_name, destdir, cmdleve dest_path = os.path.join(destdir, destname) cmd = " ".join(['tar', 'cjf', dest_path, pattern % (lane,) ]) - logging.info("Generated command: " + cmd) + LOGGER.info("Generated command: " + cmd) cmd_list.append(cmd) return cmd_list def run_commands(new_dir, cmd_list, num_jobs): - logging.info("chdir to %s" % (new_dir,)) + LOGGER.info("chdir to %s" % (new_dir,)) curdir = os.getcwd() os.chdir(new_dir) q = queuecommands.QueueCommands(cmd_list, num_jobs) @@ -166,7 +168,7 @@ def make_md5_commands(destdir): for f in file_list: cmd = " ".join(['md5sum', f, '>', f + '.md5']) - logging.info('generated command: ' + cmd) + LOGGER.info('generated command: ' + cmd) cmd_list.append(cmd) return cmd_list diff --git a/htsworkflow/pipelines/srf2fastq.py b/htsworkflow/pipelines/srf2fastq.py index 3131fdd..61f93c7 100755 --- a/htsworkflow/pipelines/srf2fastq.py +++ b/htsworkflow/pipelines/srf2fastq.py @@ -9,6 +9,8 @@ import sys from htsworkflow.util.opener import autoopen from htsworkflow.version import version +LOGGER = logging.getLogger(__name__) + # constants for our fastq finite state machine FASTQ_HEADER = 0 FASTQ_SEQUENCE = 1 @@ -41,7 +43,7 @@ def main(cmdline=None): else: left = open_write(opts.left, opts.force) right = open_write(opts.right, opts.force) - + # open the srf, fastq, or compressed fastq if is_srf(args[0]): source = srf_open(args[0], opts.cnf1) @@ -52,7 +54,7 @@ def main(cmdline=None): convert_single_to_fastq(source, left, header) else: convert_single_to_two_fastq(source, left, right, opts.mid, header) - + return 0 def make_parser(): @@ -67,7 +69,7 @@ You can also force the flowcell ID to be added to the header.""") help="add flowcell id header to sequence") parser.add_option('-l','--left', default="r1.fastq", help='left side filename') - parser.add_option('-m','--mid', default=None, + parser.add_option('-m','--mid', default=None, help='actual sequence mid point') parser.add_option('-r','--right', default="r2.fastq", help='right side filename') @@ -90,11 +92,11 @@ def srf_open(filename, cnf1=False): if cnf1 or is_cnf1(filename): cmd.append('-c') cmd.append(filename) - - logging.info('srf command: %s' % (" ".join(cmd),)) + + LOGGER.info('srf command: %s' % (" ".join(cmd),)) p = 
Popen(cmd, stdout=PIPE) return p.stdout - + def convert_single_to_fastq(instream, target1, header=''): @@ -111,7 +113,7 @@ def convert_single_to_fastq(instream, target1, header=''): state = FASTQ_SEQUENCE_HEADER # quality header elif state == FASTQ_SEQUENCE_HEADER: - # the sequence header isn't really sequence, but + # the sequence header isn't really sequence, but # we're just passing it through write_sequence(target1, line) state = FASTQ_QUALITY @@ -123,10 +125,10 @@ def convert_single_to_fastq(instream, target1, header=''): raise RuntimeError("Unrecognized STATE in fastq split") - + def convert_single_to_two_fastq(instream, target1, target2, mid=None, header=''): """ - read a fastq file where two paired ends have been run together into + read a fastq file where two paired ends have been run together into two halves. instream is the source stream @@ -151,7 +153,7 @@ def convert_single_to_two_fastq(instream, target1, target2, mid=None, header='') state = FASTQ_SEQUENCE_HEADER # quality header elif state == FASTQ_SEQUENCE_HEADER: - # the sequence header isn't really sequence, but + # the sequence header isn't really sequence, but # we're just passing it through write_sequence(target1, line) write_sequence(target2, line) @@ -197,7 +199,7 @@ def is_cnf1(filename): """ max_header = 1024 ** 2 PROGRAM_ID = 'PROGRAM_ID\000' - cnf4_apps = set(("solexa2srf v1.4", + cnf4_apps = set(("solexa2srf v1.4", "illumina2srf v1.11.5.Illumina.1.3")) if not is_srf(filename): diff --git a/htsworkflow/pipelines/summary.py b/htsworkflow/pipelines/summary.py index fb8efdc..d63f265 100644 --- a/htsworkflow/pipelines/summary.py +++ b/htsworkflow/pipelines/summary.py @@ -8,6 +8,7 @@ from pprint import pprint from htsworkflow.pipelines.runfolder import ElementTree from htsworkflow.util.ethelp import indent, flatten +LOGGER = logging.getLogger(__name__) nan = float('nan') class Summary(object): @@ -107,9 +108,9 @@ class Summary(object): for GeraldName, LRSName in Summary.LaneResultSummary.GERALD_TAGS.items(): node = element.find(GeraldName) if node is None: - logging.info("Couldn't find %s" % (GeraldName)) + LOGGER.info("Couldn't find %s" % (GeraldName)) setattr(self, LRSName, parse_xml_mean_range(node)) - + def get_elements(self): lane_result = ElementTree.Element( Summary.LaneResultSummary.LANE_RESULT_SUMMARY, @@ -145,7 +146,7 @@ class Summary(object): setattr(self, variable_name, parse_summary_element(element)) except KeyError, e: - logging.warn('Unrecognized tag %s' % (element.tag,)) + LOGGER.warn('Unrecognized tag %s' % (element.tag,)) def __init__(self, filename=None, xml=None): # lane results is a list of 1 or 2 ends containing @@ -186,7 +187,7 @@ class Summary(object): Extract just the lane results. Currently those are the only ones we care about. 
""" - + tables = self._extract_named_tables(pathname) @@ -220,7 +221,7 @@ class Summary(object): self._extract_lane_results_for_end(tables, name, end) if len(self.lane_results[0]) == 0: - logging.warning("No Lane Results Summary Found in %s" % (pathname,)) + LOGGER.warning("No Lane Results Summary Found in %s" % (pathname,)) def _extract_named_tables_from_gerald_xml(self, tree): """ @@ -243,7 +244,7 @@ class Summary(object): self.lane_results[lrs.end][lrs.lane] = lrs # probably not useful return tables - + ###### START HTML Table Extraction ######## def _extract_named_tables_from_html(self, tree): body = tree.find('body') @@ -301,7 +302,7 @@ class Summary(object): return ValueError("Expected %s" % (Summary.SUMMARY,)) xml_version = int(tree.attrib.get('version', 0)) if xml_version > Summary.XML_VERSION: - logging.warn('Summary XML tree is a higher version than this class') + LOGGER.warn('Summary XML tree is a higher version than this class') for element in list(tree): lrs = Summary.LaneResultSummary() lrs.set_elements(element) @@ -379,17 +380,17 @@ def parse_xml_mean_range(element): """ if element is None: return None - + mean = element.find('mean') stddev = element.find('stdev') if mean is None or stddev is None: raise RuntimeError("Summary.xml file format changed, expected mean/stddev tags") - if mean.text is None: + if mean.text is None: mean_value = float('nan') else: mean_value = tonumber(mean.text) - if stddev.text is None: + if stddev.text is None: stddev_value = float('nan') else: stddev_value = tonumber(stddev.text) @@ -407,4 +408,4 @@ if __name__ == "__main__": for fname in args: s = Summary(fname) s.dump() - + diff --git a/htsworkflow/submission/condorfastq.py b/htsworkflow/submission/condorfastq.py index 462c177..20afdfd 100644 --- a/htsworkflow/submission/condorfastq.py +++ b/htsworkflow/submission/condorfastq.py @@ -11,7 +11,7 @@ from htsworkflow.pipelines import srf2fastq from htsworkflow.util.api import HtswApi from htsworkflow.util.conversion import parse_flowcell_id -logger = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) class CondorFastqExtract(object): def __init__(self, host, apidata, sequences_path, @@ -34,7 +34,7 @@ class CondorFastqExtract(object): def build_fastqs(self, library_result_map ): """ Generate condor scripts to build any needed fastq files - + Args: library_result_map (list): [(library_id, destination directory), ...] 
""" @@ -43,24 +43,24 @@ class CondorFastqExtract(object): srf_condor_header = self.get_srf_condor_header() srf_condor_entries = [] lib_db = self.find_archive_sequence_files(library_result_map) - + needed_targets = self.find_missing_targets(library_result_map, lib_db) - + for target_pathname, available_sources in needed_targets.items(): - logger.debug(' target : %s' % (target_pathname,)) - logger.debug(' candidate sources: %s' % (available_sources,)) + LOGGER.debug(' target : %s' % (target_pathname,)) + LOGGER.debug(' candidate sources: %s' % (available_sources,)) if available_sources.has_key('qseq'): source = available_sources['qseq'] qseq_condor_entries.append( - self.condor_qseq_to_fastq(source.path, - target_pathname, + self.condor_qseq_to_fastq(source.path, + target_pathname, source.flowcell) ) elif available_sources.has_key('srf'): source = available_sources['srf'] mid = getattr(source, 'mid_point', None) srf_condor_entries.append( - self.condor_srf_to_fastq(source.path, + self.condor_srf_to_fastq(source.path, target_pathname, source.paired, source.flowcell, @@ -68,17 +68,17 @@ class CondorFastqExtract(object): ) else: print " need file", target_pathname - + if len(srf_condor_entries) > 0: - make_submit_script('srf.fastq.condor', + make_submit_script('srf.fastq.condor', srf_condor_header, srf_condor_entries) - + if len(qseq_condor_entries) > 0: - make_submit_script('qseq.fastq.condor', + make_submit_script('qseq.fastq.condor', qseq_condor_header, qseq_condor_entries) - + def get_qseq_condor_header(self): return """Universe=vanilla @@ -97,18 +97,18 @@ output=%(log)s/srf_pair_fastq.$(process).out error=%(log)s/srf_pair_fastq.$(process).out log=%(log)s/srf_pair_fastq.log environment="PYTHONPATH=%(env)s" - + """ % {'exe': sys.executable, 'log': self.log_path, 'env': os.environ.get('PYTHONPATH', '') } - + def find_archive_sequence_files(self, library_result_map): """ Find archived sequence files associated with our results. """ - logger.debug("Searching for sequence files in: %s" %(self.sequences_path,)) - + LOGGER.debug("Searching for sequence files in: %s" %(self.sequences_path,)) + lib_db = {} seq_dirs = set() candidate_lanes = {} @@ -116,35 +116,35 @@ environment="PYTHONPATH=%(env)s" lib_info = self.api.get_library(lib_id) lib_info['lanes'] = {} lib_db[lib_id] = lib_info - + for lane in lib_info['lane_set']: lane_key = (lane['flowcell'], lane['lane_number']) candidate_lanes[lane_key] = lib_id - seq_dirs.add(os.path.join(self.sequences_path, - 'flowcells', + seq_dirs.add(os.path.join(self.sequences_path, + 'flowcells', lane['flowcell'])) - logger.debug("Seq_dirs = %s" %(unicode(seq_dirs))) + LOGGER.debug("Seq_dirs = %s" %(unicode(seq_dirs))) candidate_seq_list = scan_for_sequences(seq_dirs) - + # at this point we have too many sequences as scan_for_sequences # returns all the sequences in a flowcell directory # so lets filter out the extras - + for seq in candidate_seq_list: lane_key = (seq.flowcell, seq.lane) lib_id = candidate_lanes.get(lane_key, None) if lib_id is not None: lib_info = lib_db[lib_id] lib_info['lanes'].setdefault(lane_key, set()).add(seq) - + return lib_db - + def find_missing_targets(self, library_result_map, lib_db): """ Check if the sequence file exists. This requires computing what the sequence name is and checking to see if it can be found in the sequence location. 
- + Adds seq.paired flag to sequences listed in lib_db[*]['lanes'] """ fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq' @@ -154,15 +154,15 @@ environment="PYTHONPATH=%(env)s" for lib_id, result_dir in library_result_map: lib = lib_db[lib_id] lane_dict = make_lane_dict(lib_db, lib_id) - + for lane_key, sequences in lib['lanes'].items(): for seq in sequences: seq.paired = lane_dict[seq.flowcell]['paired_end'] lane_status = lane_dict[seq.flowcell]['status'] - + if seq.paired and seq.read is None: seq.read = 1 - filename_attributes = { + filename_attributes = { 'flowcell': seq.flowcell, 'lib_id': lib_id, 'lane': seq.lane, @@ -176,21 +176,21 @@ environment="PYTHONPATH=%(env)s" # 30DY0 only ran for 151 bases instead of 152 # it is actually 76 1st read, 75 2nd read seq.mid_point = 76 - + # end filters if seq.paired: target_name = fastq_paired_template % filename_attributes else: target_name = fastq_single_template % filename_attributes - + target_pathname = os.path.join(result_dir, target_name) if self.force or not os.path.exists(target_pathname): t = needed_targets.setdefault(target_pathname, {}) t[seq.filetype] = seq - + return needed_targets - + def condor_srf_to_fastq(self, srf_file, target_pathname, @@ -204,31 +204,31 @@ environment="PYTHONPATH=%(env)s" # this is ugly. I did it because I was pregenerating the target # names before I tried to figure out what sources could generate # those targets, and everything up to this point had been - # one-to-one. So I couldn't figure out how to pair the - # target names. + # one-to-one. So I couldn't figure out how to pair the + # target names. # With this at least the command will run correctly. # however if we rename the default targets, this'll break # also I think it'll generate it twice. - args.extend(['--right', + args.extend(['--right', target_pathname.replace('_r1.fastq', '_r2.fastq')]) else: args.extend(['--single', target_pathname ]) if flowcell is not None: args.extend(['--flowcell', flowcell]) - + if mid is not None: args.extend(['-m', str(mid)]) - + if self.force: args.extend(['--force']) - + script = """arguments="%s" queue """ % (" ".join(args),) - - return script - - + + return script + + def condor_qseq_to_fastq(self, qseq_file, target_pathname, flowcell=None): py = qseq2fastq.__file__ args = [py, '-i', qseq_file, '-o', target_pathname ] @@ -237,9 +237,9 @@ queue script = """arguments="%s" queue """ % (" ".join(args)) - - return script - + + return script + def make_submit_script(target, header, body_list): """ write out a text file @@ -247,7 +247,7 @@ def make_submit_script(target, header, body_list): this was intended for condor submit scripts Args: - target (str or stream): + target (str or stream): if target is a string, we will open and close the file if target is a stream, the caller is responsible. 
diff --git a/htsworkflow/submission/daf.py b/htsworkflow/submission/daf.py index bed304a..e477629 100644 --- a/htsworkflow/submission/daf.py +++ b/htsworkflow/submission/daf.py @@ -318,7 +318,7 @@ class DAFMapper(object): dafTermOntology['name'])) if view_name is None: errmsg = 'Could not find view name for {0}' - logging.warning(errmsg.format(str(view))) + logger.warning(errmsg.format(str(view))) return view_name = str(view_name) @@ -378,7 +378,7 @@ class DAFMapper(object): md5 = make_md5sum(submission_pathname) if md5 is None: errmsg = "Unable to produce md5sum for {0}" - logging.warning(errmsg.format(submission_pathname)) + logger.warning(errmsg.format(submission_pathname)) else: self.model.add_statement( RDF.Statement(fileNode, dafTermOntology['md5sum'], md5)) diff --git a/htsworkflow/util/api.py b/htsworkflow/util/api.py index 9685408..4c4a6e5 100644 --- a/htsworkflow/util/api.py +++ b/htsworkflow/util/api.py @@ -15,6 +15,7 @@ import urllib import urllib2 import urlparse +LOGGER = logging.getLogger(__name__) def add_auth_options(parser): """Add options OptParser configure authentication options @@ -24,7 +25,7 @@ def add_auth_options(parser): config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini' ]) - + sequence_archive = None apiid = None apikey = None @@ -52,7 +53,7 @@ def make_auth_from_opts(opts, parser): """ if opts.host is None or opts.apiid is None or opts.apikey is None: parser.error("Please specify host url, apiid, apikey") - + return {'apiid': opts.apiid, 'apikey': opts.apikey } @@ -100,7 +101,7 @@ def flowcell_url(root_url, flowcell_id): def lanes_for_user_url(root_url, username): """ Return the url for returning all the lanes associated with a username - + Args: username (str): a username in your target filesystem root_url (str): the root portion of the url, e.g. http://localhost @@ -126,12 +127,12 @@ def retrieve_info(url, apidata): web = urllib2.urlopen(url, apipayload) except urllib2.URLError, e: if hasattr(e, 'code') and e.code == 404: - logging.info("%s was not found" % (url,)) + LOGGER.info("%s was not found" % (url,)) return None else: errmsg = 'URLError: %s' % (str(e)) raise IOError(errmsg) - + contents = web.read() headers = web.info() @@ -156,4 +157,4 @@ class HtswApi(object): def get_url(self, url): return retrieve_info(url, self.authdata) - + diff --git a/htsworkflow/util/queuecommands.py b/htsworkflow/util/queuecommands.py index 23fff16..2b7da65 100644 --- a/htsworkflow/util/queuecommands.py +++ b/htsworkflow/util/queuecommands.py @@ -1,5 +1,5 @@ """ -Run up to N simultanous jobs from provided of commands +Run up to N simultanous jobs from provided of commands """ import logging @@ -10,6 +10,8 @@ import select import sys import time +LOGGER = logging.getLogger(__name__) + class QueueCommands(object): """ Queue up N commands from cmd_list, launching more jobs as the first @@ -19,11 +21,11 @@ class QueueCommands(object): def __init__(self, cmd_list, N=0, cwd=None, env=None): """ cmd_list is a list of elements suitable for subprocess - N is the number of simultanious processes to run. + N is the number of simultanious processes to run. 0 is all of them. 
- + WARNING: this will not work on windows - (It depends on being able to pass local file descriptors to the + (It depends on being able to pass local file descriptors to the select call with isn't supported by the Win32 API) """ self.to_run = cmd_list[:] @@ -49,25 +51,22 @@ class QueueCommands(object): Launch jobs until we have the maximum allowable running (or have run out of jobs) """ - queue_log = logging.getLogger('queue') - while (len(self.to_run) > 0) and self.under_process_limit(): - queue_log.info('%d left to run', len(self.to_run)) + LOGGER.info('%d left to run', len(self.to_run)) cmd = self.to_run.pop(0) - p = subprocess.Popen(cmd, - stdout=PIPE, - shell=True, - cwd=self.cwd, + p = subprocess.Popen(cmd, + stdout=PIPE, + shell=True, + cwd=self.cwd, env=self.env) self.running[p.stdout] = p - queue_log.info("Created process %d from %s" % (p.pid, str(cmd))) + LOGGER.info("Created process %d from %s" % (p.pid, str(cmd))) def run(self): """ run up to N jobs until we run out of jobs """ - queue_log = logging.getLogger('queue') - queue_log.debug('using %s as cwd' % (self.cwd,)) + LOGGER.debug('using %s as cwd' % (self.cwd,)) # to_run slowly gets consumed by start_jobs while len(self.to_run) > 0 or len(self.running) > 0: @@ -87,13 +86,13 @@ class QueueCommands(object): pending = self.running[pending_fd] # if it really did finish, remove it from running jobs if pending.poll() is not None: - queue_log.info("Process %d finished [%d]", - pending.pid, pending.returncode) + LOGGER.info("Process %d finished [%d]", + pending.pid, pending.returncode) del self.running[pending_fd] else: # It's still running, but there's some output buffer = pending_fd.readline() buffer = buffer.strip() msg = "%d:(%d) %s" %(pending.pid, len(buffer), buffer) - logging.debug(msg) + LOGGER.debug(msg) time.sleep(1) diff --git a/htsworkflow/util/test/test_queuecommands.py b/htsworkflow/util/test/test_queuecommands.py index f4807d7..f52e428 100644 --- a/htsworkflow/util/test/test_queuecommands.py +++ b/htsworkflow/util/test/test_queuecommands.py @@ -3,7 +3,6 @@ import logging import time import unittest - from htsworkflow.util.queuecommands import QueueCommands class testQueueCommands(unittest.TestCase): @@ -11,7 +10,7 @@ class testQueueCommands(unittest.TestCase): logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)-8s %(message)s') - + def test_unlimited_run_slow(self): """ @@ -45,7 +44,7 @@ class testQueueCommands(unittest.TestCase): end = time.time()-start # pity I had to add a 1 second sleep self.failUnless( end > 5.9 and end < 6.1, - "took %s seconds, expected ~6" % (end,)) + "took %s seconds, expected ~6" % (end,)) def suite(): return unittest.makeSuite(testQueueCommands, 'test') diff --git a/htsworkflow/version.py b/htsworkflow/version.py index ca7390d..8097edb 100644 --- a/htsworkflow/version.py +++ b/htsworkflow/version.py @@ -1,19 +1,21 @@ import logging +LOGGER = logging.getLogger(__name__) + def version(): - """Return version number + """Return version number """ version = None try: import pkg_resources except ImportError, e: - logging.error("Can't find version number, please install setuptools") + LOGGER.error("Can't find version number, please install setuptools") raise e try: version = pkg_resources.get_distribution("htsworkflow") except pkg_resources.DistributionNotFound, e: - logging.error("Package not installed") + LOGGER.error("Package not installed") return version - + diff --git a/scripts/htsw-gerald2bed b/scripts/htsw-gerald2bed index 7a726e7..768cd09 100755 --- 
a/scripts/htsw-gerald2bed
+++ b/scripts/htsw-gerald2bed
@@ -10,6 +10,8 @@ import os
 from htsworkflow.util.makebed import make_bed_from_eland_stream, make_description
+LOGGER = logging.getLogger(__name__)
+
 def make_bed_for_gerald(eland_dir, output_dir, prefix, database, flowcell):
     """
     convert s_[1-8]_eland_result.txt to corresponding bed files
@@ -19,12 +21,12 @@ def make_bed_for_gerald(eland_dir, output_dir, prefix, database, flowcell):
     if len(out_files) > 0:
         raise RuntimeError("please move old bedfiles")
-    logging.info('Processing %s using flowcell id %s' % (eland_dir, flowcell))
+    LOGGER.info('Processing %s using flowcell id %s' % (eland_dir, flowcell))
     for pathname in eland_files:
         path, name = os.path.split(pathname)
         lane = int(name[2])
         outname = 's_%d_eland_result.bed' %(lane,)
-        logging.info('Converting lane %d to %s' % (lane, outname))
+        LOGGER.info('Converting lane %d to %s' % (lane, outname))
         outpathname = os.path.join(eland_dir, outname)
         # look up descriptions
@@ -66,7 +68,7 @@ s_[12345678]_eland_result.txt"""
     return parser
 def main(command_line=None):
-    logging.basicConfig(level=logging.WARNING)
+    logging.basicConfig(level=logging.WARNING)
     if command_line is None:
         command_line = sys.argv[1:]
diff --git a/scripts/htsw-runfolder b/scripts/htsw-runfolder
index 7e35c97..b9dfa90 100755
--- a/scripts/htsw-runfolder
+++ b/scripts/htsw-runfolder
@@ -7,24 +7,24 @@
 Runfolder.py can generate a xml file capturing all the 'interesting' parameters
  * start/stop cycle numbers
  * Firecrest, bustard, gerald version numbers
  * Eland analysis types, and everything in the eland configuration file.
- * cluster numbers and other values from the Summary.htm
-   LaneSpecificParameters table.
+ * cluster numbers and other values from the Summary.htm
+   LaneSpecificParameters table.
  * How many reads mapped to a genome from an eland file
 The ELAND "mapped reads" counter will also check for eland squashed file
-that were symlinked from another directory. This is so I can track how
-many reads landed on the genome of interest and on the spike ins.
+that were symlinked from another directory. This is so I can track how
+many reads landed on the genome of interest and on the spike ins.
 Basically my subdirectories something like:
 genomes/hg18
 genomes/hg18/chr*.2bpb <- files for hg18 genome
-genomes/hg18/chr*.vld
 genomes/hg18/VATG.fa.2bp <- symlink to genomes/spikeins
-genomes/spikein
-runfolder.py can also spit out a simple summary report (-s option)
-that contains the per lane post filter cluster numbers and the mapped
(The report isn't currently very pretty) """ from glob import glob @@ -36,6 +36,8 @@ import sys from htsworkflow.pipelines import runfolder from htsworkflow.pipelines.runfolder import ElementTree +LOGGER = logging.getLogger(__name__) + def make_parser(): usage = 'usage: %prog [options] runfolder_root_dir' parser = optparse.OptionParser(usage) @@ -91,10 +93,9 @@ def main(cmdlist=None): logging.basicConfig() if opt.verbose: - root_log = logging.getLogger() - root_log.setLevel(logging.INFO) + LOGGER.setLevel(logging.INFO) - logging.info('Starting htsworkflow illumina runfolder processing tool.') + LOGGER.info('Starting htsworkflow illumina runfolder processing tool.') runs = [] if opt.run_xml: # handle ~ shortcut @@ -108,7 +109,7 @@ def main(cmdlist=None): if specific_run is not None: runs.append(specific_run) else: - logging.warn("Couldn't find a run in %s" % (opt.use_run,)) + LOGGER.warn("Couldn't find a run in %s" % (opt.use_run,)) # scan runfolders for runs for run_pattern in args: diff --git a/scripts/htsw-update-archive b/scripts/htsw-update-archive index 2ccbec6..0cf33f3 100755 --- a/scripts/htsw-update-archive +++ b/scripts/htsw-update-archive @@ -11,6 +11,8 @@ import shelve from htsworkflow.util import api from htsworkflow.pipelines.sequences import scan_for_sequences +LOGGER = logging.getLogger(__name__) + def build_flowcell_db(fcdb_filename, sequences, baseurl, apiid, apikey): """ compare our flowcell database with our list of sequences and return @@ -36,7 +38,7 @@ def build_flowcell_db(fcdb_filename, sequences, baseurl, apiid, apikey): if flowcell_info is not None: seq_library_id = flowcell_info['lane_set'][unicode(seq.lane)]['library_id'] libdb.setdefault(seq_library_id, []).append(seq) - + fcdb.sync() return fcdb, libdb @@ -51,21 +53,21 @@ def carefully_make_hardlink(source, destination, dry_run=False): If we didn't update anything return 0, if we did update return 1. 
""" - logging.debug("CHECKING: %s -> %s", source, destination) + LOGGER.debug("CHECKING: %s -> %s", source, destination) if not os.path.exists(source): - logging.warning("%s doesn't exist", source) + LOGGER.warning("%s doesn't exist", source) return 0 if os.path.exists(destination): if os.path.samefile(source, destination): - logging.debug('SAME: %s -> %s' % (source, destination)) + LOGGER.debug('SAME: %s -> %s' % (source, destination)) return 0 else: - logging.error('%s and %s are different files, skipping' % \ - (source, destination)) + LOGGER.error('%s and %s are different files, skipping' % \ + (source, destination)) return 0 - logging.debug('Linking: %s -> %s' % (source, destination)) + LOGGER.debug('Linking: %s -> %s' % (source, destination)) # we would do something by this part if dry_run: return 1 @@ -74,7 +76,7 @@ def carefully_make_hardlink(source, destination, dry_run=False): os.chmod(destination, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH ) return 1 - + def make_library_links(root, library_db, dry_run=False): """ Make a tree of sequencer roots organized by library id @@ -87,10 +89,10 @@ def make_library_links(root, library_db, dry_run=False): for lib_id, sequences in library_db.items(): target_dir = os.path.join(root, lib_id) if not os.path.exists(target_dir): - logging.info("mkdir %s" % (target_dir,)) + LOGGER.info("mkdir %s" % (target_dir,)) if not dry_run: os.mkdir(target_dir) - + for s in sequences: count += carefully_make_hardlink(s.path, s.make_target_name(target_dir), @@ -105,7 +107,7 @@ def configure_logging(opts): if opts.debug: level = logging.DEBUG logging.basicConfig(level=level) - + def configure_opts(opts): """ @@ -152,7 +154,7 @@ def configure_opts(opts): if opts.apikey is None and config_file.has_option(SECTION_NAME, APIKEY_OPT): opts.apikey = config_file.get(SECTION_NAME, APIKEY_OPT) - + return opts def make_parser(): @@ -165,14 +167,14 @@ def make_parser(): 'sequence archive section') parser.add_option('--cache', default=None, help="default flowcell cache") - + parser.add_option('--host', default=None, help="specify http://host for quering flowcell information") parser.add_option('--apiid', default=None, help="API ID to use when retriving information") parser.add_option("--apikey", default=None, help="API Key for when retriving information") - + parser.add_option('-a', '--sequence-archive', default=None, help='path to where the sequence archive lives') @@ -180,7 +182,7 @@ def make_parser(): help='be more verbose') parser.add_option('-d', '--debug', action='store_true', default=False, help='report everything') - + parser.add_option("--dry-run", dest="dry_run", action="store_true", default=False, help="Don't modify the filesystem") @@ -192,7 +194,7 @@ def main(cmdline=None): configure_logging(opts) opts = configure_opts(opts) - + # complain if critical things are missing if opts.cache is None: parser.error('Need location of htsworkflow frontend database') @@ -203,17 +205,17 @@ def main(cmdline=None): seq_dirs = [ opts.flowcells, opts.srfs ] if len(args) > 0: seq_dirs = [os.path.abspath(f) for f in args] - + seqs = scan_for_sequences(seq_dirs) fcdb, libdb = build_flowcell_db(opts.cache, seqs, opts.host, opts.apiid, opts.apikey) updates = make_library_links(opts.library_tree, libdb, dry_run=opts.dry_run) - - logging.warn("%s flowcells in database" % (len(fcdb),)) - logging.warn("found %s sequence files" % (len(seqs),)) - logging.warn("%s libraries being checked" % (len(libdb),)) - logging.warn("%s sequence files were linked" % (updates,)) - + + LOGGER.warn("%s 
flowcells in database" % (len(fcdb),)) + LOGGER.warn("found %s sequence files" % (len(seqs),)) + LOGGER.warn("%s libraries being checked" % (len(libdb),)) + LOGGER.warn("%s sequence files were linked" % (updates,)) + return 0 - + if __name__ == "__main__": main() diff --git a/scripts/rerun_eland.py b/scripts/rerun_eland.py index af06cdd..01c7a9f 100644 --- a/scripts/rerun_eland.py +++ b/scripts/rerun_eland.py @@ -10,34 +10,36 @@ from htsworkflow.pipelines import gerald from htsworkflow.pipelines.eland import extract_eland_sequence from htsworkflow.pipelines import runfolder +LOGGER = logging.getLogger(__name__) + def make_query_filename(eland_obj, output_dir): - query_name = '%s_%s_eland_query.txt' + query_name = '%s_%s_eland_query.txt' query_name %= (eland_obj.sample_name, eland_obj.lane_id) query_pathname = os.path.join(output_dir, query_name) - + if os.path.exists(query_pathname): - logging.warn("overwriting %s" % (query_pathname,)) + LOGGER.warn("overwriting %s" % (query_pathname,)) return query_pathname def make_result_filename(eland_obj, output_dir): - result_name = '%s_%s_eland_result.txt' + result_name = '%s_%s_eland_result.txt' result_name %= (eland_obj.sample_name, eland_obj.lane_id) result_pathname = os.path.join(output_dir, result_name) - + if os.path.exists(result_pathname): - logging.warn("overwriting %s" % (result_pathname,)) + LOGGER.warn("overwriting %s" % (result_pathname,)) return result_pathname def extract_sequence(inpathname, query_pathname, length, dry_run=False): - logging.info('extracting %d bases' %(length,)) - logging.info('extracting from %s' %(inpathname,)) - logging.info('extracting to %s' %(query_pathname,)) - - if not dry_run: + LOGGER.info('extracting %d bases' %(length,)) + LOGGER.info('extracting from %s' %(inpathname,)) + LOGGER.info('extracting to %s' %(query_pathname,)) + + if not dry_run: try: instream = open(inpathname, 'r') outstream = open(query_pathname, 'w') @@ -45,13 +47,13 @@ def extract_sequence(inpathname, query_pathname, length, dry_run=False): finally: outstream.close() instream.close() - + def run_eland(length, query_name, genome, result_name, multi=False, dry_run=False): cmdline = ['eland_%d' % (length,), query_name, genome, result_name] if multi: cmdline += ['--multi'] - logging.info('running eland: ' + " ".join(cmdline)) + LOGGER.info('running eland: ' + " ".join(cmdline)) if not dry_run: return subprocess.Popen(cmdline) else: @@ -62,12 +64,12 @@ def rerun(gerald_dir, output_dir, length=25, dry_run=False): """ look for eland files in gerald_dir and write a subset to output_dir """ - logging.info("Extracting %d bp from files in %s" % (length, gerald_dir)) + LOGGER.info("Extracting %d bp from files in %s" % (length, gerald_dir)) g = gerald.gerald(gerald_dir) # this will only work if we're only missing the last dir in output_dir if not os.path.exists(output_dir): - logging.info("Making %s" %(output_dir,)) + LOGGER.info("Making %s" %(output_dir,)) if not dry_run: os.mkdir(output_dir) processes = [] @@ -80,23 +82,23 @@ def rerun(gerald_dir, output_dir, length=25, dry_run=False): extract_sequence(inpathname, query_pathname, length, dry_run=dry_run) - p = run_eland(length, - query_pathname, - lane_param.eland_genome, - result_pathname, + p = run_eland(length, + query_pathname, + lane_param.eland_genome, + result_pathname, dry_run=dry_run) if p is not None: processes.append(p) for p in processes: p.wait() - + def make_parser(): usage = '%prog: [options] runfolder' parser = OptionParser(usage) - - parser.add_option('--gerald', + + 
parser.add_option('--gerald',
                       help='specify location of GERALD directory',
                       default=None)
     parser.add_option('-o', '--output',
@@ -134,20 +136,20 @@ def main(cmdline=None):
         opts.gerald = runs[0].gerald.pathname
         if opts.output is None:
             opts.output = os.path.join(
-                runs[0].pathname, 
-                'Data', 
+                runs[0].pathname,
+                'Data',
                 # pythons 0..n ==> elands 1..n+1
-                'C1-%d' % (opts.length+1,) 
+                'C1-%d' % (opts.length+1,)
             )
     elif opts.gerald is None:
         parser.error("need gerald directory")
-
+
     if opts.output is None:
         parser.error("specify location for the new eland files")
     if opts.verbose:
-        root_logger = logging.getLogger()
+        root_logger = LOGGER
         root_logger.setLevel(logging.INFO)
     rerun(opts.gerald, opts.output, opts.length, dry_run=opts.dry_run)
-- 
2.30.2
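
For reference, a minimal sketch of the convention this patch adopts (the module, function, and messages below are illustrative, not taken from the htsworkflow source): each library module creates one module-level logger named after itself and only emits records through it, while the entry-point script calls logging.basicConfig() once to attach handlers and set the threshold. basicConfig is a module-level function of the logging package, not a Logger method.

    import logging

    # one logger per module, named after the module (e.g. 'htsworkflow.pipelines.example')
    LOGGER = logging.getLogger(__name__)

    def scan(pathname):
        # library code only emits records; it never attaches handlers
        LOGGER.info("Scanning %s for sequences", pathname)

    if __name__ == "__main__":
        # the entry point configures output once for the whole process
        logging.basicConfig(level=logging.WARNING)
        scan("/tmp/example")

Because every module logger propagates to the root logger, a script's verbose flag can either raise the level on its own LOGGER (as htsw-runfolder does above) or on the root logger to also surface messages from the htsworkflow modules it imports.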