Skip to content

Commit

Permalink
Updated Y sync flow:
Browse files Browse the repository at this point in the history
* Update receivedUntil after getting server changes over syncWithServer() but not when getting chaotic server changes on a particular document over websocket.
* Before syncWithServer, compute an Y.js state vector from the server changes that has arrived after receivedFrom so that the server can avoid sending the same changes back.
* Update YSyncState after sync with the correct unsentFrom and receivedUntil values.
  • Loading branch information
David Fahlander committed Sep 9, 2024
1 parent 50cd0a2 commit 0591312
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 60 deletions.
22 changes: 14 additions & 8 deletions addons/dexie-cloud/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import { updateBaseRevs } from './updateBaseRevs';
import { getLatestRevisionsPerTable } from './getLatestRevisionsPerTable';
import { applyServerChanges } from './applyServerChanges';
import { checkSyncRateLimitDelay } from './ratelimit';
import { listYClientMessages } from '../yjs/listYClientMessages';
import { listYClientMessagesAndStateVector } from '../yjs/listYClientMessagesAndStateVector';
import { applyYServerMessages } from '../yjs/applyYMessages';
import { updateYSyncStates } from '../yjs/updateYSyncStates';

export const CURRENT_SYNC_WORKER = 'currentSyncWorker';

Expand Down Expand Up @@ -149,14 +150,14 @@ async function _sync(
//
// List changes to sync
//
const [clientChangeSet, syncState, baseRevs, yMessages] = await db.transaction(
const [clientChangeSet, syncState, baseRevs, {yMessages, lastUpdateIds}] = await db.transaction(
'r',
db.tables,
async () => {
const syncState = await db.getPersistedSyncState();
const baseRevs = await db.$baseRevs.toArray();
let clientChanges = await listClientChanges(mutationTables, db);
const yMessages = await listYClientMessages(db);
const yResults = await listYClientMessagesAndStateVector(db);
throwIfCancelled(cancelToken);
if (doSyncify) {
const alreadySyncedRealms = [
Expand All @@ -171,15 +172,15 @@ async function _sync(
);
throwIfCancelled(cancelToken);
clientChanges = clientChanges.concat(syncificationInserts);
return [clientChanges, syncState, baseRevs, yMessages];
return [clientChanges, syncState, baseRevs, yResults];
}
return [clientChanges, syncState, baseRevs, yMessages];
return [clientChanges, syncState, baseRevs, yResults];
}
);

const pushSyncIsNeeded = clientChangeSet.some((set) =>
set.muts.some((mut) => mut.keys.length > 0)
) || yMessages.length > 0;
) || yMessages.some(m => m.type === 'u-c');
if (justCheckIfNeeded) {
console.debug('Sync is needed:', pushSyncIsNeeded);
return pushSyncIsNeeded;
Expand Down Expand Up @@ -335,10 +336,15 @@ async function _sync(
//
// apply yMessages
//
await applyYServerMessages(res.yMessages, db);
const receivedUntils = await applyYServerMessages(res.yMessages, db);

//
// Update syncState
// update Y SyncStates
//
await updateYSyncStates(lastUpdateIds, receivedUntils, db);

//
// Update regular syncState
//
db.$syncState.put(newSyncState, 'syncState');

Expand Down
9 changes: 9 additions & 0 deletions addons/dexie-cloud/src/yjs/Y.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import type * as Y from 'yjs';
import type { DexieCloudDB } from '../db/DexieCloudDB';

export function $Y(db: DexieCloudDB): typeof Y {
const $Y = db.dx._options.Y;
if (!$Y) throw new Error('Y library not supplied to Dexie constructor');
return $Y as typeof Y;
}

3 changes: 3 additions & 0 deletions addons/dexie-cloud/src/yjs/YTable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { EntityTable, YUpdateRow } from "dexie";

export type YTable = EntityTable<YUpdateRow, "i">;
24 changes: 11 additions & 13 deletions addons/dexie-cloud/src/yjs/applyYMessages.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import { InsertType, YSyncer, YUpdateRow } from 'dexie';
import { InsertType, YSyncState, YUpdateRow } from 'dexie';
import { DexieCloudDB } from '../db/DexieCloudDB';
import { YServerMessage, YUpdateFromClientAck } from 'dexie-cloud-common/src/YMessage';
import { DEXIE_CLOUD_SYNCER_ID } from '../sync/DEXIE_CLOUD_SYNCER_ID';
import { getUpdatesTable } from './getUpdatesTable';

export async function applyYServerMessages(
yMessages: YServerMessage[],
db: DexieCloudDB
): Promise<void> {
): Promise<{[yTable: string]: number}> {
const result: {[yTable: string]: number} = {};
for (const m of yMessages) {
switch (m.type) {
case 'u-s': {
const utbl = getUpdatesTable(db, m.table, m.prop);
await db.table(utbl).add({
result[utbl.name] = await utbl.add({
k: m.k,
u: m.u,
} satisfies InsertType<YUpdateRow, 'i'>);
Expand All @@ -20,13 +22,13 @@ export async function applyYServerMessages(
case 'u-ack': {
const utbl = getUpdatesTable(db, m.table, m.prop);
await db.transaction('rw', utbl, async (tx) => {
let syncer = (await tx.table(utbl).get(DEXIE_CLOUD_SYNCER_ID)) as
| YSyncer
let syncer = (await tx.table(utbl.name).get(DEXIE_CLOUD_SYNCER_ID)) as
| YSyncState
| undefined;
await tx.table(utbl).put({
await tx.table(utbl.name).put({
...(syncer || { i: DEXIE_CLOUD_SYNCER_ID }),
unsentFrom: Math.max(syncer?.unsentFrom || 1, m.i + 1),
} as YSyncer);
} as YSyncState);
});
break;
}
Expand All @@ -39,15 +41,11 @@ export async function applyYServerMessages(
// See my question in https://discuss.yjs.dev/t/generate-an-inverse-update/2765
console.debug(`Y update rejected. Deleting it.`);
const utbl = getUpdatesTable(db, m.table, m.prop);
await db.table(utbl).delete(m.i);
await utbl.delete(m.i);
break;
}
}
}
}
function getUpdatesTable(db: DexieCloudDB, table: string, ydocProp: string) {
const utbl = db.table(table)?.schema.yProps?.find(p => p.prop === ydocProp)?.updatesTable;
if (!utbl) throw new Error(`No updatesTable found for ${table}.${ydocProp}`);
return utbl;
return result;
}

8 changes: 8 additions & 0 deletions addons/dexie-cloud/src/yjs/getUpdatesTable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { DexieCloudDB } from "../db/DexieCloudDB";
import { YTable } from "./YTable";

export function getUpdatesTable(db: DexieCloudDB, table: string, ydocProp: string): YTable {
const utbl = db.table(table)?.schema.yProps?.find(p => p.prop === ydocProp)?.updatesTable;
if (!utbl) throw new Error(`No updatesTable found for ${table}.${ydocProp}`);
return db.table(utbl);
}
4 changes: 2 additions & 2 deletions addons/dexie-cloud/src/yjs/listUpdatesSince.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { Table, YUpdateRow } from 'dexie';

export function listUpdatesSince(yTable: Table, unsentFrom: number): Promise<YUpdateRow[]> {
export function listUpdatesSince(yTable: Table, sinceIncluding: number): Promise<YUpdateRow[]> {
return yTable
.where('i')
.between(unsentFrom, Infinity, true)
.between(sinceIncluding, Infinity, true)
.toArray();
}
36 changes: 0 additions & 36 deletions addons/dexie-cloud/src/yjs/listYClientMessages.ts

This file was deleted.

112 changes: 112 additions & 0 deletions addons/dexie-cloud/src/yjs/listYClientMessagesAndStateVector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import type { YSyncState, YUpdateRow } from 'dexie';
import type { YClientMessage } from 'dexie-cloud-common';
import { DexieCloudDB } from '../db/DexieCloudDB';
import { DEXIE_CLOUD_SYNCER_ID } from '../sync/DEXIE_CLOUD_SYNCER_ID';
import { listUpdatesSince } from './listUpdatesSince';
import { $Y } from './Y';

/** Queries the local database for YMessages to send to server.
*
* There are 2 messages that this function can provide:
* YUpdateFromClientRequest ( for local updates )
* YStateVector ( for state vector of foreign updates so that server can reduce the number of udpates to send back )
*
* Notice that we do not do a step 1 sync phase here to get a state vector from the server. Reason we can avoid
* the 2-step sync is that we are client-server and not client-client here and we keep track of the client changes
* sent to server by letting server acknowledge them. There is always a chance that some client update has already
* been sent and that the client failed to receive the ack. However, if this happens it does not matter - the change
* would be sent again and Yjs handles duplicate changes anyway. And it's rare so we earn the cost of roundtrips by
* avoiding the step1 sync and instead keep track of this in the `unsentFrom` property of the SyncState.
*
* @param db
* @returns
*/
export async function listYClientMessagesAndStateVector(
db: DexieCloudDB
): Promise<{yMessages: YClientMessage[], lastUpdateIds: {[yTable: string]: number}}> {
const result: YClientMessage[] = [];
const lastUpdateIds: {[yTable: string]: number} = {};
for (const table of db.tables) {
if (table.schema.yProps && db.cloud.schema?.[table.name].markedForSync) {
for (const yProp of table.schema.yProps) {
const Y = $Y(db); // This is how we retrieve the user-provided Y library
const yTable = db.table(yProp.updatesTable); // the updates-table for this combo of table+propName
const syncState = (await yTable.get(DEXIE_CLOUD_SYNCER_ID)) as
| YSyncState
| undefined;

// unsentFrom = the `i` value of updates that aren't yet sent to server (or at least not acked by the server yet)
const unsentFrom = syncState?.unsentFrom || 1;
// receivedUntil = the `i` value of updates that both we and the server knows we already have (we know it by the outcome from last syncWithServer() because server keep track of its revision numbers
const receivedUntil = syncState?.receivedUntil || 0;
// Compute the least value of these two (but since receivedUntil is inclusive we need to add +1 to it)
const unsyncedFrom = Math.min(unsentFrom, receivedUntil + 1);
// Query all these updates for all docs of this table+prop combination
const updates = await listUpdatesSince(yTable, unsyncedFrom);
if (updates.length > 0) lastUpdateIds[yTable.name] = updates[updates.length -1].i;

// Now sort them by document and whether they are local or not + ignore local updates already sent:
const perDoc: {
[docKey: string]: {
i: number;
k: any;
isLocal: boolean;
u: Uint8Array[];
};
} = {};
for (const update of updates) {
// Sort updates into buckets of the doc primary key + the flag (whether it's local or foreign)
const isLocal = ((update.f || 0) & 0x01) === 0x01;
if (isLocal && update.i < unsentFrom) continue; // This local update has already been sent and acked.
const docKey = JSON.stringify(update.k) + '/' + isLocal;
let entry = perDoc[docKey];
if (!entry) {
perDoc[docKey] = entry = {
i: update.i,
k: update.k,
isLocal,
u: [],
};
entry.u.push(update.u);
} else {
entry.u.push(update.u);
entry.i = Math.max(update.i, entry.i);
}
}

// Now, go through all these and:
// * For local updates, compute a merged update per document.
// * For foreign updates, compute a state vector to pass to server, so that server can
// avoid re-sending updates that we already have (they might have been sent of websocket
// and when that happens, we do not mark them in any way nor do we update receivedUntil -
// we only update receivedUntil after a "full sync" (syncWithServer()))
for (const { k, isLocal, u, i } of Object.values(perDoc)) {
const mergedUpdate = u.length === 1 ? u[0] : Y.mergeUpdatesV2(u);
if (isLocal) {
result.push({
type: 'u-c',
table: table.name,
prop: yProp.prop,
k,
u: mergedUpdate,
i,
});
} else {
const stateVector = Y.encodeStateVectorFromUpdateV2(mergedUpdate);
result.push({
type: 'sv',
table: table.name,
prop: yProp.prop,
k,
sv: stateVector,
});
}
}
}
}
}
return {
yMessages: result,
lastUpdateIds
};
}
63 changes: 63 additions & 0 deletions addons/dexie-cloud/src/yjs/updateYSyncStates.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { DexieCloudDB } from '../db/DexieCloudDB';
import { DEXIE_CLOUD_SYNCER_ID } from '../sync/DEXIE_CLOUD_SYNCER_ID';
import { YSyncState } from 'dexie';

export async function updateYSyncStates(
lastUpdateIdsBeforeSync: { [yTable: string]: number },
receivedUntilsAfterSync: { [yTable: string]: number },
db: DexieCloudDB
) {
// We want to update unsentFrom for each yTable to the value specified in first argument
// because we got those values before we synced with server and here we are back from server
// that has successfully received all those messages - no matter if the last update was a client or server update,
// we can safely store unsentFrom to a value of the last update + 1 here.
// We also want to update receivedUntil for each yTable to the value specified in the second argument,
// because that contains the highest resulted id of each update from server after storing it.
// We could do these two tasks separately, but that would require two update calls on the same YSyncState, so
// to optimize the dexie calls, we merge these two maps into a single one so we can do a single update request
// per yTable.
const mergedSpec: {
[yTable: string]: { unsentFrom?: number; receivedUntil?: number };
} = {};
for (const [yTable, lastUpdateId] of Object.entries(
lastUpdateIdsBeforeSync
)) {
mergedSpec[yTable] ??= {};
mergedSpec[yTable].unsentFrom = lastUpdateId + 1;
}
for (const [yTable, lastUpdateId] of Object.entries(receivedUntilsAfterSync)) {
mergedSpec[yTable] ??= {};
mergedSpec[yTable].receivedUntil = lastUpdateId;
}

// Now go through the merged map and update YSyncStates accordingly:
for (const [yTable, { unsentFrom, receivedUntil }] of Object.entries(
mergedSpec
)) {
// We're already in a transaction, but for the sake of
// code readability and correctness, let's launch an atomic sub transaction:
await db.transaction('rw', yTable, async () => {
const state: YSyncState | undefined = await db.table(yTable).get(
DEXIE_CLOUD_SYNCER_ID
);
if (!state) {
await db.table(yTable).add({
i: DEXIE_CLOUD_SYNCER_ID,
unsentFrom: unsentFrom || 1,
receivedUntil: receivedUntil || 0
});
} else {
if (unsentFrom) {
state.unsentFrom = Math.max(unsentFrom, state.unsentFrom || 1);
}
if (receivedUntil) {
state.receivedUntil = Math.max(
receivedUntil,
state.receivedUntil || 0
);
}
await db.table(yTable).put(state);
}
});
}
}
10 changes: 9 additions & 1 deletion libs/dexie-cloud-common/src/YMessage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

export type YMessage = YClientMessage | YServerMessage;
export type YClientMessage = YUpdateFromClientRequest | YAwarenessUpdate; // | YDocumentClosed;
export type YClientMessage = YUpdateFromClientRequest | YStateVector | YAwarenessUpdate; // | YDocumentClosed;
export type YServerMessage = YUpdateFromClientAck | YUpdateFromClientReject | YUpdateFromServerMessage | YAwarenessUpdate;

export interface YUpdateFromClientRequest {
Expand All @@ -12,6 +12,14 @@ export interface YUpdateFromClientRequest {
i: number;
}

export interface YStateVector {
type: 'sv';
table: string;
prop: string;
k: any;
sv: Uint8Array;
}

export interface YUpdateFromClientAck {
type: 'u-ack';
table: string;
Expand Down

0 comments on commit 0591312

Please sign in to comment.