From 41e04953b28906d80e2ddb4db58f95e466e95b22 Mon Sep 17 00:00:00 2001 From: Fabien MARTY Date: Tue, 14 Jun 2016 16:57:28 +0200 Subject: [PATCH 1/3] first commit (not tested at all :-)) --- circus/config.py | 4 +++- circus/watcher.py | 40 ++++++++++++++++++++++++++++++---------- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/circus/config.py b/circus/config.py index 4f5672351..1a5117270 100644 --- a/circus/config.py +++ b/circus/config.py @@ -31,6 +31,7 @@ def watcher_defaults(): 'send_hup': False, 'stop_signal': signal.SIGTERM, 'stop_children': False, + 'async_kill': False, 'max_retry': 5, 'graceful_timeout': 30, 'rlimits': dict(), @@ -223,7 +224,8 @@ def get_config(config_file): # default bool to False elif opt in ('shell', 'send_hup', 'stop_children', 'close_child_stderr', 'use_sockets', 'singleton', - 'copy_env', 'copy_path', 'close_child_stdout'): + 'copy_env', 'copy_path', 'close_child_stdout', + 'async_kill'): watcher[opt] = dget(section, opt, False, bool) elif opt == 'stop_signal': watcher['stop_signal'] = to_signum(val) diff --git a/circus/watcher.py b/circus/watcher.py index 84cf6f62a..00370330e 100644 --- a/circus/watcher.py +++ b/circus/watcher.py @@ -69,6 +69,12 @@ class Watcher(object): - **stop_children**: send the **stop_signal** to the children too. Defaults to False. + - **async_kill**: don't wait for the kill sequence to complete (SIGTERM + + graceful timeout + SIGKILL). This can help if you have a long graceful + timeout and don't mind having more processes than numprocesses + running during the graceful killing phase. + Defaults to False. + - **env**: a mapping containing the environment variables the command will run with. Optional. 
@@ -194,7 +200,7 @@ class Watcher(object): def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., working_dir=None, shell=False, shell_args=None, uid=None, - max_retry=5, gid=None, send_hup=False, + max_retry=5, gid=None, send_hup=False, async_kill=False, stop_signal=signal.SIGTERM, stop_children=False, env=None, graceful_timeout=30.0, prereload_fn=None, rlimits=None, executable=None, stdout_stream=None, stderr_stream=None, @@ -311,6 +317,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., self.send_hup = send_hup self.stop_signal = stop_signal self.stop_children = stop_children + self.async_kill = async_kill self.sockets = self.evpub_socket = None self.arbiter = None self.hooks = {} @@ -520,6 +527,9 @@ def reap_processes(self): for pid in list(self.processes.keys()): self.reap_process(pid) + def _async_kill_cb(self, future): + pass + @gen.coroutine @util.debuglog def manage_processes(self): @@ -552,12 +562,16 @@ def manage_processes(self): self.processes.pop(process.pid) else: processes_to_kill.append(process) - - removes = yield [self.kill_process(process) - for process in processes_to_kill] - for i, process in enumerate(processes_to_kill): - if removes[i]: - self.processes.pop(process.pid) + if self.async_kill: + for process in processes_to_kill: + self.loop.add_future(self.kill_process(process), + self._async_kill_cb) + else: + removes = yield [self.kill_process(process) + for process in processes_to_kill] + for i, process in enumerate(processes_to_kill): + if removes[i]: + self.processes.pop(process.pid) @gen.coroutine @util.debuglog @@ -565,10 +579,16 @@ def remove_expired_processes(self): expired_processes = [p for p in self.processes.values() if p.age() > (self.max_age + randint(0, self.max_age_variance))] - removes = yield [self.kill_process(x) for x in expired_processes] - for i, process in enumerate(expired_processes): - if removes[i]: + if self.async_kill: + for process in expired_processes: 
self.processes.pop(process.pid) + self.loop.add_future(self.kill_process(process), + self._async_kill_cb) + else: + removes = yield [self.kill_process(x) for x in expired_processes] + for i, process in enumerate(expired_processes): + if removes[i]: + self.processes.pop(process.pid) @gen.coroutine @util.debuglog From 62117742dfcbd2823459fd92938d288edb955f1a Mon Sep 17 00:00:00 2001 From: Fabien MARTY Date: Wed, 15 Jun 2016 11:26:15 +0200 Subject: [PATCH 2/3] work in progress about #987 --- circus/watcher.py | 60 +++++++++++++++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 20 deletions(-) diff --git a/circus/watcher.py b/circus/watcher.py index 00370330e..3b8e17f65 100644 --- a/circus/watcher.py +++ b/circus/watcher.py @@ -17,6 +17,7 @@ from psutil import NoSuchProcess, TimeoutExpired import zmq.utils.jsonapi as json from zmq.eventloop import ioloop +from functools import partial from circus.process import Process, DEAD_OR_ZOMBIE, UNEXISTING from circus.papa_process_proxy import PapaProcessProxy @@ -284,6 +285,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., self.working_dir = working_dir self.processes = {} + self.async_killing_futures = {} self.shell = shell self.shell_args = shell_args self.uid = uid @@ -527,8 +529,9 @@ def reap_processes(self): for pid in list(self.processes.keys()): self.reap_process(pid) - def _async_kill_cb(self, future): - pass + def _async_kill_cb(self, pid, future): + logger.debug("_async_kill_cb called") + self.async_killing_futures.pop(pid) @gen.coroutine @util.debuglog @@ -549,8 +552,10 @@ def manage_processes(self): if len(self.processes) < self.numprocesses and not self.is_stopping(): if self.respawn: yield self.spawn_processes() - elif not len(self.processes) and not self.on_demand: - yield self._stop() + elif not len(self.processes) and \ + not self.on_demand and \ + len(self.async_killing_futures) == 0: + yield self._stop(True) # removing extra processes if len(self.processes) > 
self.numprocesses: @@ -564,8 +569,12 @@ def manage_processes(self): processes_to_kill.append(process) if self.async_kill: for process in processes_to_kill: - self.loop.add_future(self.kill_process(process), - self._async_kill_cb) + self.processes.pop(process.pid) + future = self.kill_process(process) + self.async_killing_futures[process.pid] = future + self.loop.add_future(future, + partial(self._async_kill_cb, + process.pid)) else: removes = yield [self.kill_process(process) for process in processes_to_kill] @@ -582,8 +591,11 @@ def remove_expired_processes(self): if self.async_kill: for process in expired_processes: self.processes.pop(process.pid) - self.loop.add_future(self.kill_process(process), - self._async_kill_cb) + future = self.kill_process(process) + self.async_killing_futures[process.pid] = future + self.loop.add_future(future, + partial(self._async_kill_cb, + process.pid)) else: removes = yield [self.kill_process(x) for x in expired_processes] for i, process in enumerate(expired_processes): @@ -617,7 +629,8 @@ def spawn_processes(self): for i in range(self.numprocesses - len(self.processes)): res = self.spawn_process() if res is False: - yield self._stop() + if len(self.async_killing_futures) == 0: + yield self._stop(True) break delay = self.warmup_delay if isinstance(res, float): @@ -746,7 +759,7 @@ def kill_process(self, process, stop_signal=None, graceful_timeout=None): if self.stop_children: self.send_signal_process(process, stop_signal) else: - self.send_signal(process.pid, stop_signal) + self._send_signal(process, stop_signal) self.notify_event("kill", {"process_pid": process.pid, "time": time.time()}) except NoSuchProcess: @@ -788,18 +801,22 @@ def kill_processes(self, stop_signal=None, graceful_timeout=None): raise @util.debuglog - def send_signal(self, pid, signum): + def _send_signal(self, process, signum): is_sigkill = hasattr(signal, 'SIGKILL') and signum == signal.SIGKILL + pid = process.pid + hook_result = self.call_hook("before_signal", 
+ pid=pid, signum=signum) + if not is_sigkill and not hook_result: + logger.debug("before_signal hook didn't return True " + "=> signal %i is not sent to %i" % (signum, pid)) + else: + process.send_signal(signum) + self.call_hook("after_signal", pid=pid, signum=signum) + + @util.debuglog + def send_signal(self, pid, signum): if pid in self.processes: - process = self.processes[pid] - hook_result = self.call_hook("before_signal", - pid=pid, signum=signum) - if not is_sigkill and not hook_result: - logger.debug("before_signal hook didn't return True " - "=> signal %i is not sent to %i" % (signum, pid)) - else: - process.send_signal(signum) - self.call_hook("after_signal", pid=pid, signum=signum) + self._send_signal(self.processes[pid], signum) else: logger.debug('process %s does not exist' % pid) @@ -866,6 +883,9 @@ def _stop(self, close_output_streams=False, for_shutdown=False): # We ignore the hook result self.call_hook('before_stop') yield self.kill_processes() + if len(self.async_killing_futures) > 0: + yield self.async_killing_futures.values() + self.async_killing_futures = {} self.reap_processes() # stop redirectors From c918f4e9a309523142ddd553d45f227acaee9f60 Mon Sep 17 00:00:00 2001 From: Fabien MARTY Date: Fri, 1 Jul 2016 09:19:34 +0200 Subject: [PATCH 3/3] bugfix with async_kill --- circus/watcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/circus/watcher.py b/circus/watcher.py index 3b8e17f65..0c2b5a342 100644 --- a/circus/watcher.py +++ b/circus/watcher.py @@ -724,7 +724,7 @@ def send_signal_process(self, process, signum): children = process.children() # sending the signal to the process itself - self.send_signal(process.pid, signum) + self._send_signal(process, signum) self.notify_event("kill", {"process_pid": process.pid, "time": time.time()}) except NoSuchProcess: