Skip to content

Commit

Permalink
fix(realtime)!: serialize Realtime to MainActor
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Dec 2, 2024
1 parent fc406ea commit 75a4c2c
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 188 deletions.
5 changes: 3 additions & 2 deletions Sources/Realtime/V2/PushV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ public enum PushStatus: String, Sendable {
case timeout
}

actor PushV2 {
@MainActor
final class PushV2 {
private weak var channel: RealtimeChannelV2?
let message: RealtimeMessageV2

Expand All @@ -40,7 +41,7 @@ actor PushV2 {
}

do {
return try await withTimeout(interval: channel.socket.options().timeoutInterval) {
return try await withTimeout(interval: channel.socket.options.timeoutInterval) {
await withCheckedContinuation { continuation in
self.receivedContinuation = continuation
}
Expand Down
113 changes: 31 additions & 82 deletions Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,59 +25,16 @@ public struct RealtimeChannelConfig: Sendable {
public var isPrivate: Bool
}

struct Socket: Sendable {
var broadcastURL: @Sendable () -> URL
var status: @Sendable () -> RealtimeClientStatus
var options: @Sendable () -> RealtimeClientOptions
var accessToken: @Sendable () async -> String?
var apiKey: @Sendable () -> String?
var makeRef: @Sendable () -> Int

var connect: @Sendable () async -> Void
var addChannel: @Sendable (_ channel: RealtimeChannelV2) -> Void
var removeChannel: @Sendable (_ channel: RealtimeChannelV2) async -> Void
var push: @Sendable (_ message: RealtimeMessageV2) async -> Void
var httpSend: @Sendable (_ request: Helpers.HTTPRequest) async throws -> Helpers.HTTPResponse
}

extension Socket {
init(client: RealtimeClientV2) {
self.init(
broadcastURL: { [weak client] in client?.broadcastURL ?? URL(string: "http://localhost")! },
status: { [weak client] in client?.status ?? .disconnected },
options: { [weak client] in client?.options ?? .init() },
accessToken: { [weak client] in
if let accessToken = try? await client?.options.accessToken?() {
return accessToken
}
return client?.mutableState.accessToken
},
apiKey: { [weak client] in client?.apikey },
makeRef: { [weak client] in client?.makeRef() ?? 0 },
connect: { [weak client] in await client?.connect() },
addChannel: { [weak client] in client?.addChannel($0) },
removeChannel: { [weak client] in await client?.removeChannel($0) },
push: { [weak client] in await client?.push($0) },
httpSend: { [weak client] in
try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse())
}
)
}
}

public final class RealtimeChannelV2: Sendable {
struct MutableState {
var clientChanges: [PostgresJoinConfig] = []
var joinRef: String?
var pushes: [String: PushV2] = [:]
}

private let mutableState = LockIsolated(MutableState())
@MainActor
public final class RealtimeChannelV2 {
var clientChanges: [PostgresJoinConfig] = []
var joinRef: String?
var pushes: [String: PushV2] = [:]

let topic: String
let config: RealtimeChannelConfig
let logger: (any SupabaseLogger)?
let socket: Socket
unowned var socket: RealtimeClientV2

let callbackManager = CallbackManager()
private let statusEventEmitter = EventEmitter<RealtimeChannelStatus>(initialEvent: .unsubscribed)
Expand Down Expand Up @@ -105,7 +62,7 @@ public final class RealtimeChannelV2: Sendable {
init(
topic: String,
config: RealtimeChannelConfig,
socket: Socket,
socket: RealtimeClientV2,
logger: (any SupabaseLogger)?
) {
self.topic = topic
Expand All @@ -120,8 +77,8 @@ public final class RealtimeChannelV2: Sendable {

/// Subscribes to the channel
public func subscribe() async {
if socket.status() != .connected {
if socket.options().connectOnSubscribe != true {
if socket.status != .connected {
if socket.options.connectOnSubscribe != true {
reportIssue(
"You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?"
)
Expand All @@ -138,7 +95,7 @@ public final class RealtimeChannelV2: Sendable {
let joinConfig = RealtimeJoinConfig(
broadcast: config.broadcast,
presence: config.presence,
postgresChanges: mutableState.clientChanges,
postgresChanges: clientChanges,
isPrivate: config.isPrivate
)

Expand All @@ -148,7 +105,7 @@ public final class RealtimeChannelV2: Sendable {
)

let joinRef = socket.makeRef().description
mutableState.withValue { $0.joinRef = joinRef }
self.joinRef = joinRef

logger?.debug("Subscribing to channel with body: \(joinConfig)")

Expand All @@ -159,7 +116,7 @@ public final class RealtimeChannelV2: Sendable {
)

do {
try await withTimeout(interval: socket.options().timeoutInterval) { [self] in
try await withTimeout(interval: socket.options.timeoutInterval) { [self] in
_ = await statusChange.first { @Sendable in $0 == .subscribed }
}
} catch {
Expand Down Expand Up @@ -215,17 +172,17 @@ public final class RealtimeChannelV2: Sendable {
}

var headers: HTTPFields = [.contentType: "application/json"]
if let apiKey = socket.apiKey() {
if let apiKey = socket.apikey {
headers[.apiKey] = apiKey
}
if let accessToken = await socket.accessToken() {
headers[.authorization] = "Bearer \(accessToken)"
}

let task = Task { [headers] in
_ = try? await socket.httpSend(
_ = try? await socket.http.send(
HTTPRequest(
url: socket.broadcastURL(),
url: socket.broadcastURL,
method: .post,
headers: headers,
body: JSONEncoder().encode(
Expand All @@ -245,7 +202,7 @@ public final class RealtimeChannelV2: Sendable {
}

if config.broadcast.acknowledgeBroadcasts {
try? await withTimeout(interval: socket.options().timeoutInterval) {
try? await withTimeout(interval: socket.options.timeoutInterval) {
await task.value
}
}
Expand Down Expand Up @@ -330,7 +287,7 @@ public final class RealtimeChannelV2: Sendable {
throw RealtimeError("Received a reply with unexpected payload: \(message)")
}

await didReceiveReply(ref: ref, status: status)
didReceiveReply(ref: ref, status: status)

if message.payload["response"]?.objectValue?.keys
.contains(ChannelEvent.postgresChanges) == true
Expand Down Expand Up @@ -536,9 +493,7 @@ public final class RealtimeChannelV2: Sendable {
filter: filter
)

mutableState.withValue {
$0.clientChanges.append(config)
}
clientChanges.append(config)

let id = callbackManager.addPostgresCallback(filter: config, callback: callback)
return RealtimeSubscription { [weak callbackManager, logger] in
Expand Down Expand Up @@ -579,30 +534,24 @@ public final class RealtimeChannelV2: Sendable {

@discardableResult
func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus {
let push = mutableState.withValue {
let message = RealtimeMessageV2(
joinRef: $0.joinRef,
ref: ref ?? socket.makeRef().description,
topic: self.topic,
event: event,
payload: payload
)

let push = PushV2(channel: self, message: message)
if let ref = message.ref {
$0.pushes[ref] = push
}
let message = RealtimeMessageV2(
joinRef: joinRef,
ref: ref ?? socket.makeRef().description,
topic: self.topic,
event: event,
payload: payload
)

return push
let push = PushV2(channel: self, message: message)
if let ref = message.ref {
pushes[ref] = push
}

return await push.send()
}

private func didReceiveReply(ref: String, status: String) async {
let push = mutableState.withValue {
$0.pushes.removeValue(forKey: ref)
}
await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
private func didReceiveReply(ref: String, status: String) {
let push = pushes.removeValue(forKey: ref)
push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
}
}
Loading

0 comments on commit 75a4c2c

Please sign in to comment.