From 89741ec4bfc9ce5a86aca9e1ddcf7b428646690f Mon Sep 17 00:00:00 2001 From: Allan Kumka Date: Tue, 6 Sep 2016 16:53:09 -0700 Subject: [PATCH] Fix #965 - Make Flapping Ignore Commanded Stops Adding commanded_reap flag to the reap message and setting flapping to only trigger when the flag is True. The flag is false when reaping a processes that was detected dead. --- circus/arbiter.py | 2 +- circus/plugins/flapping.py | 17 ++++++++++++----- circus/tests/test_plugin_flapping.py | 12 ++++++++++++ circus/watcher.py | 12 +++++++----- 4 files changed, 32 insertions(+), 11 deletions(-) diff --git a/circus/arbiter.py b/circus/arbiter.py index 115d04344..2cf12fb15 100644 --- a/circus/arbiter.py +++ b/circus/arbiter.py @@ -619,7 +619,7 @@ def reap_processes(self): if pid in watchers_pids: watcher = watchers_pids[pid] - watcher.reap_process(pid, status) + watcher.reap_process(pid, status, commanded_reap=False) except OSError as e: if e.errno == errno.EAGAIN: sleep(0) diff --git a/circus/plugins/flapping.py b/circus/plugins/flapping.py index 5b7f90729..235894b8d 100644 --- a/circus/plugins/flapping.py +++ b/circus/plugins/flapping.py @@ -54,12 +54,19 @@ def handle_stop(self): def handle_recv(self, data): watcher_name, action, msg = self.split_data(data) - if action == "reap": - timeline = self.timelines.get(watcher_name, []) - timeline.append(time.time()) - self.timelines[watcher_name] = timeline + try: + message = self.load_message(msg) + except (ValueError, TypeError): + message = None + logger.error("Error while decoding json for message: %s", msg) - self.check(watcher_name) + if action == "reap": + commanded_reap = message and message.get('commanded_reap', False) + if not commanded_reap: + timeline = self.timelines.get(watcher_name, []) + timeline.append(time.time()) + self.timelines[watcher_name] = timeline + self.check(watcher_name) elif action == "updated": self.update_conf(watcher_name) diff --git a/circus/tests/test_plugin_flapping.py b/circus/tests/test_plugin_flapping.py index ef1ae5a17..05ed09a82 100644 --- a/circus/tests/test_plugin_flapping.py +++ b/circus/tests/test_plugin_flapping.py @@ -1,4 +1,5 @@ from mock import patch +import json from circus.tests.support import TestCircus, EasyTestSuite from circus.plugins.flapping import Flapping @@ -28,6 +29,17 @@ def test_reap_message_calls_check(self, check_mock): check_mock.assert_called_with('test') + @patch.object(Flapping, 'check') + def test_command_generated_reap_message_doesnt_call_check(self, + check_mock): + plugin = self._flapping_plugin() + topic = 'watcher.test.reap' + message = json.dumps({'commanded_reap': True}) + + plugin.handle_recv([topic, message]) + + check_mock.assert_not_called() + @patch.object(Flapping, 'cast') @patch('circus.plugins.flapping.Timer') def test_below_max_retry_triggers_restart(self, timer_mock, cast_mock): diff --git a/circus/watcher.py b/circus/watcher.py index 58c1cab1a..5d517d63d 100644 --- a/circus/watcher.py +++ b/circus/watcher.py @@ -435,7 +435,7 @@ def notify_event(self, topic, msg): self.evpub_socket.send_multipart(multipart_msg) @util.debuglog - def reap_process(self, pid, status=None): + def reap_process(self, pid, status=None, commanded_reap=True): """ensure that the process is killed (and not a zombie)""" if pid not in self.processes: return @@ -476,7 +476,8 @@ def reap_process(self, pid, status=None): "reap", {"process_pid": pid, "time": time.time(), - "exit_code": process.returncode()}) + "exit_code": process.returncode(), + "commanded_reap": commanded_reap}) process.stop() return @@ -512,10 +513,11 @@ def reap_process(self, pid, status=None): self.notify_event("reap", {"process_pid": pid, "time": time.time(), - "exit_code": exit_code}) + "exit_code": exit_code, + "commanded_reap": commanded_reap}) @util.debuglog - def reap_processes(self): + def reap_processes(self, commanded_reap=True): """Reap all the processes for this watcher. """ if self.is_stopped(): @@ -524,7 +526,7 @@ def reap_processes(self): # reap_process changes our dict, look through the copy of keys for pid in list(self.processes.keys()): - self.reap_process(pid) + self.reap_process(pid, commanded_reap=commanded_reap) @gen.coroutine @util.debuglog