Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send pings periodically on idle websocket connections. #2084

Merged
merged 2 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,7 @@ private void sendMessage(DataChannel dst, BridgeChannelMessage message)
*/
private void sendMessage(ColibriWebSocket dst, BridgeChannelMessage message)
{
RemoteEndpoint remote = dst.getRemote();
if (remote != null)
{
// We'll use the async version of sendString since this may be called
// from multiple threads. It's just fire-and-forget though, so we
// don't wait on the result
remote.sendString(message.toJson(), new WriteCallback.Adaptor());
}
dst.sendString(message.toJson());
statisticsSupplier.get().colibriWebSocketMessagesSent.inc();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
package org.jitsi.videobridge.websocket;

import org.eclipse.jetty.websocket.api.*;
import org.jitsi.utils.collections.*;
import org.jitsi.utils.logging2.*;
import org.jitsi.videobridge.*;
import org.jitsi.videobridge.util.*;
import org.jitsi.videobridge.websocket.config.*;

import java.nio.*;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;

/**
* @author Boris Grozev
Expand All @@ -37,6 +41,17 @@ public class ColibriWebSocket extends WebSocketAdapter
*/
private final EventHandler eventHandler;

/**
* The clock used to compute lastSendTime.
*/
private final Clock clock = Clock.systemUTC();

/** The last time something was sent on this web socket */
private Instant lastSendTime = Instant.MIN;

/** The recurring task to send pings on the connection, if needed. */
private ScheduledFuture<?> pinger = null;

/**
* Initializes a new {@link ColibriWebSocket} instance.
*/
Expand Down Expand Up @@ -72,6 +87,14 @@ public void onWebSocketConnect(Session sess)
{
super.onWebSocketConnect(sess);

if (WebsocketServiceConfig.config.getSendKeepalivePings())
{
pinger = TaskPools.SCHEDULED_POOL.scheduleAtFixedRate(this::maybeSendPing,
WebsocketServiceConfig.config.getKeepalivePingInterval().toMillis(),
WebsocketServiceConfig.config.getKeepalivePingInterval().toMillis(),
TimeUnit.MILLISECONDS);
}

eventHandler.webSocketConnected(this);
}

Expand All @@ -82,6 +105,52 @@ public void onWebSocketConnect(Session sess)
public void onWebSocketClose(int statusCode, String reason)
{
eventHandler.webSocketClosed(this, statusCode, reason);
if (pinger != null)
{
pinger.cancel(true);
}
}

public void sendString(String message)
{
RemoteEndpoint remote = getRemote();
if (remote != null)
{
// We'll use the async version of sendString since this may be called
// from multiple threads. It's just fire-and-forget though, so we
// don't wait on the result

remote.sendString(message, WriteCallback.NOOP);
synchronized (this)
{
lastSendTime = clock.instant();
}
}
}

private void maybeSendPing()
{
try
{
Instant now = clock.instant();
synchronized (this)
{
if (Duration.between(lastSendTime, now).
compareTo(WebsocketServiceConfig.config.getKeepalivePingInterval()) < 0)
{
RemoteEndpoint remote = getRemote();
if (remote != null)
{
remote.sendPing(ByteBuffer.allocate(0), WriteCallback.NOOP);
lastSendTime = clock.instant();
}
}
}
}
catch (Exception e)
{
logger.error("Error sending websocket ping", e);
}
}

/**
Expand All @@ -91,6 +160,10 @@ public void onWebSocketClose(int statusCode, String reason)
public void onWebSocketError(Throwable cause)
{
eventHandler.webSocketError(this, cause);
if (pinger != null)
{
pinger.cancel(true);
}
}

public interface EventHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.jitsi.utils.logging2.*;
import org.jitsi.videobridge.*;
import org.jitsi.videobridge.relay.*;
import org.jitsi.videobridge.websocket.config.*;

import static org.jitsi.videobridge.websocket.config.WebsocketServiceConfig.config;

import java.io.*;
Expand Down Expand Up @@ -67,7 +69,7 @@ class ColibriWebSocketServlet
public void configure(JettyWebSocketServletFactory webSocketServletFactory)
{
// set a timeout of 1min
webSocketServletFactory.setIdleTimeout(Duration.ofMinutes(1));
webSocketServletFactory.setIdleTimeout(WebsocketServiceConfig.config.getIdleTimeout());

webSocketServletFactory.setCreator((request, response) ->
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package org.jitsi.videobridge.relay

import org.eclipse.jetty.websocket.api.WriteCallback
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest
import org.eclipse.jetty.websocket.client.WebSocketClient
import org.eclipse.jetty.websocket.core.CloseStatus
Expand All @@ -37,6 +36,7 @@ import org.jitsi.videobridge.message.ServerHelloMessage
import org.jitsi.videobridge.message.SourceVideoTypeMessage
import org.jitsi.videobridge.message.VideoTypeMessage
import org.jitsi.videobridge.websocket.ColibriWebSocket
import org.jitsi.videobridge.websocket.config.WebsocketServiceConfig
import org.json.simple.JSONObject
import java.lang.ref.WeakReference
import java.net.URI
Expand Down Expand Up @@ -229,10 +229,7 @@ class RelayMessageTransport(
* @param message the message to send.
*/
private fun sendMessage(dst: ColibriWebSocket, message: BridgeChannelMessage) {
// We'll use the async version of sendString since this may be called
// from multiple threads. It's just fire-and-forget though, so we
// don't wait on the result
dst.remote?.sendString(message.toJson(), WriteCallback.Adaptor())
dst.sendString(message.toJson())
statisticsSupplier.get().colibriWebSocketMessagesSent.inc()
}

Expand Down Expand Up @@ -496,7 +493,10 @@ class RelayMessageTransport(
/**
* The single [WebSocketClient] instance that all [Relay]s use to initiate a web socket connection.
*/
val webSocketClient = WebSocketClient().apply { start() }
val webSocketClient = WebSocketClient().apply {
idleTimeout = WebsocketServiceConfig.config.idleTimeout
start()
}

/**
* Reason to use when closing a WS due to the relay being expired.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.jitsi.videobridge.websocket.config
import org.jitsi.config.JitsiConfig
import org.jitsi.metaconfig.config
import org.jitsi.metaconfig.optionalconfig
import java.time.Duration

class WebsocketServiceConfig private constructor() {
/**
Expand Down Expand Up @@ -113,6 +114,21 @@ class WebsocketServiceConfig private constructor() {
}
}

/** Whether keepalive pings are enabled */
val sendKeepalivePings: Boolean by config {
"videobridge.websockets.send-keepalive-pings".from(JitsiConfig.newConfig)
}

/** The time interval for keepalive pings */
val keepalivePingInterval: Duration by config {
"videobridge.websockets.keepalive-ping-interval".from(JitsiConfig.newConfig)
}

/** The time interval for websocket timeouts */
val idleTimeout: Duration by config {
"videobridge.websockets.idle-timeout".from(JitsiConfig.newConfig)
}

companion object {
@JvmField
val config = WebsocketServiceConfig()
Expand Down
6 changes: 6 additions & 0 deletions jvb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ videobridge {
server-id = "default-id"
// Whether to negotiate WebSocket compression (permessage-deflate)
enable-compression = true
// Whether to send keepalive pings on idle websockets
send-keepalive-pings = true
// The keepalive ping interval
keepalive-ping-interval = 15 seconds
// The websocket idle timeout
idle-timeout = 60 seconds

// Optional, even when 'enabled' is set to true
#tls = true
Expand Down
Loading