diff --git a/circus/config.py b/circus/config.py index 4f5672351..1a5117270 100644 --- a/circus/config.py +++ b/circus/config.py @@ -31,6 +31,7 @@ def watcher_defaults(): 'send_hup': False, 'stop_signal': signal.SIGTERM, 'stop_children': False, + 'async_kill': False, 'max_retry': 5, 'graceful_timeout': 30, 'rlimits': dict(), @@ -223,7 +224,8 @@ def get_config(config_file): # default bool to False elif opt in ('shell', 'send_hup', 'stop_children', 'close_child_stderr', 'use_sockets', 'singleton', - 'copy_env', 'copy_path', 'close_child_stdout'): + 'copy_env', 'copy_path', 'close_child_stdout', + 'async_kill'): watcher[opt] = dget(section, opt, False, bool) elif opt == 'stop_signal': watcher['stop_signal'] = to_signum(val) diff --git a/circus/watcher.py b/circus/watcher.py index 84cf6f62a..0c2b5a342 100644 --- a/circus/watcher.py +++ b/circus/watcher.py @@ -17,6 +17,7 @@ from psutil import NoSuchProcess, TimeoutExpired import zmq.utils.jsonapi as json from zmq.eventloop import ioloop +from functools import partial from circus.process import Process, DEAD_OR_ZOMBIE, UNEXISTING from circus.papa_process_proxy import PapaProcessProxy @@ -69,6 +70,12 @@ class Watcher(object): - **stop_children**: send the **stop_signal** to the children too. Defaults to False. + - **async_kill**: don't wait for the full kill sequence (SIGTERM + + graceful timeout + SIGKILL) to complete. This can help if you have a + long graceful timeout and don't mind having more processes than + numprocesses during the graceful killing phase. + Defaults to False. + - **env**: a mapping containing the environment variables the command will run with. Optional. 
@@ -194,7 +201,7 @@ class Watcher(object): def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., working_dir=None, shell=False, shell_args=None, uid=None, - max_retry=5, gid=None, send_hup=False, + max_retry=5, gid=None, send_hup=False, async_kill=False, stop_signal=signal.SIGTERM, stop_children=False, env=None, graceful_timeout=30.0, prereload_fn=None, rlimits=None, executable=None, stdout_stream=None, stderr_stream=None, @@ -278,6 +285,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., self.working_dir = working_dir self.processes = {} + self.async_killing_futures = {} self.shell = shell self.shell_args = shell_args self.uid = uid @@ -311,6 +319,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., self.send_hup = send_hup self.stop_signal = stop_signal self.stop_children = stop_children + self.async_kill = async_kill self.sockets = self.evpub_socket = None self.arbiter = None self.hooks = {} @@ -520,6 +529,10 @@ def reap_processes(self): for pid in list(self.processes.keys()): self.reap_process(pid) + def _async_kill_cb(self, pid, future): + logger.debug("_async_kill_cb called") + self.async_killing_futures.pop(pid) + @gen.coroutine @util.debuglog def manage_processes(self): @@ -539,8 +552,10 @@ def manage_processes(self): if len(self.processes) < self.numprocesses and not self.is_stopping(): if self.respawn: yield self.spawn_processes() - elif not len(self.processes) and not self.on_demand: - yield self._stop() + elif not len(self.processes) and \ + not self.on_demand and \ + len(self.async_killing_futures) == 0: + yield self._stop(True) # removing extra processes if len(self.processes) > self.numprocesses: @@ -552,12 +567,20 @@ def manage_processes(self): self.processes.pop(process.pid) else: processes_to_kill.append(process) - - removes = yield [self.kill_process(process) - for process in processes_to_kill] - for i, process in enumerate(processes_to_kill): - if removes[i]: + if 
self.async_kill: + for process in processes_to_kill: self.processes.pop(process.pid) + future = self.kill_process(process) + self.async_killing_futures[process.pid] = future + self.loop.add_future(future, + partial(self._async_kill_cb, + process.pid)) + else: + removes = yield [self.kill_process(process) + for process in processes_to_kill] + for i, process in enumerate(processes_to_kill): + if removes[i]: + self.processes.pop(process.pid) @gen.coroutine @util.debuglog @@ -565,10 +588,19 @@ def remove_expired_processes(self): expired_processes = [p for p in self.processes.values() if p.age() > (self.max_age + randint(0, self.max_age_variance))] - removes = yield [self.kill_process(x) for x in expired_processes] - for i, process in enumerate(expired_processes): - if removes[i]: + if self.async_kill: + for process in expired_processes: self.processes.pop(process.pid) + future = self.kill_process(process) + self.async_killing_futures[process.pid] = future + self.loop.add_future(future, + partial(self._async_kill_cb, + process.pid)) + else: + removes = yield [self.kill_process(x) for x in expired_processes] + for i, process in enumerate(expired_processes): + if removes[i]: + self.processes.pop(process.pid) @gen.coroutine @util.debuglog @@ -597,7 +629,8 @@ def spawn_processes(self): for i in range(self.numprocesses - len(self.processes)): res = self.spawn_process() if res is False: - yield self._stop() + if len(self.async_killing_futures) == 0: + yield self._stop(True) break delay = self.warmup_delay if isinstance(res, float): @@ -691,7 +724,7 @@ def send_signal_process(self, process, signum): children = process.children() # sending the signal to the process itself - self.send_signal(process.pid, signum) + self._send_signal(process, signum) self.notify_event("kill", {"process_pid": process.pid, "time": time.time()}) except NoSuchProcess: @@ -726,7 +759,7 @@ def kill_process(self, process, stop_signal=None, graceful_timeout=None): if self.stop_children: 
self.send_signal_process(process, stop_signal) else: - self.send_signal(process.pid, stop_signal) + self._send_signal(process, stop_signal) self.notify_event("kill", {"process_pid": process.pid, "time": time.time()}) except NoSuchProcess: @@ -768,18 +801,22 @@ def kill_processes(self, stop_signal=None, graceful_timeout=None): raise @util.debuglog - def send_signal(self, pid, signum): + def _send_signal(self, process, signum): is_sigkill = hasattr(signal, 'SIGKILL') and signum == signal.SIGKILL + pid = process.pid + hook_result = self.call_hook("before_signal", + pid=pid, signum=signum) + if not is_sigkill and not hook_result: + logger.debug("before_signal hook didn't return True " + "=> signal %i is not sent to %i" % (signum, pid)) + else: + process.send_signal(signum) + self.call_hook("after_signal", pid=pid, signum=signum) + + @util.debuglog + def send_signal(self, pid, signum): if pid in self.processes: - process = self.processes[pid] - hook_result = self.call_hook("before_signal", - pid=pid, signum=signum) - if not is_sigkill and not hook_result: - logger.debug("before_signal hook didn't return True " - "=> signal %i is not sent to %i" % (signum, pid)) - else: - process.send_signal(signum) - self.call_hook("after_signal", pid=pid, signum=signum) + self._send_signal(self.processes[pid], signum) else: logger.debug('process %s does not exist' % pid) @@ -846,6 +883,9 @@ def _stop(self, close_output_streams=False, for_shutdown=False): # We ignore the hook result self.call_hook('before_stop') yield self.kill_processes() + if len(self.async_killing_futures) > 0: + yield self.async_killing_futures.values() + self.async_killing_futures = {} self.reap_processes() # stop redirectors