diff --git a/jvb/src/main/java/org/jitsi/videobridge/EndpointMessageTransport.java b/jvb/src/main/java/org/jitsi/videobridge/EndpointMessageTransport.java index f45918c5bc..c0223afa45 100644 --- a/jvb/src/main/java/org/jitsi/videobridge/EndpointMessageTransport.java +++ b/jvb/src/main/java/org/jitsi/videobridge/EndpointMessageTransport.java @@ -212,14 +212,7 @@ private void sendMessage(DataChannel dst, BridgeChannelMessage message) */ private void sendMessage(ColibriWebSocket dst, BridgeChannelMessage message) { - RemoteEndpoint remote = dst.getRemote(); - if (remote != null) - { - // We'll use the async version of sendString since this may be called - // from multiple threads. It's just fire-and-forget though, so we - // don't wait on the result - remote.sendString(message.toJson(), new WriteCallback.Adaptor()); - } + dst.sendString(message.toJson()); statisticsSupplier.get().colibriWebSocketMessagesSent.inc(); } diff --git a/jvb/src/main/java/org/jitsi/videobridge/websocket/ColibriWebSocket.java b/jvb/src/main/java/org/jitsi/videobridge/websocket/ColibriWebSocket.java index 28c7e0024b..d9d7ff59ec 100644 --- a/jvb/src/main/java/org/jitsi/videobridge/websocket/ColibriWebSocket.java +++ b/jvb/src/main/java/org/jitsi/videobridge/websocket/ColibriWebSocket.java @@ -16,11 +16,15 @@ package org.jitsi.videobridge.websocket; import org.eclipse.jetty.websocket.api.*; -import org.jitsi.utils.collections.*; import org.jitsi.utils.logging2.*; import org.jitsi.videobridge.*; +import org.jitsi.videobridge.util.*; +import org.jitsi.videobridge.websocket.config.*; +import java.nio.*; +import java.time.*; import java.util.*; +import java.util.concurrent.*; /** * @author Boris Grozev @@ -37,6 +41,17 @@ public class ColibriWebSocket extends WebSocketAdapter */ private final EventHandler eventHandler; + /** + * The clock used to compute lastSendTime. + */ + private final Clock clock = Clock.systemUTC(); + + /** The last time something was sent on this web socket */ + private Instant lastSendTime = Instant.MIN; + + /** The recurring task to send pings on the connection, if needed. */ + private ScheduledFuture pinger = null; + /** * Initializes a new {@link ColibriWebSocket} instance. */ @@ -72,6 +87,14 @@ public void onWebSocketConnect(Session sess) { super.onWebSocketConnect(sess); + if (WebsocketServiceConfig.config.getSendKeepalivePings()) + { + pinger = TaskPools.SCHEDULED_POOL.scheduleAtFixedRate(this::maybeSendPing, + WebsocketServiceConfig.config.getKeepalivePingInterval().toMillis(), + WebsocketServiceConfig.config.getKeepalivePingInterval().toMillis(), + TimeUnit.MILLISECONDS); + } + eventHandler.webSocketConnected(this); } @@ -82,6 +105,52 @@ public void onWebSocketConnect(Session sess) public void onWebSocketClose(int statusCode, String reason) { eventHandler.webSocketClosed(this, statusCode, reason); + if (pinger != null) + { + pinger.cancel(true); + } + } + + public void sendString(String message) + { + RemoteEndpoint remote = getRemote(); + if (remote != null) + { + // We'll use the async version of sendString since this may be called + // from multiple threads. It's just fire-and-forget though, so we + // don't wait on the result + + remote.sendString(message, WriteCallback.NOOP); + synchronized (this) + { + lastSendTime = clock.instant(); + } + } + } + + private void maybeSendPing() + { + try + { + Instant now = clock.instant(); + synchronized (this) + { + if (Duration.between(lastSendTime, now). + compareTo(WebsocketServiceConfig.config.getKeepalivePingInterval()) < 0) + { + RemoteEndpoint remote = getRemote(); + if (remote != null) + { + remote.sendPing(ByteBuffer.allocate(0), WriteCallback.NOOP); + lastSendTime = clock.instant(); + } + } + } + } + catch (Exception e) + { + logger.error("Error sending websocket ping", e); + } } /** @@ -91,6 +160,10 @@ public void onWebSocketClose(int statusCode, String reason) public void onWebSocketError(Throwable cause) { eventHandler.webSocketError(this, cause); + if (pinger != null) + { + pinger.cancel(true); + } } public interface EventHandler diff --git a/jvb/src/main/java/org/jitsi/videobridge/websocket/ColibriWebSocketServlet.java b/jvb/src/main/java/org/jitsi/videobridge/websocket/ColibriWebSocketServlet.java index 50a4a48b2e..153f5f48ac 100644 --- a/jvb/src/main/java/org/jitsi/videobridge/websocket/ColibriWebSocketServlet.java +++ b/jvb/src/main/java/org/jitsi/videobridge/websocket/ColibriWebSocketServlet.java @@ -22,6 +22,8 @@ import org.jitsi.utils.logging2.*; import org.jitsi.videobridge.*; import org.jitsi.videobridge.relay.*; +import org.jitsi.videobridge.websocket.config.*; + import static org.jitsi.videobridge.websocket.config.WebsocketServiceConfig.config; import java.io.*; @@ -67,7 +69,7 @@ class ColibriWebSocketServlet public void configure(JettyWebSocketServletFactory webSocketServletFactory) { // set a timeout of 1min - webSocketServletFactory.setIdleTimeout(Duration.ofMinutes(1)); + webSocketServletFactory.setIdleTimeout(WebsocketServiceConfig.config.getIdleTimeout()); webSocketServletFactory.setCreator((request, response) -> { diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/relay/RelayMessageTransport.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/relay/RelayMessageTransport.kt index 520f048b05..d3024b6d6c 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/relay/RelayMessageTransport.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/relay/RelayMessageTransport.kt @@ -15,7 +15,6 @@ */ package org.jitsi.videobridge.relay -import org.eclipse.jetty.websocket.api.WriteCallback import org.eclipse.jetty.websocket.client.ClientUpgradeRequest import org.eclipse.jetty.websocket.client.WebSocketClient import org.eclipse.jetty.websocket.core.CloseStatus @@ -37,6 +36,7 @@ import org.jitsi.videobridge.message.ServerHelloMessage import org.jitsi.videobridge.message.SourceVideoTypeMessage import org.jitsi.videobridge.message.VideoTypeMessage import org.jitsi.videobridge.websocket.ColibriWebSocket +import org.jitsi.videobridge.websocket.config.WebsocketServiceConfig import org.json.simple.JSONObject import java.lang.ref.WeakReference import java.net.URI @@ -229,10 +229,7 @@ class RelayMessageTransport( * @param message the message to send. */ private fun sendMessage(dst: ColibriWebSocket, message: BridgeChannelMessage) { - // We'll use the async version of sendString since this may be called - // from multiple threads. It's just fire-and-forget though, so we - // don't wait on the result - dst.remote?.sendString(message.toJson(), WriteCallback.Adaptor()) + dst.sendString(message.toJson()) statisticsSupplier.get().colibriWebSocketMessagesSent.inc() } @@ -496,7 +493,10 @@ class RelayMessageTransport( /** * The single [WebSocketClient] instance that all [Relay]s use to initiate a web socket connection. */ - val webSocketClient = WebSocketClient().apply { start() } + val webSocketClient = WebSocketClient().apply { + idleTimeout = WebsocketServiceConfig.config.idleTimeout + start() + } /** * Reason to use when closing a WS due to the relay being expired. diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/websocket/config/WebsocketServiceConfig.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/websocket/config/WebsocketServiceConfig.kt index fffe083572..e5e1a0c9d7 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/websocket/config/WebsocketServiceConfig.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/websocket/config/WebsocketServiceConfig.kt @@ -19,6 +19,7 @@ package org.jitsi.videobridge.websocket.config import org.jitsi.config.JitsiConfig import org.jitsi.metaconfig.config import org.jitsi.metaconfig.optionalconfig +import java.time.Duration class WebsocketServiceConfig private constructor() { /** @@ -113,6 +114,21 @@ class WebsocketServiceConfig private constructor() { } } + /** Whether keepalive pings are enabled */ + val sendKeepalivePings: Boolean by config { + "videobridge.websockets.send-keepalive-pings".from(JitsiConfig.newConfig) + } + + /** The time interval for keepalive pings */ + val keepalivePingInterval: Duration by config { + "videobridge.websockets.keepalive-ping-interval".from(JitsiConfig.newConfig) + } + + /** The time interval for websocket timeouts */ + val idleTimeout: Duration by config { + "videobridge.websockets.idle-timeout".from(JitsiConfig.newConfig) + } + companion object { @JvmField val config = WebsocketServiceConfig() diff --git a/jvb/src/main/resources/reference.conf b/jvb/src/main/resources/reference.conf index 5904c1f62f..cb49a9d49a 100644 --- a/jvb/src/main/resources/reference.conf +++ b/jvb/src/main/resources/reference.conf @@ -225,6 +225,12 @@ videobridge { server-id = "default-id" // Whether to negotiate WebSocket compression (permessage-deflate) enable-compression = true + // Whether to send keepalive pings on idle websockets + send-keepalive-pings = true + // The keepalive ping interval + keepalive-ping-interval = 15 seconds + // The websocket idle timeout + idle-timeout = 60 seconds // Optional, even when 'enabled' is set to true #tls = true