-
-
Notifications
You must be signed in to change notification settings - Fork 642
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Avoid 429 responses by slowing down sync if needed.
* If only 50% of remaining is left, stall next sync until (reset / remaining) seconds * Bugfix - used registerSyncEvent() instead of triggerSync() - would only work in production with SW active. * Simplify performGuardedJob() using navigator.locks.
- Loading branch information
1 parent
a52b1e2
commit 3c4e698
Showing
7 changed files
with
61 additions
and
150 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,125 +1,13 @@ | ||
import { liveQuery, Table } from 'dexie'; | ||
import { MINUTES, SECONDS } from '../helpers/date-constants'; | ||
import { DexieCloudDB } from '../db/DexieCloudDB'; | ||
import { GuardedJob } from '../db/entities/GuardedJob'; | ||
import { myId } from './myId'; | ||
import { from } from 'rxjs'; | ||
import { filter, timeout } from 'rxjs/operators'; | ||
import { DexieCloudDB } from "../db/DexieCloudDB"; | ||
|
||
const GUARDED_JOB_HEARTBEAT = 1 * SECONDS; | ||
const GUARDED_JOB_TIMEOUT = 1 * MINUTES; | ||
|
||
export async function performGuardedJob( | ||
export function performGuardedJob<T>( | ||
db: DexieCloudDB, | ||
jobName: string, | ||
jobsTableName: string, | ||
job: () => Promise<any>, | ||
{ awaitRemoteJob }: { awaitRemoteJob?: boolean } = {} | ||
): Promise<void> { | ||
// Start working. | ||
// | ||
// Check if someone else is working on this already. | ||
// | ||
const jobsTable = db.table(jobsTableName) as Table<GuardedJob, string>; | ||
|
||
async function aquireLock() { | ||
const gotTheLock = await db.transaction('rw!', jobsTableName, async () => { | ||
const currentWork = await jobsTable.get(jobName); | ||
if (!currentWork) { | ||
// No one else is working. Let's record that we are. | ||
await jobsTable.add( | ||
{ | ||
nodeId: myId, | ||
started: new Date(), | ||
heartbeat: new Date() | ||
}, | ||
jobName | ||
); | ||
return true; | ||
} else if ( | ||
currentWork.heartbeat.getTime() < | ||
Date.now() - GUARDED_JOB_TIMEOUT | ||
) { | ||
console.warn( | ||
`Latest ${jobName} worker seem to have died.\n`, | ||
`The dead job started:`, | ||
currentWork.started, | ||
`\n`, | ||
`Last heart beat was:`, | ||
currentWork.heartbeat, | ||
'\n', | ||
`We're now taking over!` | ||
); | ||
// Now, take over! | ||
await jobsTable.put( | ||
{ | ||
nodeId: myId, | ||
started: new Date(), | ||
heartbeat: new Date() | ||
}, | ||
jobName | ||
); | ||
return true; | ||
} | ||
return false; | ||
}); | ||
|
||
if (gotTheLock) return true; | ||
|
||
// Someone else took the job. | ||
if (awaitRemoteJob) { | ||
try { | ||
const jobDoneObservable = from( | ||
liveQuery(() => jobsTable.get(jobName)) | ||
).pipe( | ||
timeout(GUARDED_JOB_TIMEOUT), | ||
filter((job) => !job) | ||
); // Wait til job is not there anymore. | ||
await jobDoneObservable.toPromise(); | ||
return false; | ||
} catch (err) { | ||
if (err.name !== 'TimeoutError') { | ||
throw err; | ||
} | ||
// Timeout stopped us! Try aquire the lock now. | ||
// It will likely succeed this time unless | ||
// another client took it. | ||
return await aquireLock(); | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
if (await aquireLock()) { | ||
// We own the lock entry and can do our job undisturbed. | ||
// We're not within a transaction, but these type of locks | ||
// spans over transactions. | ||
|
||
// Start our heart beat during the job. | ||
// Use setInterval to make sure we are updating heartbeat even during long-lived fetch calls. | ||
const heartbeat = setInterval(() => { | ||
jobsTable.update( | ||
jobName, | ||
(job) => { | ||
if (job.nodeId === myId) { | ||
job.heartbeat = new Date(); | ||
} | ||
} | ||
); | ||
}, GUARDED_JOB_HEARTBEAT); | ||
|
||
try { | ||
return await job(); | ||
} finally { | ||
// Stop heartbeat | ||
clearInterval(heartbeat); | ||
// Remove the persisted job state: | ||
await db.transaction('rw!', jobsTableName, async () => { | ||
const currentWork = await jobsTable.get(jobName); | ||
if (currentWork && currentWork.nodeId === myId) { | ||
await jobsTable.delete(jobName); | ||
} | ||
}); | ||
} | ||
job: () => Promise<T> | ||
): Promise<T> { | ||
if (typeof navigator === 'undefined' || !navigator.locks) { | ||
// No support for guarding jobs. IE11, node.js, etc. | ||
return job(); | ||
} | ||
return navigator.locks.request(db.name + '|' + jobName, () => job()); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
import { DexieCloudDB } from '../db/DexieCloudDB'; | ||
|
||
// If we get Ratelimit-Limit and Ratelimit-Remaining where Ratelimit-Remaining is below | ||
// (Ratelimit-Limit / 2), we should delay the next sync by (Ratelimit-Reset / Ratelimit-Remaining) | ||
// seconds (given that there is a Ratelimit-Reset header). | ||
|
||
let syncRatelimitDelays = new WeakMap<DexieCloudDB, Date>(); | ||
|
||
export async function checkSyncRateLimitDelay(db: DexieCloudDB) { | ||
const delatMilliseconds = (syncRatelimitDelays.get(db)?.getTime() ?? 0) - Date.now(); | ||
if (delatMilliseconds > 0) { | ||
console.debug(`Stalling sync request ${delatMilliseconds} ms to spare ratelimits`); | ||
await new Promise(resolve => setTimeout(resolve, delatMilliseconds)); | ||
} | ||
} | ||
|
||
export function updateSyncRateLimitDelays(db: DexieCloudDB, res: Response) { | ||
const limit = res.headers.get('Ratelimit-Limit'); | ||
const remaining = res.headers.get('Ratelimit-Remaining'); | ||
const reset = res.headers.get('Ratelimit-Reset'); | ||
if (limit && remaining && reset) { | ||
const limitNum = Number(limit); | ||
const remainingNum = Math.max(0, Number(remaining)); | ||
const willResetInSeconds = Number(reset); | ||
if (remainingNum < limitNum / 2) { | ||
const delay = Math.ceil(willResetInSeconds / (remainingNum + 1)); | ||
syncRatelimitDelays.set(db, new Date(Date.now() + delay * 1000)); | ||
console.debug(`Sync ratelimit delay set to ${delay} seconds`); | ||
} else { | ||
syncRatelimitDelays.delete(db); | ||
console.debug(`Sync ratelimit delay cleared`); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters