A Pub-Sub abstraction for Lumen.
This package is a wrapper bridging php-pubsub into Lumen.
The following adapters are supported:
- Local
- /dev/null
- Redis
- Kafka (see separate installation instructions below)
- Google Cloud
composer require lushdigital/lumen-pubsub
Register the service provider with Lumen in the bootstrap/app.php file:
$app->register(LushDigital\LumenPubSub\PubSubServiceProvider::class);
The package has a default configuration which uses the following environment variables.
PUBSUB_CONNECTION=redis
REDIS_HOST=localhost
REDIS_PASSWORD=null
REDIS_PORT=6379
KAFKA_BROKERS=localhost
GOOGLE_CLOUD_PROJECT_ID=your-project-id-here
GOOGLE_CLOUD_KEY_FILE=path/to/your/gcloud-key.json
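As a rough illustration of how these variables might feed the configuration, the sketch below builds a config array from the environment. The helper envOr(), the function buildPubSubConfig() and the array layout (including the host, password, port and brokers keys) are assumptions made for this example, not the contents of the bundled config file; only the variable names come from the list above.

```php
<?php

/**
 * Hypothetical helper: read an environment variable, falling back to a default.
 * The literal string "null" is treated as a real null value.
 */
function envOr(string $name, $default = null)
{
    $value = getenv($name);
    if ($value === false) {
        return $default;
    }

    return strtolower($value) === 'null' ? null : $value;
}

/**
 * Assumed layout, for illustration only.
 * Check the bundled config/pubsub.php for the real keys.
 */
function buildPubSubConfig(): array
{
    return [
        'default' => envOr('PUBSUB_CONNECTION', 'redis'),
        'connections' => [
            'redis' => [
                'host' => envOr('REDIS_HOST', 'localhost'),
                'password' => envOr('REDIS_PASSWORD'),
                'port' => (int) envOr('REDIS_PORT', 6379),
            ],
            'kafka' => [
                'brokers' => envOr('KAFKA_BROKERS', 'localhost'),
            ],
        ],
    ];
}

// Simulate two entries from a .env file, then build the config.
putenv('PUBSUB_CONNECTION=kafka');
putenv('REDIS_PASSWORD=null');
$config = buildPubSubConfig();
```

In this sketch, REDIS_PASSWORD=null ends up as a PHP null rather than the four-character string, which is usually what a Redis client expects for "no password".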
To customize the configuration, copy the file from vendor/lushdigital/lumen-pubsub/config/pubsub.php to app/config/pubsub.php, then add the following line to bootstrap/app.php:
$app->configure('pubsub');
You can then edit the config at app/config/pubsub.php.
Please note that whilst the package is bundled with support for the php-pubsub-kafka adapter, the adapter is not included by default.
This is because the KafkaPubSubAdapter has an external dependency on the librdkafka C library and the php-rdkafka PECL extension.
If you plan on using this adapter, you will need to install these dependencies first by following their installation instructions.
You can then include the adapter using:
composer require superbalist/php-pubsub-kafka
// get the pub-sub manager
$pubsub = app('pubsub');
// note: function calls on the manager are proxied through to the default connection
// eg: you can do this on the manager OR a connection
$pubsub->publish('channel_name', 'message');
// get the default connection
$pubsub = app('pubsub.connection');
// or
$pubsub = app(\Superbalist\PubSub\PubSubAdapterInterface::class);
// get a specific connection
$pubsub = app('pubsub')->connection('redis');
// publish a message
// the message can be a string, array, json string, bool, object - anything which can be serialized
$pubsub->publish('channel_name', 'this is where your message goes');
$pubsub->publish('channel_name', ['key' => 'value']);
$pubsub->publish('channel_name', '{"key": "value"}');
$pubsub->publish('channel_name', true);
// subscribe to a channel
$pubsub->subscribe('channel_name', function ($message) {
    var_dump($message);
});
// all the above commands can also be done using the facade
PubSub::connection('kafka')->publish('channel_name', 'Hello World!');
PubSub::connection('kafka')->subscribe('channel_name', function ($message) {
    var_dump($message);
});
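To make the publish/subscribe contract concrete, here is a minimal in-memory sketch that can be run without any broker. The class InMemoryPubSub is hypothetical and is not part of php-pubsub; it only mimics the two calls used above. Unlike many real adapters, its subscribe() does not block and messages are not serialized.

```php
<?php

/**
 * Hypothetical in-memory adapter, for illustration only.
 * It mimics the publish()/subscribe() calls used in this README.
 */
final class InMemoryPubSub
{
    /** @var array<string, callable[]> handlers keyed by channel name */
    private $subscribers = [];

    public function subscribe(string $channel, callable $handler): void
    {
        $this->subscribers[$channel][] = $handler;
    }

    public function publish(string $channel, $message): void
    {
        // Deliver synchronously to every handler registered for this channel.
        foreach ($this->subscribers[$channel] ?? [] as $handler) {
            $handler($message);
        }
    }
}

$received = [];
$pubsub = new InMemoryPubSub();

$pubsub->subscribe('channel_name', function ($message) use (&$received) {
    $received[] = $message;
});

$pubsub->publish('channel_name', 'Hello World!');
$pubsub->publish('channel_name', ['key' => 'value']);
$pubsub->publish('other_channel', 'ignored'); // no subscriber on this channel
```

After running this, $received holds the two messages sent to channel_name, in publish order. An adapter like this can be handy in unit tests, where a blocking subscribe() call would stall the test run.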
The package includes a helper command, php artisan make:subscriber MyExampleSubscriber, to stub new subscriber command classes.
Many pub-sub adapters have blocking subscribe() calls, so these commands are best run as daemons under a process supervisor.
This generator command creates the file app/Console/Commands/MyExampleSubscriber.php, which contains:
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Superbalist\PubSub\PubSubAdapterInterface;
class MyExampleSubscriber extends Command
{
    /**
     * The name and signature of the subscriber command.
     *
     * @var string
     */
    protected $signature = 'subscriber:name';

    /**
     * The subscriber description.
     *
     * @var string
     */
    protected $description = 'PubSub subscriber for ________';

    /**
     * @var PubSubAdapterInterface
     */
    protected $pubsub;

    /**
     * Create a new command instance.
     *
     * @param PubSubAdapterInterface $pubsub
     */
    public function __construct(PubSubAdapterInterface $pubsub)
    {
        parent::__construct();
        $this->pubsub = $pubsub;
    }

    /**
     * Execute the console command.
     */
    public function handle()
    {
        $this->pubsub->subscribe('channel_name', function ($message) {
        });
    }
}
Once the command has been generated, do not forget to register it with Artisan.
This is done by editing the app/Console/Kernel.php file. To register your command, add the command's class name to the command list.
protected $commands = [
    Commands\MyExampleSubscriber::class,
];
For subscribers which use the php-pubsub-kafka adapter, you'll likely want to change the consumer_group_id per subscriber.
To do this, you need to use the PubSubConnectionFactory to create a new connection per subscriber. This is because the consumer_group_id cannot be changed once a connection is created.
Here is an example of how you can do this:
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use LushDigital\LumenPubSub\PubSubConnectionFactory;
use Superbalist\PubSub\PubSubAdapterInterface;
class MyExampleKafkaSubscriber extends Command
{
    /**
     * The name and signature of the subscriber command.
     *
     * @var string
     */
    protected $signature = 'subscriber:name';

    /**
     * The subscriber description.
     *
     * @var string
     */
    protected $description = 'PubSub subscriber for ________';

    /**
     * @var PubSubAdapterInterface
     */
    protected $pubsub;

    /**
     * Create a new command instance.
     *
     * @param PubSubConnectionFactory $factory
     */
    public function __construct(PubSubConnectionFactory $factory)
    {
        parent::__construct();
        $config = config('pubsub.connections.kafka');
        $config['consumer_group_id'] = self::class;
        $this->pubsub = $factory->make('kafka', $config);
    }

    /**
     * Execute the console command.
     */
    public function handle()
    {
        $this->pubsub->subscribe('channel_name', function ($message) {
        });
    }
}
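The config override in the constructor above can be isolated into a small helper, sketched below without any framework dependency. The function withConsumerGroup() and the sample values in $base are hypothetical; $base simply stands in for whatever config('pubsub.connections.kafka') returns in your application.

```php
<?php

/**
 * Hypothetical helper: return a copy of a connection config with its own consumer group.
 * PHP arrays are copied by value, so the shared base config is left untouched.
 */
function withConsumerGroup(array $baseConfig, string $groupId): array
{
    $baseConfig['consumer_group_id'] = $groupId;

    return $baseConfig;
}

// Stand-in for config('pubsub.connections.kafka'); the values are made up for this example.
$base = ['brokers' => 'localhost', 'consumer_group_id' => 'default-group'];

$orders = withConsumerGroup($base, 'OrdersSubscriber');
$emails = withConsumerGroup($base, 'EmailsSubscriber');
```

Each resulting array would then be passed to $factory->make('kafka', $config), as in the subscriber class above. Kafka delivers each message to only one consumer within a group, so subscribers that should each see every message need distinct group ids.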
To write a custom driver, please see the "Writing an Adapter" section of the php-pubsub documentation.
To include your custom driver, you can call the extend() function.
$manager = app('pubsub');
$manager->extend('custom_connection_name', function ($config) {
    // your callable must return an instance of the PubSubAdapterInterface
    return new MyCustomPubSubDriver($config);
});
// get an instance of your custom connection
$pubsub = $manager->connection('custom_connection_name');
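For readers curious how an extend() hook of this kind typically works, the sketch below shows a miniature resolver registry. MiniManager is hypothetical and is not the package's implementation; in particular, the config lookup by connection name and the caching of created connections are assumptions of this example.

```php
<?php

/**
 * Hypothetical miniature manager, for illustration only.
 * It stores one resolver callable per driver name and invokes it lazily.
 */
final class MiniManager
{
    /** @var array<string, array> per-connection config */
    private $config;

    /** @var array<string, callable> resolvers registered through extend() */
    private $resolvers = [];

    /** @var array<string, object> connections that were already created */
    private $connections = [];

    public function __construct(array $config)
    {
        $this->config = $config;
    }

    public function extend(string $driver, callable $resolver): void
    {
        $this->resolvers[$driver] = $resolver;
    }

    public function connection(string $name)
    {
        if (!isset($this->connections[$name])) {
            if (!isset($this->resolvers[$name])) {
                throw new InvalidArgumentException("No driver registered for [$name].");
            }
            // Call the resolver once, passing it this connection's config.
            $this->connections[$name] = ($this->resolvers[$name])($this->config[$name] ?? []);
        }

        return $this->connections[$name];
    }
}

$manager = new MiniManager(['custom_connection_name' => ['prefix' => 'demo']]);

$manager->extend('custom_connection_name', function (array $config) {
    // A plain object stands in for a real PubSubAdapterInterface implementation.
    return (object) ['config' => $config];
});

$first = $manager->connection('custom_connection_name');
$second = $manager->connection('custom_connection_name');
```

In this sketch the resolver runs only on the first connection() call, so $first and $second are the same object.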