From 886e0f7bda2c29f6d6f19025f8005b2b8eaf1291 Mon Sep 17 00:00:00 2001 From: Vijesh Shetty Date: Sun, 3 Nov 2024 17:37:15 +0530 Subject: [PATCH] Revert "simple batch for rmq" This reverts commit 2f2c1559be31116d709e4741814eb271c30b8f77. --- .../src/fixtures/rabbitmq_clickhouse.sql | 2 +- .../src/services/rabbitmq/producer.ts | 20 +++++++++---------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/kafka-clickhouse/src/fixtures/rabbitmq_clickhouse.sql b/kafka-clickhouse/src/fixtures/rabbitmq_clickhouse.sql index 3f7fdf8..09ea56d 100644 --- a/kafka-clickhouse/src/fixtures/rabbitmq_clickhouse.sql +++ b/kafka-clickhouse/src/fixtures/rabbitmq_clickhouse.sql @@ -22,7 +22,7 @@ CREATE TABLE IF NOT EXISTS eurl_data.click_analytics_rq ( `city` String -- The city where the user is located ) ENGINE = RabbitMQ -- Use RabbitMQ as the table engine for real-time data ingestion SETTINGS - rabbitmq_host_port = '172.18.0.9:5672', -- RabbitMQ broker address + rabbitmq_host_port = '172.17.0.5:5672', -- RabbitMQ broker address rabbitmq_routing_key_list = 'eurl_click_analytics', -- RabbitMQ routing key for message filtering rabbitmq_exchange_name = 'exchange', -- The RabbitMQ exchange name where messages are published rabbitmq_format = 'JSONEachRow'; -- Format for incoming messages from RabbitMQ (JSON format, one message per row) diff --git a/kafka-clickhouse/src/services/rabbitmq/producer.ts b/kafka-clickhouse/src/services/rabbitmq/producer.ts index 7b08a1b..a79a15e 100644 --- a/kafka-clickhouse/src/services/rabbitmq/producer.ts +++ b/kafka-clickhouse/src/services/rabbitmq/producer.ts @@ -31,8 +31,6 @@ const pub = rabbitmqConnection.createPublisher({ // TODO: Implement the Producer class // TODO: Make it batch processing -let dump = []; - class Producer { async produceLogic(ip: string, browser: string, os: string, device: string, code: string): Promise { const location: UserLocation = await UserLocationService.getUserLocation(ip); @@ -45,15 +43,15 @@ class Producer { region: location.region, city: location.city, }; - console.log('Producing message:', JSON.stringify(message)); - dump.push(message); - if (dump.length >= 10) { - pub.send({ exchange: exchangeName, routingKey: routingKey }, dump) - .then(() => console.log(`Published ${dump.length} messages, at ${new Date().toISOString()}`)) - .catch((error) => console.error('Error publishing message:', error)); - dump = []; - } - + console.log('Producing message:', message); + pub.send( + {exchange: exchangeName, routingKey: routingKey}, // metadata + message + ).catch((error) => { + console.error('Error producing message:', error); + }).finally(() => { + console.log('Message produced'); + }); } }