Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync schema migration #1789

Draft
wants to merge 30 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b53d16a
Disable workaround
clementetb Jun 18, 2024
f345396
Clean up app initializer infrastructure
rorbech Jun 19, 2024
d392541
Merge branch 'cr/legacy-mixed-test' into ct/core_7297
clementetb Jun 20, 2024
1187d45
Fix email auth test app setup
rorbech Jun 24, 2024
bcccd47
Merge branch 'main' into cr/legacy-mixed-test
rorbech Jun 24, 2024
8194c98
Add CHANGELOG entry
rorbech Jun 24, 2024
15637af
Fix ktor test app setup
rorbech Jun 24, 2024
c086bda
Working
clementetb Jun 25, 2024
13ad081
Some improvements
clementetb Jun 25, 2024
c779eb7
Passing
clementetb Jun 25, 2024
7283eab
Merge branch 'main' into cr/legacy-mixed-test
rorbech Jun 26, 2024
6412783
Fix app initializer for collection in mixed round trip tests
rorbech Jun 26, 2024
fda7ba3
Fix test app initializers for function and http obfuscator tests
rorbech Jun 26, 2024
aa00ccb
Clean up
clementetb Jun 26, 2024
11b3816
More test app initializer adjustments
rorbech Jun 26, 2024
ed2c8f4
Merge commit '11b38162c40ad379f6dc88675312fd9d3a1dd254' into ct/core_…
clementetb Jun 26, 2024
f83118f
Clean up
clementetb Jun 27, 2024
30dcad6
Make collection in mixed round trip tests repeatable
rorbech Jun 27, 2024
600faa4
Merge branch 'cr/legacy-mixed-test' into ct/core_7297
clementetb Jun 27, 2024
ce040e7
Add missing dependency
clementetb Jun 27, 2024
fa96c03
Add sync migrations with basic test cases
clementetb Jun 28, 2024
e5fa6a5
Merge commit 'ce040e7fe635c7dc2afd2942ef092579a7695e20' into ct/sync_…
clementetb Jun 28, 2024
bfb4559
cleanup
clementetb Jun 28, 2024
063f42b
Add test cases
clementetb Jul 1, 2024
7a77c2c
linting
clementetb Jul 1, 2024
ea460b4
Use native method to retrieve persisted schema version
clementetb Jul 2, 2024
2a2f0da
Point to the right core commit
clementetb Jul 2, 2024
739cfe2
Merge branch 'main' into ct/sync_migration
clementetb Jul 8, 2024
28e44c7
Bump core branch
clementetb Jul 14, 2024
2a06e6e
Update test case names
clementetb Jul 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ object CoreErrorConverter {
return userError ?: when {
ErrorCode.RLM_ERR_INDEX_OUT_OF_BOUNDS == errorCode ->
IndexOutOfBoundsException(message)
ErrorCode.RLM_ERR_INVALID_SCHEMA_VERSION == errorCode ->
InvalidSchemaVersionException(message)
ErrorCategory.RLM_ERR_CAT_INVALID_ARG in categories && ErrorCategory.RLM_ERR_CAT_SYNC_ERROR !in categories -> {
// Some sync errors flagged as both logical and illegal. In our case, we consider those
// IllegalState, so discard them them here and let them fall through to the bottom case
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2024 Realm Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.realm.kotlin.internal.interop

/**
* Exception thrown when there is a mismatch between the schema version defined in the configuration
* and the persisted one.
*/
class InvalidSchemaVersionException(override val message: String?) : IllegalStateException()
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ expect object RealmInterop {
// dispatcher. The realm itself must also be opened on the same thread
fun realm_open(config: RealmConfigurationPointer, scheduler: RealmSchedulerPointer): Pair<LiveRealmPointer, Boolean>

fun realm_open(config: RealmConfigurationPointer): LiveRealmPointer

// Opening a Realm asynchronously. Only supported for synchronized realms.
fun realm_open_synchronized(config: RealmConfigurationPointer): RealmAsyncOpenTaskPointer
fun realm_async_open_task_start(task: RealmAsyncOpenTaskPointer, callback: AsyncOpenCallback)
Expand Down Expand Up @@ -647,6 +649,10 @@ expect object RealmInterop {
fun realm_sync_client_config_set_pong_keepalive_timeout(syncClientConfig: RealmSyncClientConfigurationPointer, timeoutMs: ULong)
fun realm_sync_client_config_set_fast_reconnect_limit(syncClientConfig: RealmSyncClientConfigurationPointer, timeoutMs: ULong)

fun realm_get_persisted_schema_version(
config: RealmConfigurationPointer
): Long

fun realm_sync_config_new(
user: RealmUserPointer,
partition: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ actual object RealmInterop {
return Pair(realmPtr, fileCreated)
}

actual fun realm_open(
config: RealmConfigurationPointer,
): LiveRealmPointer {
val realmPtr = LongPointerWrapper<LiveRealmT>(realmc.realm_open(config.cptr()))
return realmPtr
}

actual fun realm_open_synchronized(config: RealmConfigurationPointer): RealmAsyncOpenTaskPointer {
return LongPointerWrapper(realmc.realm_open_synchronized(config.cptr()))
}
Expand Down Expand Up @@ -1438,6 +1445,10 @@ actual object RealmInterop {
realmc.realm_sync_client_config_set_fast_reconnect_limit(syncClientConfig.cptr(), timeoutMs.toLong())
}

actual fun realm_get_persisted_schema_version(
config: RealmConfigurationPointer
): Long = realmc.realm_get_persisted_schema_version(config.cptr())

actual fun realm_network_transport_new(networkTransport: NetworkTransport): RealmNetworkTransportPointer {
return LongPointerWrapper(realmc.realm_network_transport_new(networkTransport))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ actual enum class ErrorCode(

actual companion object {
actual fun of(nativeValue: Int): ErrorCode? =
values().firstOrNull { value ->
entries.firstOrNull { value ->
value.nativeValue == nativeValue
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,11 @@ actual object RealmInterop {
return Pair(realmPtr, fileCreated.value)
}

actual fun realm_open(config: RealmConfigurationPointer): LiveRealmPointer {
val realmPtr = CPointerWrapper<LiveRealmT>(realm_wrapper.realm_open(config.cptr()))
return realmPtr
}

actual fun realm_create_scheduler(): RealmSchedulerPointer {
// If there is no notification dispatcher use the default scheduler.
// Re-verify if this is actually needed when notification scheduler is fully in place.
Expand Down Expand Up @@ -2607,6 +2612,10 @@ actual object RealmInterop {
realm_wrapper.realm_sync_client_config_set_fast_reconnect_limit(syncClientConfig.cptr(), timeoutMs)
}

actual fun realm_get_persisted_schema_version(
config: RealmConfigurationPointer
): Long = realm_wrapper.realm_get_persisted_schema_version(config.cptr()).toLong()

actual fun realm_sync_config_set_error_handler(
syncConfig: RealmSyncConfigurationPointer,
errorHandler: SyncErrorCallback
Expand Down
2 changes: 1 addition & 1 deletion packages/external/core
Submodule core updated 258 files
Original file line number Diff line number Diff line change
Expand Up @@ -283,18 +283,6 @@ public interface Configuration {
this.writeDispatcher = dispatcher
} as S

/**
* Sets the schema version of the Realm. This must be equal to or higher than the schema
* version of the existing Realm file, if any. If the schema version is higher than the
* already existing Realm, a migration is needed.
*/
public fun schemaVersion(schemaVersion: Long): S {
if (schemaVersion < 0) {
throw IllegalArgumentException("Realm schema version numbers must be 0 (zero) or higher. Yours was: $schemaVersion")
}
return apply { this.schemaVersion = schemaVersion } as S
}

/**
* Sets the 64 byte key used to encrypt and decrypt the Realm file. If no key is provided
* the Realm file will be unencrypted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ public interface RealmConfiguration : Configuration {
this.automaticEmbeddedObjectConstraintsResolution = resolveEmbeddedObjectConstraints
}

/**
* Sets the schema version of the Realm. This must be equal to or higher than the schema
* version of the existing Realm file, if any. If the schema version is higher than the
* already existing Realm, a migration is needed.
*/
public fun schemaVersion(schemaVersion: Long): Builder {
if (schemaVersion < 0) {
throw IllegalArgumentException("Realm schema version numbers must be 0 (zero) or higher. Yours was: $schemaVersion")
}
return apply { this.schemaVersion = schemaVersion }
}

override fun name(name: String): Builder = apply {
checkName(name)
this.name = name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ internal class SubscriptionSetImpl<T : BaseRealm>(
channel.trySend(false)
}
else -> {
println("STATE $state")
// Ignore all other states, wait for either complete or error.
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import io.realm.kotlin.mongodb.sync.RecoverOrDiscardUnsyncedChangesStrategy
import io.realm.kotlin.mongodb.sync.RecoverUnsyncedChangesStrategy
import io.realm.kotlin.mongodb.sync.SyncClientResetStrategy
import io.realm.kotlin.mongodb.sync.SyncConfiguration
import io.realm.kotlin.mongodb.sync.SyncMigrationRemoteDataConfiguration
import io.realm.kotlin.mongodb.sync.SyncMode
import io.realm.kotlin.mongodb.sync.SyncSession
import kotlinx.atomicfu.AtomicBoolean
Expand All @@ -69,7 +70,8 @@ internal class SyncConfigurationImpl(
override val errorHandler: SyncSession.ErrorHandler,
override val syncClientResetStrategy: SyncClientResetStrategy,
override val initialSubscriptions: InitialSubscriptionsConfiguration?,
override val initialRemoteData: InitialRemoteDataConfiguration?
override val initialRemoteData: InitialRemoteDataConfiguration?,
override val schemaMigrationRemoteData: SyncMigrationRemoteDataConfiguration?,
) : InternalConfiguration by configuration, SyncConfiguration {

override suspend fun openRealm(realm: RealmImpl): Pair<FrozenRealmReference, Boolean> {
Expand All @@ -84,14 +86,28 @@ internal class SyncConfigurationImpl(
// unnecessary pressure on the server.
val fileExists: Boolean = fileExists(configuration.path)
val asyncOpenCreatedRealmFile: AtomicBoolean = atomic(false)
if (initialRemoteData != null && !fileExists) {

val configPtr = createNativeConfiguration()

if (
(!fileExists && initialRemoteData != null) ||
(fileExists && isSyncMigrationPending(configPtr))
) {
// There are two different timeout:
// - initial remote data timeout, when it is the first time we open the Realm.
// - schema migration timeout, when a sync schema migration is required.
val timeout = if (fileExists)
schemaMigrationRemoteData!!.timeout
else
initialRemoteData!!.timeout

// Channel to work around not being able to use `suspendCoroutine` to wrap the callback, as
// that results in the `Continuation` being frozen, which breaks it.
val channel = Channel<Any>(1)
val taskPointer: AtomicRef<RealmAsyncOpenTaskPointer?> = atomic(null)
try {
val result: Any = withTimeout(initialRemoteData.timeout.inWholeMilliseconds) {
withContext(realm.notificationScheduler.dispatcher) {
val result: Any = withTimeout(timeout.inWholeMilliseconds) {
withContext(realm.writeScheduler.dispatcher) {
val callback = AsyncOpenCallback { error: Throwable? ->
if (error != null) {
channel.trySend(error)
Expand All @@ -100,7 +116,6 @@ internal class SyncConfigurationImpl(
}
}

val configPtr = createNativeConfiguration()
taskPointer.value = RealmInterop.realm_open_synchronized(configPtr)
RealmInterop.realm_async_open_task_start(taskPointer.value!!, callback)
channel.receive()
Expand All @@ -111,6 +126,7 @@ internal class SyncConfigurationImpl(
// Track whether or not async open created the file.
asyncOpenCreatedRealmFile.value = true
}

is Throwable -> throw result
else -> throw IllegalStateException("Unexpected value: $result")
}
Expand Down Expand Up @@ -138,6 +154,17 @@ internal class SyncConfigurationImpl(
return Pair(result.first, result.second || asyncOpenCreatedRealmFile.value)
}

/**
* Checks whether a sync Realm requires a migration, this happens when the schema version provided in
* the config differs from the persisted one.
*/
internal fun isSyncMigrationPending(configPtr: RealmConfigurationPointer): Boolean =
if (fileExists(configuration.path)) {
RealmInterop.realm_get_persisted_schema_version(configPtr) != configuration.schemaVersion
} else {
false
}

override suspend fun initializeRealmData(realm: RealmImpl, realmFileCreated: Boolean) {
// Create or update subscriptions for Flexible Sync realms as needed.
initialSubscriptions?.let { initialSubscriptionsConfig ->
Expand Down Expand Up @@ -173,7 +200,7 @@ internal class SyncConfigurationImpl(
return syncInitializer(ptr)
}

private val syncInitializer: (RealmConfigurationPointer) -> RealmConfigurationPointer
private var syncInitializer: (RealmConfigurationPointer) -> RealmConfigurationPointer

init {
// We need to freeze `errorHandler` reference on initial thread
Expand All @@ -184,12 +211,16 @@ internal class SyncConfigurationImpl(
val initializerHelper = when (resetStrategy) {
is DiscardUnsyncedChangesStrategy ->
DiscardUnsyncedChangesHelper(resetStrategy, configuration)

is ManuallyRecoverUnsyncedChangesStrategy ->
ManuallyRecoverUnsyncedChangesHelper(resetStrategy)

is RecoverUnsyncedChangesStrategy ->
RecoverUnsyncedChangesHelper(resetStrategy, configuration)

is RecoverOrDiscardUnsyncedChangesStrategy ->
RecoverOrDiscardUnsyncedChangesHelper(resetStrategy, configuration)

else -> throw IllegalArgumentException("Unsupported client reset strategy: $resetStrategy")
}

Expand Down Expand Up @@ -263,7 +294,7 @@ private interface ClientResetStrategyHelper {

private abstract class OnBeforeOnAfterHelper<T : SyncClientResetStrategy> constructor(
val strategy: T,
val configuration: InternalConfiguration
val configuration: InternalConfiguration,
) : ClientResetStrategyHelper {

abstract fun getResyncMode(): SyncSessionResyncMode
Expand All @@ -285,7 +316,7 @@ private abstract class OnBeforeOnAfterHelper<T : SyncClientResetStrategy> constr

private class RecoverOrDiscardUnsyncedChangesHelper constructor(
strategy: RecoverOrDiscardUnsyncedChangesStrategy,
configuration: InternalConfiguration
configuration: InternalConfiguration,
) : OnBeforeOnAfterHelper<RecoverOrDiscardUnsyncedChangesStrategy>(strategy, configuration) {

override fun getResyncMode(): SyncSessionResyncMode =
Expand All @@ -303,7 +334,7 @@ private class RecoverOrDiscardUnsyncedChangesHelper constructor(
override fun onAfterReset(
realmBefore: FrozenRealmPointer,
realmAfter: LiveRealmPointer,
didRecover: Boolean
didRecover: Boolean,
) {
// Needed to allow writes on the Mutable after Realm
RealmInterop.realm_begin_write(realmAfter)
Expand Down Expand Up @@ -338,7 +369,7 @@ private class RecoverOrDiscardUnsyncedChangesHelper constructor(
override fun onSyncError(
session: SyncSession,
appPointer: RealmAppPointer,
error: SyncError
error: SyncError,
) {
// If there is a user exception we appoint it as the cause of the client reset
strategy.onManualResetFallback(
Expand All @@ -350,7 +381,7 @@ private class RecoverOrDiscardUnsyncedChangesHelper constructor(

private class RecoverUnsyncedChangesHelper constructor(
strategy: RecoverUnsyncedChangesStrategy,
configuration: InternalConfiguration
configuration: InternalConfiguration,
) : OnBeforeOnAfterHelper<RecoverUnsyncedChangesStrategy>(strategy, configuration) {

override fun getResyncMode(): SyncSessionResyncMode =
Expand All @@ -368,7 +399,7 @@ private class RecoverUnsyncedChangesHelper constructor(
override fun onAfterReset(
realmBefore: FrozenRealmPointer,
realmAfter: LiveRealmPointer,
didRecover: Boolean
didRecover: Boolean,
) {
// Needed to allow writes on the Mutable after Realm
RealmInterop.realm_begin_write(realmAfter)
Expand Down Expand Up @@ -400,7 +431,7 @@ private class RecoverUnsyncedChangesHelper constructor(
override fun onSyncError(
session: SyncSession,
appPointer: RealmAppPointer,
error: SyncError
error: SyncError,
) {
// If there is a user exception we appoint it as the cause of the client reset
strategy.onManualResetFallback(
Expand All @@ -412,7 +443,7 @@ private class RecoverUnsyncedChangesHelper constructor(

private class DiscardUnsyncedChangesHelper constructor(
strategy: DiscardUnsyncedChangesStrategy,
configuration: InternalConfiguration
configuration: InternalConfiguration,
) : OnBeforeOnAfterHelper<DiscardUnsyncedChangesStrategy>(strategy, configuration) {

override fun getResyncMode(): SyncSessionResyncMode =
Expand All @@ -430,7 +461,7 @@ private class DiscardUnsyncedChangesHelper constructor(
override fun onAfterReset(
realmBefore: FrozenRealmPointer,
realmAfter: LiveRealmPointer,
didRecover: Boolean
didRecover: Boolean,
) {
// Needed to allow writes on the Mutable after Realm
RealmInterop.realm_begin_write(realmAfter)
Expand Down Expand Up @@ -462,7 +493,7 @@ private class DiscardUnsyncedChangesHelper constructor(
override fun onSyncError(
session: SyncSession,
appPointer: RealmAppPointer,
error: SyncError
error: SyncError,
) {
strategy.onManualResetFallback(
session,
Expand All @@ -472,7 +503,7 @@ private class DiscardUnsyncedChangesHelper constructor(
}

private class ManuallyRecoverUnsyncedChangesHelper(
val strategy: ManuallyRecoverUnsyncedChangesStrategy
val strategy: ManuallyRecoverUnsyncedChangesStrategy,
) : ClientResetStrategyHelper {

override fun initialize(nativeSyncConfig: RealmSyncConfigurationPointer) {
Expand All @@ -485,7 +516,7 @@ private class ManuallyRecoverUnsyncedChangesHelper(
override fun onSyncError(
session: SyncSession,
appPointer: RealmAppPointer,
error: SyncError
error: SyncError,
) {
strategy.onClientReset(
session,
Expand Down
Loading
Loading