Skip to content

Commit

Permalink
Send pings periodically on idle websocket connections. (#2084)
Browse files Browse the repository at this point in the history
  • Loading branch information
JonathanLennox authored Jan 24, 2024
1 parent 57abe58 commit 9d121e4
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,7 @@ private void sendMessage(DataChannel dst, BridgeChannelMessage message)
*/
private void sendMessage(ColibriWebSocket dst, BridgeChannelMessage message)
{
RemoteEndpoint remote = dst.getRemote();
if (remote != null)
{
// We'll use the async version of sendString since this may be called
// from multiple threads. It's just fire-and-forget though, so we
// don't wait on the result
remote.sendString(message.toJson(), new WriteCallback.Adaptor());
}
dst.sendString(message.toJson());
statisticsSupplier.get().colibriWebSocketMessagesSent.inc();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
package org.jitsi.videobridge.websocket;

import org.eclipse.jetty.websocket.api.*;
import org.jitsi.utils.collections.*;
import org.jitsi.utils.logging2.*;
import org.jitsi.videobridge.*;
import org.jitsi.videobridge.util.*;
import org.jitsi.videobridge.websocket.config.*;

import java.nio.*;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;

/**
* @author Boris Grozev
Expand All @@ -37,6 +41,17 @@ public class ColibriWebSocket extends WebSocketAdapter
*/
private final EventHandler eventHandler;

/**
* The clock used to compute lastSendTime.
*/
private final Clock clock = Clock.systemUTC();

/** The last time something was sent on this web socket */
private Instant lastSendTime = Instant.MIN;

/** The recurring task to send pings on the connection, if needed. */
private ScheduledFuture<?> pinger = null;

/**
* Initializes a new {@link ColibriWebSocket} instance.
*/
Expand Down Expand Up @@ -72,6 +87,14 @@ public void onWebSocketConnect(Session sess)
{
super.onWebSocketConnect(sess);

if (WebsocketServiceConfig.config.getSendKeepalivePings())
{
pinger = TaskPools.SCHEDULED_POOL.scheduleAtFixedRate(this::maybeSendPing,
WebsocketServiceConfig.config.getKeepalivePingInterval().toMillis(),
WebsocketServiceConfig.config.getKeepalivePingInterval().toMillis(),
TimeUnit.MILLISECONDS);
}

eventHandler.webSocketConnected(this);
}

Expand All @@ -82,6 +105,52 @@ public void onWebSocketConnect(Session sess)
public void onWebSocketClose(int statusCode, String reason)
{
eventHandler.webSocketClosed(this, statusCode, reason);
if (pinger != null)
{
pinger.cancel(true);
}
}

public void sendString(String message)
{
RemoteEndpoint remote = getRemote();
if (remote != null)
{
// We'll use the async version of sendString since this may be called
// from multiple threads. It's just fire-and-forget though, so we
// don't wait on the result

remote.sendString(message, WriteCallback.NOOP);
synchronized (this)
{
lastSendTime = clock.instant();
}
}
}

private void maybeSendPing()
{
try
{
Instant now = clock.instant();
synchronized (this)
{
if (Duration.between(lastSendTime, now).
compareTo(WebsocketServiceConfig.config.getKeepalivePingInterval()) < 0)
{
RemoteEndpoint remote = getRemote();
if (remote != null)
{
remote.sendPing(ByteBuffer.allocate(0), WriteCallback.NOOP);
lastSendTime = clock.instant();
}
}
}
}
catch (Exception e)
{
logger.error("Error sending websocket ping", e);
}
}

/**
Expand All @@ -91,6 +160,10 @@ public void onWebSocketClose(int statusCode, String reason)
public void onWebSocketError(Throwable cause)
{
eventHandler.webSocketError(this, cause);
if (pinger != null)
{
pinger.cancel(true);
}
}

public interface EventHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.jitsi.utils.logging2.*;
import org.jitsi.videobridge.*;
import org.jitsi.videobridge.relay.*;
import org.jitsi.videobridge.websocket.config.*;

import static org.jitsi.videobridge.websocket.config.WebsocketServiceConfig.config;

import java.io.*;
Expand Down Expand Up @@ -67,7 +69,7 @@ class ColibriWebSocketServlet
public void configure(JettyWebSocketServletFactory webSocketServletFactory)
{
// set a timeout of 1min
webSocketServletFactory.setIdleTimeout(Duration.ofMinutes(1));
webSocketServletFactory.setIdleTimeout(WebsocketServiceConfig.config.getIdleTimeout());

webSocketServletFactory.setCreator((request, response) ->
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package org.jitsi.videobridge.relay

import org.eclipse.jetty.websocket.api.WriteCallback
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest
import org.eclipse.jetty.websocket.client.WebSocketClient
import org.eclipse.jetty.websocket.core.CloseStatus
Expand All @@ -37,6 +36,7 @@ import org.jitsi.videobridge.message.ServerHelloMessage
import org.jitsi.videobridge.message.SourceVideoTypeMessage
import org.jitsi.videobridge.message.VideoTypeMessage
import org.jitsi.videobridge.websocket.ColibriWebSocket
import org.jitsi.videobridge.websocket.config.WebsocketServiceConfig
import org.json.simple.JSONObject
import java.lang.ref.WeakReference
import java.net.URI
Expand Down Expand Up @@ -229,10 +229,7 @@ class RelayMessageTransport(
* @param message the message to send.
*/
private fun sendMessage(dst: ColibriWebSocket, message: BridgeChannelMessage) {
// We'll use the async version of sendString since this may be called
// from multiple threads. It's just fire-and-forget though, so we
// don't wait on the result
dst.remote?.sendString(message.toJson(), WriteCallback.Adaptor())
dst.sendString(message.toJson())
statisticsSupplier.get().colibriWebSocketMessagesSent.inc()
}

Expand Down Expand Up @@ -496,7 +493,10 @@ class RelayMessageTransport(
/**
* The single [WebSocketClient] instance that all [Relay]s use to initiate a web socket connection.
*/
val webSocketClient = WebSocketClient().apply { start() }
val webSocketClient = WebSocketClient().apply {
idleTimeout = WebsocketServiceConfig.config.idleTimeout
start()
}

/**
* Reason to use when closing a WS due to the relay being expired.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.jitsi.videobridge.websocket.config
import org.jitsi.config.JitsiConfig
import org.jitsi.metaconfig.config
import org.jitsi.metaconfig.optionalconfig
import java.time.Duration

class WebsocketServiceConfig private constructor() {
/**
Expand Down Expand Up @@ -113,6 +114,21 @@ class WebsocketServiceConfig private constructor() {
}
}

/** Whether keepalive pings are enabled */
val sendKeepalivePings: Boolean by config {
"videobridge.websockets.send-keepalive-pings".from(JitsiConfig.newConfig)
}

/** The time interval for keepalive pings */
val keepalivePingInterval: Duration by config {
"videobridge.websockets.keepalive-ping-interval".from(JitsiConfig.newConfig)
}

/** The time interval for websocket timeouts */
val idleTimeout: Duration by config {
"videobridge.websockets.idle-timeout".from(JitsiConfig.newConfig)
}

companion object {
@JvmField
val config = WebsocketServiceConfig()
Expand Down
6 changes: 6 additions & 0 deletions jvb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ videobridge {
server-id = "default-id"
// Whether to negotiate WebSocket compression (permessage-deflate)
enable-compression = true
// Whether to send keepalive pings on idle websockets
send-keepalive-pings = true
// The keepalive ping interval
keepalive-ping-interval = 15 seconds
// The websocket idle timeout
idle-timeout = 60 seconds

// Optional, even when 'enabled' is set to true
#tls = true
Expand Down

0 comments on commit 9d121e4

Please sign in to comment.