Skip to content

Commit

Permalink
Improve throughput when sending FCM Messages by using HTTP/2 (#874)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegamez committed Apr 25, 2024
1 parent a1d064c commit d2456d7
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 87 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ Please read about the future of the Firebase Admin PHP SDK on the

## [Unreleased]

### Changed

* FCM Messages are now sent asynchronously using HTTP connection pooling with HTTP/2. This should improve performance
when sending messages to many devices.
([#874](https://github.com/kreait/firebase-php/pull/874))

## [7.9.1] - 2023-12-04

### Changed
Expand Down
2 changes: 1 addition & 1 deletion src/Firebase/Contract/Messaging.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ interface Messaging
* @throws FirebaseException
* @throws InvalidArgumentException
*
* @return array<non-empty-string, mixed>
* @return array<array-key, mixed>
*/
public function send(Message|array $message, bool $validateOnly = false): array;

Expand Down
3 changes: 1 addition & 2 deletions src/Firebase/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ public function createMessaging(): Contract\Messaging
$projectId,
$this->httpFactory,
$this->httpFactory,
$errorHandler,
);

$appInstanceApiClient = new AppInstanceApiClient(
Expand All @@ -437,7 +436,7 @@ public function createMessaging(): Contract\Messaging
$errorHandler,
);

return new Messaging($messagingApiClient, $appInstanceApiClient);
return new Messaging($messagingApiClient, $appInstanceApiClient, $errorHandler);
}

/**
Expand Down
109 changes: 53 additions & 56 deletions src/Firebase/Messaging.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
namespace Kreait\Firebase;

use Beste\Json;
use GuzzleHttp\Promise\PromiseInterface;
use GuzzleHttp\Exception\RequestException;
use GuzzleHttp\Promise\Utils;
use Iterator;
use Kreait\Firebase\Exception\InvalidArgumentException;
use Kreait\Firebase\Exception\Messaging\InvalidArgument;
use Kreait\Firebase\Exception\Messaging\NotFound;
use Kreait\Firebase\Exception\MessagingApiExceptionConverter;
use Kreait\Firebase\Exception\MessagingException;
use Kreait\Firebase\Messaging\ApiClient;
use Kreait\Firebase\Messaging\AppInstance;
Expand All @@ -25,6 +25,8 @@
use Kreait\Firebase\Messaging\RegistrationTokens;
use Kreait\Firebase\Messaging\SendReport;
use Kreait\Firebase\Messaging\Topic;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Throwable;

use function array_key_exists;
Expand All @@ -36,8 +38,11 @@
*/
final class Messaging implements Contract\Messaging
{
public function __construct(private readonly ApiClient $messagingApi, private readonly AppInstanceApiClient $appInstanceApi)
{
public function __construct(
private readonly ApiClient $messagingApi,
private readonly AppInstanceApiClient $appInstanceApi,
private readonly MessagingApiExceptionConverter $exceptionConverter,
) {
}

public function send(Message|array $message, bool $validateOnly = false): array
Expand All @@ -48,21 +53,18 @@ public function send(Message|array $message, bool $validateOnly = false): array
throw new InvalidArgument('The given message is missing a target');
}

$request = $this->messagingApi->createSendRequestForMessage($message, $validateOnly);
$reports = $this->sendAll([$message], $validateOnly)->getItems();
$report = array_shift($reports);
assert($report instanceof SendReport);

try {
$response = $this->messagingApi->send($request);
} catch (NotFound $e) {
$token = Json::decode(Json::encode($message), true)['token'] ?? null;

if ($token) {
throw NotFound::becauseTokenNotFound($token, $e->errors());
}

throw $e;
if ($report->isSuccess()) {
return $report->result() ?? [];
}

return Json::decode((string) $response->getBody(), true);
$error = $report->error();
assert($error instanceof MessagingException);

throw $error;
}

public function sendMulticast($message, $registrationTokens, bool $validateOnly = false): MulticastSendReport
Expand All @@ -84,35 +86,32 @@ public function sendMulticast($message, $registrationTokens, bool $validateOnly
public function sendAll($messages, bool $validateOnly = false): MulticastSendReport
{
$messages = $this->ensureMessages($messages);
$promises = $this->createMessageRequestPromises($messages(), $validateOnly);
$requests = $this->createSendRequests($messages, $validateOnly);
$sendReports = array_fill(0, count($messages), null);

$sendReports = Utils::settle($promises())
->then(static function (array $responses) use ($messages) {
$messages = $messages();
$sendReports = [];
$config = [
'fulfilled' => function (ResponseInterface $response, int $index) use ($messages, &$sendReports) {
$message = $messages[$index];

foreach ($responses as $response) {
$message = CloudMessage::fromArray(
Json::decode(Json::encode($messages->current()), true),
);
$json = Json::decode((string) $response->getBody(), true);

if ($response['state'] === PromiseInterface::FULFILLED) {
$json = Json::decode((string) $response['value']->getBody(), true);
$sendReports[$index] = SendReport::success($message->target(), $json, $message);
},
'rejected' => function (RequestException $reason, int $index) use ($messages, &$sendReports) {
$message = $messages[$index];

$sendReports[] = SendReport::success($message->target(), $json, $message);
}
$error = $this->exceptionConverter->convertException($reason);

if ($response['state'] === PromiseInterface::REJECTED) {
$sendReports[] = SendReport::failure($message->target(), $response['reason'], $message);
}
$sendReports[$index] = SendReport::failure($message->target(), $error, $message);
},
];

$messages->next();
}
$this->messagingApi->pool($requests(), $config)->wait();

return $sendReports;
})
->wait()
;
// $sendReports has the same size as $messages, and each key is set by the `fulfilled` and `rejected`
// handlers above. The only way I could imagine a `null` value in the reports is when a request
// didn't return a response at all. I don't think it's possible, so letting PHPStan know.
assert(!in_array(null, $sendReports, true));

return MulticastSendReport::withItems($sendReports);
}
Expand Down Expand Up @@ -222,45 +221,43 @@ public function getAppInstance(RegistrationToken|string $registrationToken): App
/**
* @param iterable<Message|array<non-empty-string, mixed>> $messages
*
* @return callable(): Iterator<Message>
* @return list<CloudMessage>
*/
private function ensureMessages(iterable $messages): callable
private function ensureMessages(iterable $messages): array
{
return function () use ($messages) {
foreach ($messages as $message) {
if ($message instanceof Message) {
yield $message;
} else {
yield $this->makeMessage($message);
}
}
};
$ensured = [];

foreach ($messages as $message) {
$ensured[] = $this->makeMessage($message);
}

return $ensured;
}

/**
* @param Message|array<non-empty-string, mixed> $message
*
* @throws InvalidArgumentException
*/
private function makeMessage(Message|array $message): Message
private function makeMessage(Message|array $message): CloudMessage
{
$message = $message instanceof Message ? $message : CloudMessage::fromArray($message);

$message = (new SetApnsPushTypeIfNeeded())($message);
$message = (new SetApnsContentAvailableIfNeeded())($message);

return (new SetApnsContentAvailableIfNeeded())($message);
return CloudMessage::fromArray(Json::decode(JSON::encode($message->jsonSerialize()), true));
}

/**
* @param iterable<Message> $messages
* @param iterable<CloudMessage> $messages
* @return callable(): list<RequestInterface>
*/
private function createMessageRequestPromises(iterable $messages, bool $validateOnly): callable
private function createSendRequests(iterable $messages, bool $validateOnly): callable
{
return function () use ($messages, $validateOnly) {
foreach ($messages as $message) {
$request = $this->messagingApi->createSendRequestForMessage($message, $validateOnly);

yield $this->messagingApi->sendAsync($request);
yield $this->messagingApi->createSendRequestForMessage($message, $validateOnly);
}
};
}
Expand Down
35 changes: 8 additions & 27 deletions src/Firebase/Messaging/ApiClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@

use Beste\Json;
use GuzzleHttp\ClientInterface;
use GuzzleHttp\Pool;
use GuzzleHttp\Promise\PromiseInterface;
use Kreait\Firebase\Exception\FirebaseException;
use Kreait\Firebase\Exception\MessagingApiExceptionConverter;
use Kreait\Firebase\Exception\MessagingException;
use Iterator;
use Psr\Http\Message\RequestFactoryInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\StreamFactoryInterface;
use Throwable;

/**
* @internal
Expand All @@ -26,7 +23,6 @@ public function __construct(
private readonly string $projectId,
private readonly RequestFactoryInterface $requestFactory,
private readonly StreamFactoryInterface $streamFactory,
private readonly MessagingApiExceptionConverter $errorHandler,
) {
}

Expand All @@ -48,36 +44,21 @@ public function createSendRequestForMessage(Message $message, bool $validateOnly
$body = $this->streamFactory->createStream(Json::encode($payload));

return $request
->withProtocolVersion('2.0')
->withBody($body)
->withHeader('Content-Type', 'application/json; charset=UTF-8')
->withHeader('Content-Length', (string) $body->getSize())
;
}

/**
* @param array<string, mixed> $options
*
* @throws FirebaseException
* @throws MessagingException
* @param list<RequestInterface>|Iterator<RequestInterface> $requests
* @param array<string, mixed> $config
*/
public function send(RequestInterface $request, array $options = []): ResponseInterface
public function pool(array|Iterator $requests, array $config): PromiseInterface
{
try {
return $this->client->send($request, $options);
} catch (Throwable $e) {
throw $this->errorHandler->convertException($e);
}
}
$pool = new Pool($this->client, $requests, $config);

/**
* @param array<string, mixed> $options
*/
public function sendAsync(RequestInterface $request, array $options = []): PromiseInterface
{
return $this->client->sendAsync($request, $options)
->then(null, function (Throwable $e): never {
throw $this->errorHandler->convertException($e);
})
;
return $pool->promise();
}
}
1 change: 1 addition & 0 deletions tests/Integration/MessagingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public function sendMulticastWithValidAndInvalidTarget(): void

$failure = $report->failures()->getItems()[0];
$this->assertSame($invalid, $failure->target()->value());
$this->assertTrue($failure->messageWasInvalid());
$this->assertInstanceOf(MessagingException::class, $failure->error());
}

Expand Down
4 changes: 3 additions & 1 deletion tests/Unit/MessagingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Kreait\Firebase\Exception\InvalidArgumentException;
use Kreait\Firebase\Exception\Messaging\InvalidArgument;
use Kreait\Firebase\Exception\MessagingApiExceptionConverter;
use Kreait\Firebase\Messaging;
use Kreait\Firebase\Messaging\ApiClient;
use Kreait\Firebase\Messaging\AppInstanceApiClient;
Expand All @@ -24,8 +25,9 @@ protected function setUp(): void
{
$messagingApi = $this->createMock(ApiClient::class);
$appInstanceApi = $this->createMock(AppInstanceApiClient::class);
$exceptionConverter = new MessagingApiExceptionConverter();

$this->messaging = new Messaging($messagingApi, $appInstanceApi);
$this->messaging = new Messaging($messagingApi, $appInstanceApi, $exceptionConverter);
}

#[Test]
Expand Down

0 comments on commit d2456d7

Please sign in to comment.