diff --git a/lib/monitor.js b/lib/monitor.js index 5c4e948..bb1fdc3 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -1,4 +1,6 @@ const ReadyResource = require('ready-resource') +const debounce = require('debounceify') +const safetyCatch = require('safety-catch') module.exports = class Monitor extends ReadyResource { constructor (drive, opts = {}) { @@ -7,20 +9,24 @@ module.exports = class Monitor extends ReadyResource { this.blobs = null this.name = opts.name this.entry = opts.entry + this.isDownload = opts.download === true - this._boundOnAppend = this._onAppend.bind(this) - this._boundUpdateStats = this._updateStats.bind(this) + this._boundOnAppend = debounce(this._onAppend.bind(this)) + this._boundOnUpload = this._onUpload.bind(this) + this._boundOnDownload = this._onDownload.bind(this) this.drive.on('close', () => this.close()) // Updated on each upload/download event this.stats = { startTime: 0, percentage: 0, + peersCount: 0, speed: null, blocks: null, - bytes: null, - totalBytes: null, - totalBlocks: null + totalBytes: null, // local + bytes loaded during monitoring + monitoringBytes: null, // bytes loaded during monitoring + targetBytes: null, + targetBlocks: null } } @@ -29,16 +35,26 @@ module.exports = class Monitor extends ReadyResource { this.blobs = await this.drive.getBlobs() this.entry = await this.drive.entry(this.name) if (this.entry) this._setEntryInfo() + + // load the local state for the file. + // upload is a bit more tricky + if (this.entry && this.isDownload) { + await this._loadLocalState().catch(safetyCatch).finally(() => { + this._calculateStats() + this.emit('update') + }) + } + // Handlers this.blobs.core.on('append', this._boundOnAppend) - this.blobs.core.on('upload', this._boundUpdateStats) - this.blobs.core.on('download', this._boundUpdateStats) + this.blobs.core.on('upload', this._boundOnUpload) + this.blobs.core.on('download', this._boundOnDownload) } async _close () { this.blobs.core.off('append', this._boundOnAppend) - this.blobs.core.off('upload', this._boundUpdateStats) - this.blobs.core.off('download', this._boundUpdateStats) + this.blobs.core.off('upload', this._boundOnUpload) + this.blobs.core.off('download', this._boundOnDownload) } async _onAppend () { @@ -49,25 +65,50 @@ module.exports = class Monitor extends ReadyResource { } _setEntryInfo () { - if (this.stats.totalBytes || this.stats.totalBlocks) return - this.stats.totalBytes = this.entry.value.blob.byteLength - this.stats.totalBlocks = this.entry.value.blob.blockLength + if (this.stats.targetBytes || this.stats.targetBlocks) return + this.stats.targetBytes = this.entry.value.blob.byteLength + this.stats.targetBlocks = this.entry.value.blob.blockLength + this.stats.blockOffset = this.entry.value.blob.blockOffset + this.stats.byteOffset = this.entry.value.blob.byteOffset + } + + async _onUpload (index, bytes, from) { + this._updateStats(index, bytes, from) + } + + async _onDownload (index, bytes, from) { + this._updateStats(index, bytes, from) + } + + async _loadLocalState () { + // TODO: think this will only work if its linear + const stream = this.blobs.createReadStream(this.entry.value.blob, { wait: false }) + for await (const bytes of stream) { + this.stats.totalBytes += bytes.length + this.stats.blocks++ + } } - _updateStats (index, bytes) { + _updateStats (index, bytes, from) { if (!this.entry || this.closing) return if (!isWithinRange(index, this.entry)) return - if (!this.stats.startTime) this.stats.startTime = Date.now() + this.stats.peersCount = from.replicator.peers.length this.stats.blocks++ - this.stats.bytes += bytes - this.stats.percentage = Number(((this.stats.bytes / this.stats.totalBytes) * 100).toFixed(2)) + this.stats.monitoringBytes += bytes + this.stats.totalBytes += bytes + + this._calculateStats() + this.emit('update') + } + + _calculateStats () { + if (!this.stats.startTime) this.stats.startTime = Date.now() + this.stats.percentage = Number(((this.stats.totalBytes / this.stats.targetBytes) * 100).toFixed(2)) const timeElapsed = (Date.now() - this.stats.startTime) / 1000 if (timeElapsed > 0) { - this.stats.speed = Math.floor(this.stats.bytes / timeElapsed) // Speed in bytes/sec + this.stats.speed = Math.floor(this.stats.monitoringBytes / timeElapsed) // Speed in bytes/sec } - - this.emit('update') } } diff --git a/package.json b/package.json index 2791626..193e241 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ }, "homepage": "https://github.com/holepunchto/hyperdrive#readme", "dependencies": { + "debounceify": "^1.1.0", "hyperbee": "^2.11.1", "hyperblobs": "^2.3.0", "hypercore": "^10.33.0", diff --git a/test.js b/test.js index 202ff15..885ab79 100644 --- a/test.js +++ b/test.js @@ -1586,9 +1586,9 @@ test('upload/download can be monitored', async (t) => { const expectedBytes = [bytes, 65536] monitor.on('update', () => { t.is(monitor.stats.blocks, expectedBlocks.pop()) - t.is(monitor.stats.bytes, expectedBytes.pop()) - t.is(monitor.stats.totalBlocks, 2) - t.is(monitor.stats.totalBytes, bytes) + t.is(monitor.stats.monitoringBytes, expectedBytes.pop()) + t.is(monitor.stats.targetBlocks, 2) + t.is(monitor.stats.targetBytes, bytes) }) } @@ -1602,15 +1602,47 @@ test('upload/download can be monitored', async (t) => { const expectedBytes = [bytes, 65536] monitor.on('update', () => { t.is(monitor.stats.blocks, expectedBlocks.pop()) - t.is(monitor.stats.bytes, expectedBytes.pop()) - t.is(monitor.stats.totalBlocks, 2) - t.is(monitor.stats.totalBytes, bytes) + t.is(monitor.stats.monitoringBytes, expectedBytes.pop()) + t.is(monitor.stats.targetBlocks, 2) + t.is(monitor.stats.targetBytes, bytes) }) } await mirror.drive.get(file) }) +test('monitor loads the local state on download', async (t) => { + t.plan(3) + const { corestore, drive, swarm, mirror } = await testenv(t.teardown) + swarm.on('connection', (conn) => corestore.replicate(conn)) + swarm.join(drive.discoveryKey, { server: true, client: false }) + await swarm.flush() + + mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) + mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + await mirror.swarm.flush() + + const file = '/example.md' + const bytes = 1234 + const buffer = Buffer.alloc(bytes, '0') + await drive.put(file, buffer) + + observe() + async function observe () { + for await (const _ of mirror.drive.watch()) { /* eslint-disable-line */ + await mirror.drive.get(file) + // Start monitoring after we've downloaded the file + const monitor = mirror.drive.monitor(file, { download: true }) + monitor.on('update', () => { + t.is(monitor.stats.percentage, 100) + t.is(monitor.stats.totalBytes, bytes) + t.is(monitor.stats.targetBytes, bytes) + }) + await monitor.ready() + } + } +}) + async function testenv (teardown) { const corestore = new Corestore(RAM) await corestore.ready()