Skip to content

Commit

Permalink
Merge pull request #843 from vector-im/feature/fga/coroutine_dispatcher
Browse files Browse the repository at this point in the history
Feature/fga/coroutine dispatcher
  • Loading branch information
ganfra authored Jul 11, 2023
2 parents e308b1d + 4012317 commit 21f9093
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 64 deletions.
3 changes: 0 additions & 3 deletions app/src/main/kotlin/io/element/android/x/di/AppModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@ import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.plus
import java.io.File
import java.util.concurrent.Executors

@Module
@ContributesTo(AppScope::class)
Expand Down Expand Up @@ -99,7 +97,6 @@ object AppModule {
io = Dispatchers.IO,
computation = Dispatchers.Default,
main = Dispatchers.Main,
diffUpdateDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,4 @@ data class CoroutineDispatchers(
val io: CoroutineDispatcher,
val computation: CoroutineDispatcher,
val main: CoroutineDispatcher,
val diffUpdateDispatcher: CoroutineDispatcher,
)
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import io.element.android.libraries.matrix.impl.verification.RustSessionVerifica
import io.element.android.libraries.sessionstorage.api.SessionStore
import io.element.android.services.toolbox.api.systemclock.SystemClock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
Expand All @@ -73,6 +73,7 @@ import org.matrix.rustcomponents.sdk.CreateRoomParameters as RustCreateRoomParam
import org.matrix.rustcomponents.sdk.RoomPreset as RustRoomPreset
import org.matrix.rustcomponents.sdk.RoomVisibility as RustRoomVisibility

@OptIn(ExperimentalCoroutinesApi::class)
class RustMatrixClient constructor(
private val client: Client,
private val sessionStore: SessionStore,
Expand All @@ -85,13 +86,15 @@ class RustMatrixClient constructor(

override val sessionId: UserId = UserId(client.userId())
private val roomListService = client.roomListServiceWithEncryption()
private val sessionDispatcher = dispatchers.io.limitedParallelism(64)
private val sessionCoroutineScope = appCoroutineScope.childScope(dispatchers.main, "Session-${sessionId}")
private val verificationService = RustSessionVerificationService()
private val syncService = RustSyncService(roomListService, sessionCoroutineScope)
private val pushersService = RustPushersService(
client = client,
dispatchers = dispatchers,
)

private val notificationService = RustNotificationService(client)

private val clientDelegate = object : ClientDelegate {
Expand All @@ -105,7 +108,7 @@ class RustMatrixClient constructor(
RustRoomSummaryDataSource(
roomListService = roomListService,
sessionCoroutineScope = sessionCoroutineScope,
coroutineDispatchers = dispatchers,
dispatcher = sessionDispatcher,
)

override val roomSummaryDataSource: RoomSummaryDataSource
Expand Down Expand Up @@ -150,7 +153,7 @@ class RustMatrixClient constructor(
)
}

private suspend fun pairOfRoom(roomId: RoomId): Pair<RoomListItem, Room>? = withContext(dispatchers.io) {
private suspend fun pairOfRoom(roomId: RoomId): Pair<RoomListItem, Room>? = withContext(sessionDispatcher) {
val cachedRoomListItem = roomListService.roomOrNull(roomId.value)
val fullRoom = cachedRoomListItem?.fullRoom()
if (cachedRoomListItem == null || fullRoom == null) {
Expand All @@ -165,19 +168,19 @@ class RustMatrixClient constructor(
return roomId?.let { getRoom(it) }
}

override suspend fun ignoreUser(userId: UserId): Result<Unit> = withContext(dispatchers.io) {
override suspend fun ignoreUser(userId: UserId): Result<Unit> = withContext(sessionDispatcher) {
runCatching {
client.ignoreUser(userId.value)
}
}

override suspend fun unignoreUser(userId: UserId): Result<Unit> = withContext(dispatchers.io) {
override suspend fun unignoreUser(userId: UserId): Result<Unit> = withContext(sessionDispatcher) {
runCatching {
client.unignoreUser(userId.value)
}
}

override suspend fun createRoom(createRoomParams: CreateRoomParameters): Result<RoomId> = withContext(dispatchers.io) {
override suspend fun createRoom(createRoomParams: CreateRoomParameters): Result<RoomId> = withContext(sessionDispatcher) {
runCatching {
val rustParams = RustCreateRoomParameters(
name = createRoomParams.name,
Expand Down Expand Up @@ -221,14 +224,14 @@ class RustMatrixClient constructor(
return createRoom(createRoomParams)
}

override suspend fun getProfile(userId: UserId): Result<MatrixUser> = withContext(Dispatchers.IO) {
override suspend fun getProfile(userId: UserId): Result<MatrixUser> = withContext(sessionDispatcher) {
runCatching {
client.getProfile(userId.value).let(UserProfileMapper::map)
}
}

override suspend fun searchUsers(searchTerm: String, limit: Long): Result<MatrixSearchUserResults> =
withContext(dispatchers.io) {
withContext(sessionDispatcher) {
runCatching {
client.searchUsers(searchTerm, limit.toULong()).let(UserSearchResultMapper::map)
}
Expand Down Expand Up @@ -260,7 +263,7 @@ class RustMatrixClient constructor(
baseDirectory.deleteSessionDirectory(userID = sessionId.value, deleteCryptoDb = false)
}

override suspend fun logout() = withContext(dispatchers.io) {
override suspend fun logout() = withContext(sessionDispatcher) {
try {
client.logout()
} catch (failure: Throwable) {
Expand All @@ -271,20 +274,20 @@ class RustMatrixClient constructor(
sessionStore.removeSession(sessionId.value)
}

override suspend fun loadUserDisplayName(): Result<String> = withContext(dispatchers.io) {
override suspend fun loadUserDisplayName(): Result<String> = withContext(sessionDispatcher) {
runCatching {
client.displayName()
}
}

override suspend fun loadUserAvatarURLString(): Result<String?> = withContext(dispatchers.io) {
override suspend fun loadUserAvatarURLString(): Result<String?> = withContext(sessionDispatcher) {
runCatching {
client.avatarUrl()
}
}

@OptIn(ExperimentalUnsignedTypes::class)
override suspend fun uploadMedia(mimeType: String, data: ByteArray, progressCallback: ProgressCallback?): Result<String> = withContext(dispatchers.io) {
override suspend fun uploadMedia(mimeType: String, data: ByteArray, progressCallback: ProgressCallback?): Result<String> = withContext(sessionDispatcher) {
runCatching {
client.uploadMedia(mimeType, data.toUByteArray().toList(), progressCallback?.toProgressWatcher())
}
Expand All @@ -305,7 +308,7 @@ class RustMatrixClient constructor(
private suspend fun File.getCacheSize(
userID: String,
includeCryptoDb: Boolean = false,
): Long = withContext(dispatchers.io) {
): Long = withContext(sessionDispatcher) {
// Rust sanitises the user ID replacing invalid characters with an _
val sanitisedUserID = userID.replace(":", "_")
val sessionDirectory = File(this@getCacheSize, sanitisedUserID)
Expand All @@ -327,7 +330,7 @@ class RustMatrixClient constructor(
private suspend fun File.deleteSessionDirectory(
userID: String,
deleteCryptoDb: Boolean = false,
): Boolean = withContext(dispatchers.io) {
): Boolean = withContext(sessionDispatcher) {
// Rust sanitises the user ID replacing invalid characters with an _
val sanitisedUserID = userID.replace(":", "_")
val sessionDirectory = File(this@deleteSessionDirectory, sanitisedUserID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.matrix.api.media.MatrixMediaLoader
import io.element.android.libraries.matrix.api.media.MediaFile
import io.element.android.libraries.matrix.api.media.MediaSource
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.withContext
import org.matrix.rustcomponents.sdk.Client
import org.matrix.rustcomponents.sdk.mediaSourceFromUrl
Expand All @@ -29,10 +30,12 @@ import org.matrix.rustcomponents.sdk.MediaSource as RustMediaSource

class RustMediaLoader(
baseCacheDirectory: File,
private val dispatchers: CoroutineDispatchers,
dispatchers: CoroutineDispatchers,
private val innerClient: Client,
) : MatrixMediaLoader {

@OptIn(ExperimentalCoroutinesApi::class)
private val mediaDispatcher = dispatchers.io.limitedParallelism(32)
private val cacheDirectory = File(baseCacheDirectory, "temp/media").apply {
if (!exists()) {
mkdirs()
Expand All @@ -41,7 +44,7 @@ class RustMediaLoader(

@OptIn(ExperimentalUnsignedTypes::class)
override suspend fun loadMediaContent(source: MediaSource): Result<ByteArray> =
withContext(dispatchers.io) {
withContext(mediaDispatcher) {
runCatching {
source.toRustMediaSource().use { source ->
innerClient.getMediaContent(source).toUByteArray().toByteArray()
Expand All @@ -55,7 +58,7 @@ class RustMediaLoader(
width: Long,
height: Long
): Result<ByteArray> =
withContext(dispatchers.io) {
withContext(mediaDispatcher) {
runCatching {
source.toRustMediaSource().use { mediaSource ->
innerClient.getMediaThumbnail(
Expand All @@ -68,7 +71,7 @@ class RustMediaLoader(
}

override suspend fun downloadMediaFile(source: MediaSource, mimeType: String?, body: String?): Result<MediaFile> =
withContext(dispatchers.io) {
withContext(mediaDispatcher) {
runCatching {
source.toRustMediaSource().use { mediaSource ->
val mediaFile = innerClient.getMediaFile(
Expand Down
Loading

0 comments on commit 21f9093

Please sign in to comment.