diff --git a/yandextank/plugins/YASM/plugin.py b/yandextank/plugins/YASM/plugin.py index 443735f12..c733414d8 100644 --- a/yandextank/plugins/YASM/plugin.py +++ b/yandextank/plugins/YASM/plugin.py @@ -32,16 +32,16 @@ ] -def signals_stream(yasmapi_cfg): +def signals_stream(panel): ''' - :type yasmapi_cfg: YasmCfg - :return: + :type yasmapi_cfg: Panel + :return: Panel, float, dict ''' - for point in RtGolovanRequest(yasmapi_cfg.as_dict()): + for point in RtGolovanRequest(panel.as_dict): logger.debug('received YASM data for {} for following hosts an tags: {}'. format(point.ts, {host: tags.keys() for host, tags in point.values.items()})) - yield point.ts, {panel.alias: point.values[panel.host][panel.tags] for panel in yasmapi_cfg.panels} + yield point.ts, point.values[panel.host][panel.tags] def map_metric_name(name): @@ -69,27 +69,37 @@ def monitoring_data(ts, data, comment=''): class YasmCfg(object): - def __init__(self, panels): - self.panels = [self.Panel(alias, **attrs) for alias, attrs in panels.items()] + self.panels = [Panel(alias, **attrs) for alias, attrs in panels.items()] + self._as_dict = None + @property def as_dict(self): - yasmapi_cfg = {} - for panel in self.panels: - yasmapi_cfg.setdefault(panel.host, {})[panel.tags] = panel.signals - logger.info('yasmapi cfg: {}'.format(yasmapi_cfg)) - return yasmapi_cfg - - class Panel(object): - def __init__(self, alias, host, tags, signals=None, default_signals=True): - self.alias = alias - custom_signals = signals if signals else [] - self.signals = DEFAULT_SIGNALS + custom_signals if default_signals else custom_signals - self.host = host - self.tags = tags - if len(self.signals) == 0: - logger.warning('No signals specified for {} panel'.format(self.alias)) - self.dict_cfg = {self.host: {self.tags: self.signals}} + if self._as_dict is None: + yasmapi_cfg = {} + for panel in self.panels: + yasmapi_cfg.setdefault(panel.host, {})[panel.tags] = panel.signals + logger.info('yasmapi cfg: {}'.format(yasmapi_cfg)) + self._as_dict = yasmapi_cfg + return self._as_dict + + +class Panel(object): + def __init__(self, alias, host, tags, signals=None, default_signals=True): + self.queue = Queue() + self.alias = alias + custom_signals = signals if signals else [] + self.signals = DEFAULT_SIGNALS + custom_signals if default_signals else custom_signals + self.host = host + self.tags = tags + if len(self.signals) == 0: + logger.warning('No signals specified for {} panel'.format(self.alias)) + self.as_dict = {self.host: {self.tags: self.signals}} + self.last_ts = 0 + self.stop_trigger = Event() + + def stop(self): + self.stop_trigger.set() class ImmutableDict(dict): @@ -108,18 +118,110 @@ def set_copy(self, key=None, value=None): return copy -class Plugin(MonitoringPlugin): +class YasmMPReceiver(object): + def __init__(self, yasm_cfg, yasmapi_timeout): + """ + + :type data_queue: Queue + :type yasm_cfg: YasmCfg + """ + self.panels = yasm_cfg.panels + self.data_queue = Queue() + self.timeout = yasmapi_timeout + self._data_buffer = [] + self._start_event = Event() + self._stop_event = Event() + self.ps_pool = {panel.alias: Process(target=self.single_receiver, + args=(panel,)) + for panel in self.panels} + self.consumers = {panel.alias: Thread(target=self.single_controller, args=(panel, self.ps_pool[panel.alias])) + for panel in self.panels} + + def get_buffered_data(self): + data, self._data_buffer = self._data_buffer, [] + return data + + def start_collecting(self): + [p.start() for p in self.ps_pool.values()] + + def start_transmitting(self): + self._start_event.set() + [consumer.start() for consumer in self.consumers.values()] + + def single_receiver(self, panel): + # ignore SIGINT (process is controlled by .stop_event) + """ + + :type panel: Panel + """ + signal.signal(signal.SIGINT, signal.SIG_IGN) + + stream = signals_stream(panel) + try: + while not panel.stop_trigger.is_set(): + ts, data = stream.next() + if self._start_event.is_set(): + panel.queue.put((ts, {panel.alias: data})) + finally: + logger.info('Closing panel {} receiver thread'.format(panel.alias)) + + def single_controller(self, panel, ps): + """ + + :type ps: Process + :type panel: Panel + """ + while not panel.stop_trigger.is_set(): + try: + ts, data = panel.queue.get(timeout=self.timeout) + panel.last_ts = ts + # logger.info('Received monitoring data for {}'.format(ts)) + self._data_buffer.append(monitoring_data(ts, data)) + except Empty: + logger.warning( + 'Not receiving any data from YASM. Probably your hosts/tags specification is not correct') + panel.stop_trigger.set() + if ps.is_alive(): + ps.terminate() + break + except KeyboardInterrupt: + logging.warning('Interrupting collecting metrics for panel {}'.format(panel.alias)) + panel.stop_trigger.set() + if ps.is_alive(): + ps.terminate() + break + + def stop_now(self): + end_time = time.time() + active_panels = self.panels + while len(active_panels) > 0: + try: + for panel in active_panels: + if panel.last_ts < end_time and not self._stop_event.is_set(): + logger.info('Waiting for yasm metrics for panel {}'.format(panel.alias)) + else: + panel.stop_trigger.set() + self.ps_pool[panel.alias].join() + self.consumers[panel.alias].join() + active_panels = [panel for panel in active_panels if not panel.stop_trigger.is_set()] + if len(active_panels) > 0: + time.sleep(5) + except KeyboardInterrupt: + logger.info('Metrics receiving interrupted') + [panel.stop_trigger.set() for panel in active_panels] + [(self.ps_pool[panel.alias].join(), self.consumers[panel.alias].join()) for panel in active_panels] + +class Plugin(MonitoringPlugin): def __init__(self, core, cfg, cfg_updater=None): super(Plugin, self).__init__(core, cfg) - self.data_queue = Queue() self.start_event = Event() self.stop_event = Event() self.last_ts = 0 - self.data_buffer = [] - self.timeout = self.get_option('timeout') if self.get_option('verbose_logging'): logger.setLevel(logging.DEBUG) + self.yasm_receiver = YasmMPReceiver(YasmCfg(self.get_option('panels')), + self.get_option('timeout')) def add_listener(self, plugin): self.listeners.append(plugin) @@ -130,66 +232,24 @@ def send_collected_data(self, data): listener.monitoring_data(data) def is_test_finished(self): - if len(self.data_buffer) > 0: - data, self.data_buffer = self.data_buffer, [] + data = self.yasm_receiver.get_buffered_data() + if len(data) > 0: self.send_collected_data(data) return -1 def prepare_test(self): - yasmapi_cfg = YasmCfg(self.get_option('panels')) - self.yasm_receiver_ps = Process(target=self.yasm_receiver, args=(yasmapi_cfg,),) - self.yasm_receiver_ps.start() - self.consumer_thread = Thread(target=self.consumer) - self.consumer_thread.start() + self.yasm_receiver.start_collecting() def start_test(self): - self.start_event.set() + self.yasm_receiver.start_transmitting() logger.info('Listeners: {}'.format(self.listeners)) def end_test(self, retcode): - self.end_time = time.time() - while self.last_ts < self.end_time and not self.stop_event.is_set(): - try: - logger.info('Waiting for yasm metrics till {}'.format(self.end_time)) - time.sleep(5) - except KeyboardInterrupt: - logger.info('Metrics receiving interrupted') - break - self.stop_event.set() - self.consumer_thread.join() - self.yasm_receiver_ps.join() + self.yasm_receiver.stop_now() self.send_rest() return retcode def send_rest(self): - if len(self.data_buffer) > 0: - data, self.data_buffer = self.data_buffer, [] + data = self.yasm_receiver.get_buffered_data() + if len(data) > 0: self.send_collected_data(data) - - def consumer(self): - while not self.stop_event.is_set(): - try: - ts, data = self.data_queue.get(timeout=self.timeout) - # logger.info('Received monitoring data for {}'.format(ts)) - self.last_ts = ts - self.data_buffer.append(monitoring_data(ts, data)) - except Empty: - logger.warning( - 'Not receiving any data from YASM. Probably your hosts/tags specification is not correct') - self.stop_event.set() - if self.yasm_receiver_ps.is_alive(): - self.yasm_receiver_ps.terminate() - break - - def yasm_receiver(self, yasmapi_cfg): - # ignore SIGINT (process is controlled by .stop_event) - signal.signal(signal.SIGINT, signal.SIG_IGN) - - stream = signals_stream(yasmapi_cfg) - try: - while not self.stop_event.is_set(): - ts, data = stream.next() - if self.start_event.is_set(): - self.data_queue.put((ts, data)) - finally: - logger.info('Closing YASM receiver thread')