Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and Cover Eager Conflict Resolution Deadlock #674

Open
wants to merge 21 commits into
base: matt-ramotar/turbine
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions store/api/android/store.api
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,15 @@ public final class org/mobilenativefoundation/store/store5/FetcherResult$Error$M
public fun toString ()Ljava/lang/String;
}

public abstract interface class org/mobilenativefoundation/store/store5/Logger {
public abstract fun debug (Ljava/lang/String;)V
public abstract fun error (Ljava/lang/String;Ljava/lang/Throwable;)V
}

public final class org/mobilenativefoundation/store/store5/Logger$DefaultImpls {
public static synthetic fun error$default (Lorg/mobilenativefoundation/store/store5/Logger;Ljava/lang/String;Ljava/lang/Throwable;ILjava/lang/Object;)V
}

public final class org/mobilenativefoundation/store/store5/MemoryPolicy {
public static final field Companion Lorg/mobilenativefoundation/store/store5/MemoryPolicy$Companion;
public static final field DEFAULT_SIZE_POLICY J
Expand Down
9 changes: 9 additions & 0 deletions store/api/jvm/store.api
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ public final class org/mobilenativefoundation/store/store5/FetcherResult$Error$M
public fun toString ()Ljava/lang/String;
}

public abstract interface class org/mobilenativefoundation/store/store5/Logger {
public abstract fun debug (Ljava/lang/String;)V
public abstract fun error (Ljava/lang/String;Ljava/lang/Throwable;)V
}

public final class org/mobilenativefoundation/store/store5/Logger$DefaultImpls {
public static synthetic fun error$default (Lorg/mobilenativefoundation/store/store5/Logger;Ljava/lang/String;Ljava/lang/Throwable;ILjava/lang/Object;)V
}

public final class org/mobilenativefoundation/store/store5/MemoryPolicy {
public static final field Companion Lorg/mobilenativefoundation/store/store5/MemoryPolicy$Companion;
public static final field DEFAULT_SIZE_POLICY J
Expand Down
4 changes: 1 addition & 3 deletions store/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import org.gradle.internal.impldep.org.testng.reporters.XMLUtils.xml


plugins {
id("org.mobilenativefoundation.store.multiplatform")
alias(libs.plugins.kover)
id("dev.mokkery") version "2.5.1"
}

kotlin {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.mobilenativefoundation.store.store5

/**
* A simple logging interface for logging error and debug messages.
*/
interface Logger {
/**
* Logs an error message, optionally with a throwable.
*
* @param message The error message to log.
* @param throwable An optional [Throwable] associated with the error.
*/
fun error(
message: String,
throwable: Throwable? = null,
)

/**
* Logs a debug message.
*
* @param message The debug message to log.
*/
fun debug(message: String)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.mobilenativefoundation.store.store5.impl

import co.touchlab.kermit.CommonWriter
import org.mobilenativefoundation.store.store5.Logger

/**
* Default implementation of [Logger] using the Kermit logging library.
*/
internal class DefaultLogger : Logger {
private val delegate =
co.touchlab.kermit.Logger.apply {
setLogWriters(listOf(CommonWriter()))
setTag("Store")
}

override fun debug(message: String) {
delegate.d(message)
}

override fun error(
message: String,
throwable: Throwable?,
) {
delegate.e(message, throwable)

Check warning on line 24 in store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/DefaultLogger.kt

View check run for this annotation

Codecov / codecov/patch

store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/DefaultLogger.kt#L24

Added line #L24 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

package org.mobilenativefoundation.store.store5.impl

import co.touchlab.kermit.CommonWriter
import co.touchlab.kermit.Logger
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
Expand All @@ -14,6 +12,7 @@
import org.mobilenativefoundation.store.core5.ExperimentalStoreApi
import org.mobilenativefoundation.store.store5.Bookkeeper
import org.mobilenativefoundation.store.store5.Clear
import org.mobilenativefoundation.store.store5.Logger
import org.mobilenativefoundation.store.store5.MutableStore
import org.mobilenativefoundation.store.store5.StoreReadRequest
import org.mobilenativefoundation.store.store5.StoreReadResponse
Expand All @@ -27,11 +26,12 @@
import org.mobilenativefoundation.store.store5.internal.definition.WriteRequestQueue
import org.mobilenativefoundation.store.store5.internal.result.EagerConflictResolutionResult

@ExperimentalStoreApi
@OptIn(ExperimentalStoreApi::class)
internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local : Any>(
private val delegate: RealStore<Key, Network, Output, Local>,
private val updater: Updater<Key, Output, *>,
private val bookkeeper: Bookkeeper<Key>?,
private val logger: Logger = DefaultLogger(),
) : MutableStore<Key, Output>, Clear.Key<Key> by delegate, Clear.All by delegate {
private val storeLock = Mutex()
private val keyToWriteRequestQueue = mutableMapOf<Key, WriteRequestQueue<Key, Output, *>>()
Expand All @@ -42,20 +42,29 @@
safeInitStore(request.key)

when (val eagerConflictResolutionResult = tryEagerlyResolveConflicts<Response>(request.key)) {
// TODO(matt-ramotar): Many use cases will not want to pull immediately after failing
// to push local changes. We should enable configuration of conflict resolution strategies,
// such as logging, retrying, canceling.

is EagerConflictResolutionResult.Error.Exception -> {
logger.e(eagerConflictResolutionResult.error.toString())
logger.error(eagerConflictResolutionResult.error.toString())
}

is EagerConflictResolutionResult.Error.Message -> {
logger.e(eagerConflictResolutionResult.message)
logger.error(eagerConflictResolutionResult.message)
}

is EagerConflictResolutionResult.Success.ConflictsResolved -> {
logger.d(eagerConflictResolutionResult.value.toString())
val message =
when (val result = eagerConflictResolutionResult.value) {
is UpdaterResult.Success.Typed<*> -> result.value.toString()
is UpdaterResult.Success.Untyped -> result.value.toString()

Check warning on line 61 in store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt

View check run for this annotation

Codecov / codecov/patch

store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt#L61

Added line #L61 was not covered by tests
}
logger.debug(message)
}

EagerConflictResolutionResult.Success.NoConflicts -> {
logger.d(eagerConflictResolutionResult.toString())
logger.debug("No conflicts.")
}
}

Expand Down Expand Up @@ -220,57 +229,45 @@
}

@AnyThread
private suspend fun <Response : Any> tryEagerlyResolveConflicts(key: Key): EagerConflictResolutionResult<Response> =
withThreadSafety(key) {
val latest = delegate.latestOrNull(key)
when {
latest == null || bookkeeper == null || conflictsMightExist(key).not() -> EagerConflictResolutionResult.Success.NoConflicts
else -> {
try {
val updaterResult =
updater.post(key, latest).also { updaterResult ->
if (updaterResult is UpdaterResult.Success) {
updateWriteRequestQueue<Response>(key = key, created = now(), updaterResult = updaterResult)
}
}
private suspend fun <Response : Any> tryEagerlyResolveConflicts(key: Key): EagerConflictResolutionResult<Response> {
val (latest, conflictsExist) =
withThreadSafety(key) {
val latest = delegate.latestOrNull(key)
val conflictsExist = latest != null && bookkeeper != null && conflictsMightExist(key)
latest to conflictsExist
}

when (updaterResult) {
is UpdaterResult.Error.Exception -> EagerConflictResolutionResult.Error.Exception(updaterResult.error)
is UpdaterResult.Error.Message -> EagerConflictResolutionResult.Error.Message(updaterResult.message)
is UpdaterResult.Success -> EagerConflictResolutionResult.Success.ConflictsResolved(updaterResult)
}
} catch (throwable: Throwable) {
EagerConflictResolutionResult.Error.Exception(throwable)
if (!conflictsExist || latest == null) {
return EagerConflictResolutionResult.Success.NoConflicts
}

return try {
val updaterResult =
updater.post(key, latest).also { updaterResult ->
if (updaterResult is UpdaterResult.Success) {
updateWriteRequestQueue<Response>(key = key, created = now(), updaterResult = updaterResult)
bookkeeper?.clear(key)
}
}
}
}

private suspend fun safeInitWriteRequestQueue(key: Key) =
withThreadSafety(key) {
if (keyToWriteRequestQueue[key] == null) {
keyToWriteRequestQueue[key] = ArrayDeque()
when (updaterResult) {
is UpdaterResult.Error.Exception -> EagerConflictResolutionResult.Error.Exception(updaterResult.error)
is UpdaterResult.Error.Message -> EagerConflictResolutionResult.Error.Message(updaterResult.message)
is UpdaterResult.Success -> EagerConflictResolutionResult.Success.ConflictsResolved(updaterResult)
}
} catch (throwable: Throwable) {
EagerConflictResolutionResult.Error.Exception(throwable)
}
}

private suspend fun safeInitThreadSafety(key: Key) =
private suspend fun safeInitStore(key: Key) {
storeLock.withLock {
if (keyToThreadSafety[key] == null) {
keyToThreadSafety[key] = ThreadSafety()
}
}

private suspend fun safeInitStore(key: Key) {
safeInitThreadSafety(key)
safeInitWriteRequestQueue(key)
}

companion object {
private val logger =
Logger.apply {
setLogWriters(listOf(CommonWriter()))
setTag("Store")
if (keyToWriteRequestQueue[key] == null) {
keyToWriteRequestQueue[key] = ArrayDeque()
}
private const val UNKNOWN_ERROR = "Unknown error occurred"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.mobilenativefoundation.store.store5.impl.operators.Either
import org.mobilenativefoundation.store.store5.impl.operators.merge
import org.mobilenativefoundation.store.store5.internal.result.StoreDelegateWriteResult

internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
internal open class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
scope: CoroutineScope,
fetcher: Fetcher<Key, Network>,
sourceOfTruth: SourceOfTruth<Key, Local, Output>? = null,
Expand Down Expand Up @@ -327,7 +327,7 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
}
}

internal suspend fun write(
internal open suspend fun write(
key: Key,
value: Output,
): StoreDelegateWriteResult =
Expand All @@ -339,7 +339,7 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
StoreDelegateWriteResult.Error.Exception(error)
}

internal suspend fun latestOrNull(key: Key): Output? = fromMemCache(key) ?: fromSourceOfTruth(key)
internal open suspend fun latestOrNull(key: Key): Output? = fromMemCache(key) ?: fromSourceOfTruth(key)

private suspend fun fromSourceOfTruth(key: Key) =
sourceOfTruth?.reader(key, CompletableDeferred(Unit))?.map { it.dataOrNull() }?.first()
Expand Down
Loading