kafka-php extends php-rdkafka with cluster metadata management through php-zookeeper.
Add euskadi31/kafka-php to your composer.json:
% php composer.phar require euskadi31/kafka-php:~1.0
<?php
// Producer example: publish a single message to the "test" topic.

// Cluster metadata is looked up through Zookeeper.
// The array presumably maps Zookeeper host => port (verify against the library).
$zookeeperHosts = ['10.0.0.13' => 2181];
$clusterMetadata = new Euskadi31\Kafka\ClusterMetadata\Zookeeper($zookeeperHosts);

$kafkaProducer = new Euskadi31\Kafka\Producer($clusterMetadata);
$testTopic = $kafkaProducer->newTopic('test');

// RD_KAFKA_PARTITION_UA is php-rdkafka's "unassigned partition" constant;
// the second argument (0) is passed through unchanged from the original example.
$testTopic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message payload');
<?php
// Consumer example: read partition 0 of the "test" topic from the beginning
// and print each payload until the first message that carries an error.

// The array presumably maps Zookeeper host => port (verify against the library).
$zookeeperHosts = ['10.0.0.13' => 2181];
$clusterMetadata = new Euskadi31\Kafka\ClusterMetadata\Zookeeper($zookeeperHosts);

$kafkaConf = new RdKafka\Conf();
$kafkaConsumer = new Euskadi31\Kafka\Consumer($clusterMetadata, $kafkaConf);
$testTopic = $kafkaConsumer->newTopic('test');

// The same partition is used for consumeStart() and for every consume() call.
$partition = 0;
// Timeout handed to consume(); presumably milliseconds, as in php-rdkafka.
$timeout = 1000;

$testTopic->consumeStart($partition, RD_KAFKA_OFFSET_BEGINNING);

for (;;) {
    $message = $testTopic->consume($partition, $timeout);

    // Nothing was returned for this call: try again.
    if (!$message) {
        continue;
    }

    // Any message flagged with an error ends the loop after reporting it.
    if ($message->err) {
        echo $message->errstr() . PHP_EOL;
        break;
    }

    echo $message->payload . PHP_EOL;
}
kafka-php is licensed under the MIT license.