Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue987 work in progress (do not merge) #988

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion circus/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def watcher_defaults():
'send_hup': False,
'stop_signal': signal.SIGTERM,
'stop_children': False,
'async_kill': False,
'max_retry': 5,
'graceful_timeout': 30,
'rlimits': dict(),
Expand Down Expand Up @@ -223,7 +224,8 @@ def get_config(config_file):
# default bool to False
elif opt in ('shell', 'send_hup', 'stop_children',
'close_child_stderr', 'use_sockets', 'singleton',
'copy_env', 'copy_path', 'close_child_stdout'):
'copy_env', 'copy_path', 'close_child_stdout',
'async_kill'):
watcher[opt] = dget(section, opt, False, bool)
elif opt == 'stop_signal':
watcher['stop_signal'] = to_signum(val)
Expand Down
88 changes: 64 additions & 24 deletions circus/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from psutil import NoSuchProcess, TimeoutExpired
import zmq.utils.jsonapi as json
from zmq.eventloop import ioloop
from functools import partial

from circus.process import Process, DEAD_OR_ZOMBIE, UNEXISTING
from circus.papa_process_proxy import PapaProcessProxy
Expand Down Expand Up @@ -69,6 +70,12 @@ class Watcher(object):
- **stop_children**: send the **stop_signal** to the children too.
Defaults to False.

- **async_kill**: don't wait for the kill sequence to complete (SIGTERM +
graceful timeout + SIGKILL). This can help if you have a long graceful
timeout and don't mind temporarily having more processes than
numprocesses during the graceful killing phase.
Defaults to False.

- **env**: a mapping containing the environment variables the command
will run with. Optional.

Expand Down Expand Up @@ -194,7 +201,7 @@ class Watcher(object):

def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0.,
working_dir=None, shell=False, shell_args=None, uid=None,
max_retry=5, gid=None, send_hup=False,
max_retry=5, gid=None, send_hup=False, async_kill=False,
stop_signal=signal.SIGTERM, stop_children=False, env=None,
graceful_timeout=30.0, prereload_fn=None, rlimits=None,
executable=None, stdout_stream=None, stderr_stream=None,
Expand Down Expand Up @@ -278,6 +285,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0.,

self.working_dir = working_dir
self.processes = {}
self.async_killing_futures = {}
self.shell = shell
self.shell_args = shell_args
self.uid = uid
Expand Down Expand Up @@ -311,6 +319,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0.,
self.send_hup = send_hup
self.stop_signal = stop_signal
self.stop_children = stop_children
self.async_kill = async_kill
self.sockets = self.evpub_socket = None
self.arbiter = None
self.hooks = {}
Expand Down Expand Up @@ -520,6 +529,10 @@ def reap_processes(self):
for pid in list(self.processes.keys()):
self.reap_process(pid)

def _async_kill_cb(self, pid, future):
    """Forget the pending asynchronous kill registered for *pid*.

    The watcher registers this callback through ``self.loop.add_future``
    on the future returned by ``kill_process`` when ``async_kill`` is on.

    :param pid: pid whose entry is removed from
        ``self.async_killing_futures``.
    :param future: the kill future handed over by the loop; it is not
        inspected here.
    """
    logger.debug("_async_kill_cb called")
    # ``_stop`` rebinds ``async_killing_futures`` to a fresh dict after
    # waiting on the pending kills, so the entry may already be gone.
    # Tolerate that rather than raising KeyError inside a loop callback.
    self.async_killing_futures.pop(pid, None)

@gen.coroutine
@util.debuglog
def manage_processes(self):
Expand All @@ -539,8 +552,10 @@ def manage_processes(self):
if len(self.processes) < self.numprocesses and not self.is_stopping():
if self.respawn:
yield self.spawn_processes()
elif not len(self.processes) and not self.on_demand:
yield self._stop()
elif not len(self.processes) and \
not self.on_demand and \
len(self.async_killing_futures) == 0:
yield self._stop(True)

# removing extra processes
if len(self.processes) > self.numprocesses:
Expand All @@ -552,23 +567,40 @@ def manage_processes(self):
self.processes.pop(process.pid)
else:
processes_to_kill.append(process)

removes = yield [self.kill_process(process)
for process in processes_to_kill]
for i, process in enumerate(processes_to_kill):
if removes[i]:
if self.async_kill:
for process in processes_to_kill:
self.processes.pop(process.pid)
future = self.kill_process(process)
self.async_killing_futures[process.pid] = future
self.loop.add_future(future,
partial(self._async_kill_cb,
process.pid))
else:
removes = yield [self.kill_process(process)
for process in processes_to_kill]
for i, process in enumerate(processes_to_kill):
if removes[i]:
self.processes.pop(process.pid)

@gen.coroutine
@util.debuglog
def remove_expired_processes(self):
    """Kill the processes that have outlived their maximum age.

    A process is expired when ``p.age()`` exceeds ``self.max_age`` plus a
    random integer drawn from ``[0, self.max_age_variance]`` (drawn
    independently for each process).

    When ``self.async_kill`` is set, each expired process is removed from
    ``self.processes`` immediately and its kill future is stored in
    ``self.async_killing_futures`` without being awaited. Otherwise the
    kills are awaited together and only the processes whose kill returned
    a truthy value are removed.
    """
    expired_processes = [
        p for p in self.processes.values()
        if p.age() > (self.max_age + randint(0, self.max_age_variance))]

    if self.async_kill:
        for process in expired_processes:
            # Stop tracking the process right away; the kill completes in
            # the background and _async_kill_cb drops the future later.
            self.processes.pop(process.pid)
            future = self.kill_process(process)
            self.async_killing_futures[process.pid] = future
            self.loop.add_future(future,
                                 partial(self._async_kill_cb,
                                         process.pid))
    else:
        removes = yield [self.kill_process(x) for x in expired_processes]
        # Pair each process with its own kill result instead of indexing.
        for process, removed in zip(expired_processes, removes):
            if removed:
                self.processes.pop(process.pid)

@gen.coroutine
@util.debuglog
Expand Down Expand Up @@ -597,7 +629,8 @@ def spawn_processes(self):
for i in range(self.numprocesses - len(self.processes)):
res = self.spawn_process()
if res is False:
yield self._stop()
if len(self.async_killing_futures) == 0:
yield self._stop(True)
break
delay = self.warmup_delay
if isinstance(res, float):
Expand Down Expand Up @@ -691,7 +724,7 @@ def send_signal_process(self, process, signum):
children = process.children()

# sending the signal to the process itself
self.send_signal(process.pid, signum)
self._send_signal(process, signum)
self.notify_event("kill", {"process_pid": process.pid,
"time": time.time()})
except NoSuchProcess:
Expand Down Expand Up @@ -726,7 +759,7 @@ def kill_process(self, process, stop_signal=None, graceful_timeout=None):
if self.stop_children:
self.send_signal_process(process, stop_signal)
else:
self.send_signal(process.pid, stop_signal)
self._send_signal(process, stop_signal)
self.notify_event("kill", {"process_pid": process.pid,
"time": time.time()})
except NoSuchProcess:
Expand Down Expand Up @@ -768,18 +801,22 @@ def kill_processes(self, stop_signal=None, graceful_timeout=None):
raise

@util.debuglog
def _send_signal(self, process, signum):
    """Send *signum* to *process*, honouring the signal hooks.

    The ``before_signal`` hook is always called. Unless the signal is
    SIGKILL, a falsy hook result vetoes the delivery and the signal is
    not sent. When the signal is sent, the ``after_signal`` hook is
    called afterwards.

    :param process: process object exposing ``pid`` and
        ``send_signal(signum)``; it does not need to be present in
        ``self.processes``.
    :param signum: signal number to deliver.
    """
    # SIGKILL is not defined on every platform, hence the hasattr guard.
    is_sigkill = hasattr(signal, 'SIGKILL') and signum == signal.SIGKILL
    pid = process.pid
    hook_result = self.call_hook("before_signal",
                                 pid=pid, signum=signum)
    if not is_sigkill and not hook_result:
        # Lazy logger arguments: the message is only built when emitted.
        logger.debug("before_signal hook didn't return True "
                     "=> signal %i is not sent to %i", signum, pid)
    else:
        process.send_signal(signum)
        self.call_hook("after_signal", pid=pid, signum=signum)

@util.debuglog
def send_signal(self, pid, signum):
    """Send *signum* to the tracked process identified by *pid*.

    Looks the process up in ``self.processes`` and delegates to
    ``_send_signal``. An unknown pid is only logged at debug level.

    :param pid: pid of a process expected to be in ``self.processes``.
    :param signum: signal number to deliver.
    """
    # Single lookup instead of a membership test followed by indexing.
    process = self.processes.get(pid)
    if process is None:
        logger.debug('process %s does not exist', pid)
    else:
        self._send_signal(process, signum)

Expand Down Expand Up @@ -846,6 +883,9 @@ def _stop(self, close_output_streams=False, for_shutdown=False):
# We ignore the hook result
self.call_hook('before_stop')
yield self.kill_processes()
if len(self.async_killing_futures) > 0:
yield self.async_killing_futures.values()
self.async_killing_futures = {}
self.reap_processes()

# stop redirectors
Expand Down