diff --git a/circus/arbiter.py b/circus/arbiter.py index 1b98f9869..4d7d548d7 100644 --- a/circus/arbiter.py +++ b/circus/arbiter.py @@ -2,6 +2,7 @@ import logging import os import gc +import operator from circus.fixed_threading import Thread, get_ident import sys import select @@ -473,6 +474,16 @@ def load_from_config(cls, config_file, loop=None): def iter_watchers(self, reverse=True): return sorted(self.watchers, key=lambda a: a.priority, reverse=reverse) + def iter_active_watchers(self, fd, reverse=True): + a_watchers = [] + for watcher in self.watchers: + wanted_sockets = [name for name, sock in watcher.sockets.items() + if sock.fileno() == fd] + if len(set(watcher.sockets).intersection(set(wanted_sockets))) > 0: + a_watchers.append(watcher) + return sorted(a_watchers, key=operator.attrgetter('priority'), + reverse=reverse) + @debuglog def initialize(self): # set process title @@ -647,9 +658,9 @@ def manage_watchers(self): if need_on_demand: sockets = [x.fileno() for x in self.sockets.values()] rlist, wlist, xlist = select.select(sockets, [], [], 0) - if rlist: + for r in rlist: self.socket_event = True - self._start_watchers() + self._start_watchers(watcher_fd=r) self.socket_event = False @synchronized("arbiter_reload") @@ -743,8 +754,10 @@ def start_watchers(self, watcher_iter_func=None): yield self._start_watchers(watcher_iter_func=watcher_iter_func) @gen.coroutine - def _start_watchers(self, watcher_iter_func=None): - if watcher_iter_func is None: + def _start_watchers(self, watcher_iter_func=None, watcher_fd=None): + if watcher_fd is not None: + watchers = self.iter_active_watchers(watcher_fd) + elif watcher_iter_func is None: watchers = self.iter_watchers() else: watchers = watcher_iter_func() diff --git a/circus/sockets.py b/circus/sockets.py index d80c53179..880db0c8a 100644 --- a/circus/sockets.py +++ b/circus/sockets.py @@ -37,7 +37,8 @@ class PapaSocketProxy(object): def __init__(self, name='', host=None, port=None, family=None, type=None, proto=None, backlog=None, path=None, umask=None, replace=None, - interface=None, so_reuseport=False, blocking=False): + interface=None, so_reuseport=False, blocking=False, + group=None): if path is not None: if not hasattr(socket, 'AF_UNIX'): raise NotImplementedError("AF_UNIX not supported on this" @@ -73,6 +74,7 @@ def __init__(self, name='', host=None, port=None, self.so_reuseport = papa_socket.get('so_reuseport', False) self._fileno = papa_socket.get('fileno') self.use_papa = True + self.group = group if log_differences: differences = [] if host != self.host: @@ -122,7 +124,8 @@ class CircusSocket(socket.socket): def __init__(self, name='', host='localhost', port=8080, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, backlog=2048, path=None, umask=None, replace=False, - interface=None, so_reuseport=False, blocking=False): + interface=None, so_reuseport=False, blocking=False, + group=None): if path is not None: if not hasattr(socket, 'AF_UNIX'): raise NotImplementedError("AF_UNIX not supported on this" @@ -138,6 +141,7 @@ def __init__(self, name='', host='localhost', port=8080, self.umask = umask self.replace = replace self.use_papa = False + self.group = group if hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX: self.host = self.port = None @@ -246,7 +250,8 @@ def load_from_config(cls, config): 'so_reuseport': to_bool(config.get('so_reuseport')), 'umask': int(config.get('umask', 8)), 'replace': config.get('replace'), - 'blocking': to_bool(config.get('blocking'))} + 'blocking': to_bool(config.get('blocking')), + 'group': config.get('group')} use_papa = to_bool(config.get('use_papa')) and papa is not None proto_name = config.get('proto') if proto_name is not None: diff --git a/circus/watcher.py b/circus/watcher.py index 7a810f308..8c6c8e273 100644 --- a/circus/watcher.py +++ b/circus/watcher.py @@ -138,6 +138,10 @@ class Watcher(object): descriptors, thus can reuse the sockets opened by circusd. (default: False) + - **socket_group** -- Name of the group that sockets need to be in, to be + attached to this watcher. If unset, all sockets can be used. + (default: None) + - **on_demand** -- If True, the processes will be started only at the first connection to the socket (default: False) @@ -203,8 +207,8 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., graceful_timeout=30.0, prereload_fn=None, rlimits=None, executable=None, stdout_stream=None, stderr_stream=None, priority=0, loop=None, singleton=False, use_sockets=False, - copy_env=False, copy_path=False, max_age=0, - max_age_variance=30, hooks=None, respawn=True, + socket_group=None, copy_env=False, copy_path=False, + max_age=0, max_age_variance=30, hooks=None, respawn=True, autostart=True, on_demand=False, virtualenv=None, stdin_socket=None, close_child_stdin=True, close_child_stdout=False, @@ -212,6 +216,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., use_papa=False, **options): self.name = name self.use_sockets = use_sockets + self.socket_group = socket_group self.on_demand = on_demand self.res_name = name.lower().replace(" ", "_") self.numprocesses = int(numprocesses) @@ -271,7 +276,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., "stop_children", "shell", "shell_args", "env", "max_retry", "cmd", "args", "respawn", "graceful_timeout", "executable", "use_sockets", - "priority", "copy_env", "singleton", + "socket_group", "priority", "copy_env", "singleton", "stdout_stream_conf", "on_demand", "stderr_stream_conf", "max_age", "max_age_variance", "close_child_stdin", "close_child_stdout", @@ -418,7 +423,11 @@ def load_from_config(cls, config): @util.debuglog def initialize(self, evpub_socket, sockets, arbiter): self.evpub_socket = evpub_socket - self.sockets = sockets + if self.socket_group is None: + self.sockets = sockets + else: + self.sockets = {name: socket for (name, socket) in sockets.items() + if socket.group == self.socket_group} self.arbiter = arbiter def __len__(self):