From b9bbebdc39016bc7b01a7dc430184a4df5c8c915 Mon Sep 17 00:00:00 2001 From: Vijesh Shetty Date: Sun, 3 Nov 2024 17:37:00 +0530 Subject: [PATCH] Revert "batch insert" This reverts commit 0eac8a380426794ce35f2eaad3b9030716026820. --- .../src/services/rabbitmq/producer.ts | 89 ++++++++++--------- 1 file changed, 46 insertions(+), 43 deletions(-) diff --git a/kafka-clickhouse/src/services/rabbitmq/producer.ts b/kafka-clickhouse/src/services/rabbitmq/producer.ts index 065d9a2..7b08a1b 100644 --- a/kafka-clickhouse/src/services/rabbitmq/producer.ts +++ b/kafka-clickhouse/src/services/rabbitmq/producer.ts @@ -2,58 +2,61 @@ import rabbitmqConnection from './connection.js'; import UserLocationService from '../location/location.js'; import { ProduceMessage, UserLocation } from './types.js'; + const exchangeName = 'exchange'; // The exchange name const routingKey = 'eurl_click_analytics'; // The routing key -const batchSize = 10; // Set batch size for messages const pub = rabbitmqConnection.createPublisher({ - confirm: true, // Enable publish confirmations - maxAttempts: 2, // Enable retries - exchanges: [{ exchange: exchangeName, type: 'fanout', durable: true }], // Ensure the exchange exists + // Enable publish confirmations, similar to consumer acknowledgements + confirm: true, + // Enable retries + maxAttempts: 2, + // Ensure the existence of an exchange before we use it + exchanges: [{ exchange: exchangeName, type: 'fanout', durable: true }], }); +// Publish a message for testing +// pub.send({ exchange: exchangeName, routingKey: routingKey }, { +// code: '123', +// browser: 'Chrome', +// os: 'Windows', +// device: 'Desktop', +// country: 'US', +// region: 'CA', +// city: 'Los Angeles', +// }) +// .then(() => console.log('Message published')) +// .catch((error) => console.error('Error publishing message:', error)); + +// TODO: Implement the Producer class +// TODO: Make it batch processing + +let dump = []; + class Producer { - private dump: ProduceMessage[]; // Batch of messages - - constructor() { - this.dump = []; - } - - public async produceLogic(ip: string, browser: string, os: string, device: string, code: string): Promise { - try { - const location: UserLocation = await UserLocationService.getUserLocation(ip); - const message: ProduceMessage = { - code, - browser, - os, - device, - country: location.country, - region: location.region, - city: location.city, - }; - - console.log(`[INFO] Adding message to batch: ${JSON.stringify(message)}`); - this.dump.push(message); - - if (this.dump.length >= batchSize) { - await this.publishBatch(); - } - } catch (error) { - console.error(`[ERROR] Failed to process message for IP ${ip}:`, error); + async produceLogic(ip: string, browser: string, os: string, device: string, code: string): Promise { + const location: UserLocation = await UserLocationService.getUserLocation(ip); + const message: ProduceMessage = { + code, + browser, + os, + device, + country: location.country, + region: location.region, + city: location.city, + }; + console.log('Producing message:', JSON.stringify(message)); + dump.push(message); + if (dump.length >= 10) { + pub.send({ exchange: exchangeName, routingKey: routingKey }, dump) + .then(() => console.log(`Published ${dump.length} messages, at ${new Date().toISOString()}`)) + .catch((error) => console.error('Error publishing message:', error)); + dump = []; } - } - - private async publishBatch(): Promise { - try { - await pub.send({ exchange: exchangeName, routingKey: routingKey }, this.dump); - console.log(`[SUCCESS] Published ${this.dump.length} messages at ${new Date().toISOString()}`); - } catch (error) { - console.error(`[ERROR] Failed to publish batch of ${this.dump.length} messages:`, error); - } finally { - this.dump = []; // Clear batch regardless of success or failure - } - } + +} } const instance = new Producer(); + export default instance;