diff --git a/lib/monitor.js b/lib/monitor.js index 9e94729..7479ac6 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -1,6 +1,5 @@ const ReadyResource = require('ready-resource') const debounce = require('debounceify') -const safetyCatch = require('safety-catch') module.exports = class Monitor extends ReadyResource { constructor (drive, opts = {}) { @@ -9,15 +8,13 @@ module.exports = class Monitor extends ReadyResource { this.blobs = null this.name = opts.name this.entry = opts.entry - this.isDownload = opts.download === true this._boundOnAppend = debounce(this._onAppend.bind(this)) this._boundOnUpload = this._onUpload.bind(this) this._boundOnDownload = this._onDownload.bind(this) this.drive.on('close', () => this.close()) - // Updated on each upload/download event - this.stats = { + const stats = { startTime: 0, percentage: 0, peersCount: 0, @@ -28,6 +25,10 @@ module.exports = class Monitor extends ReadyResource { targetBytes: null, targetBlocks: null } + + // Updated on each upload/download event + this.uploadStats = { ...stats } + this.downloadStats = { ...stats } } async _open () { @@ -36,15 +37,6 @@ module.exports = class Monitor extends ReadyResource { this.entry = await this.drive.entry(this.name) if (this.entry) this._setEntryInfo() - // load the local state of the file. - // upload is a bit more tricky... - if (this.entry && this.isDownload) { - await this._loadLocalState().catch(safetyCatch).finally(() => { - this._calculateStats() - this.emit('update') - }) - } - // Handlers this.blobs.core.on('append', this._boundOnAppend) this.blobs.core.on('upload', this._boundOnUpload) @@ -65,53 +57,44 @@ module.exports = class Monitor extends ReadyResource { } _setEntryInfo () { - if (this.stats.targetBytes || this.stats.targetBlocks) return - this.stats.targetBytes = this.entry.value.blob.byteLength - this.stats.targetBlocks = this.entry.value.blob.blockLength - this.stats.blockOffset = this.entry.value.blob.blockOffset - this.stats.byteOffset = this.entry.value.blob.byteOffset + if (!this.downloadStats.targetBytes || !this.downloadStats.targetBlocks) { + this.downloadStats.targetBytes = this.entry.value.blob.byteLength + this.downloadStats.targetBlocks = this.entry.value.blob.blockLength + } + + if (!this.uploadStats.targetBytes || !this.uploadStats.targetBlocks) { + this.uploadStats.targetBytes = this.entry.value.blob.byteLength + this.uploadStats.targetBlocks = this.entry.value.blob.blockLength + } } async _onUpload (index, bytes, from) { - this._updateStats(index, bytes, from) + this._updateStats(this.uploadStats, index, bytes, from) } async _onDownload (index, bytes, from) { - this._updateStats(index, bytes, from) - } - - async _loadLocalState () { - // @TODO: There's a better way to do this? - let blockIdx = this.stats.blockOffset - while (blockIdx <= this.stats.targetBlocks) { - if (await this.blobs.core.core.bitfield.get(blockIdx)) { - const bytes = await this.blobs.core.core.blocks.get(blockIdx) - this.stats.totalBytes += bytes.length - this.stats.blocks++ - } - blockIdx++ - } + this._updateStats(this.downloadStats, index, bytes, from) } - _updateStats (index, bytes, from) { + _updateStats (stats, index, bytes, from) { if (!this.entry || this.closing) return if (!isWithinRange(index, this.entry)) return - this.stats.peersCount = from.replicator.peers.length - this.stats.blocks++ - this.stats.monitoringBytes += bytes - this.stats.totalBytes += bytes + stats.peersCount = from.replicator.peers.length + stats.blocks++ + stats.monitoringBytes += bytes + stats.totalBytes += bytes - this._calculateStats() + this._calculateStats(stats) this.emit('update') } - _calculateStats () { - if (!this.stats.startTime) this.stats.startTime = Date.now() - this.stats.percentage = Number(((this.stats.totalBytes / this.stats.targetBytes) * 100).toFixed(2)) - const timeElapsed = (Date.now() - this.stats.startTime) / 1000 + _calculateStats (stats) { + if (!stats.startTime) stats.startTime = Date.now() + stats.percentage = Number(((stats.totalBytes / stats.targetBytes) * 100).toFixed(2)) + const timeElapsed = (Date.now() - stats.startTime) / 1000 if (timeElapsed > 0) { - this.stats.speed = Math.floor(this.stats.monitoringBytes / timeElapsed) // Speed in bytes/sec + stats.speed = Math.floor(stats.monitoringBytes / timeElapsed) // bytes/sec } } } diff --git a/test.js b/test.js index 885ab79..01025a6 100644 --- a/test.js +++ b/test.js @@ -1563,7 +1563,7 @@ test('drive.list (recursive false) ignore', async (t) => { }) test('upload/download can be monitored', async (t) => { - t.plan(17) + t.plan(21) const { corestore, drive, swarm, mirror } = await testenv(t.teardown) swarm.on('connection', (conn) => corestore.replicate(conn)) swarm.join(drive.discoveryKey, { server: true, client: false }) @@ -1585,10 +1585,11 @@ test('upload/download can be monitored', async (t) => { const expectedBlocks = [2, 1] const expectedBytes = [bytes, 65536] monitor.on('update', () => { - t.is(monitor.stats.blocks, expectedBlocks.pop()) - t.is(monitor.stats.monitoringBytes, expectedBytes.pop()) - t.is(monitor.stats.targetBlocks, 2) - t.is(monitor.stats.targetBytes, bytes) + t.is(monitor.uploadStats.blocks, expectedBlocks.pop()) + t.is(monitor.uploadStats.monitoringBytes, expectedBytes.pop()) + t.is(monitor.uploadStats.targetBlocks, 2) + t.is(monitor.uploadStats.targetBytes, bytes) + t.absent(monitor.downloadStats.blocks) }) } @@ -1601,48 +1602,17 @@ test('upload/download can be monitored', async (t) => { const expectedBlocks = [2, 1] const expectedBytes = [bytes, 65536] monitor.on('update', () => { - t.is(monitor.stats.blocks, expectedBlocks.pop()) - t.is(monitor.stats.monitoringBytes, expectedBytes.pop()) - t.is(monitor.stats.targetBlocks, 2) - t.is(monitor.stats.targetBytes, bytes) + t.is(monitor.downloadStats.blocks, expectedBlocks.pop()) + t.is(monitor.downloadStats.monitoringBytes, expectedBytes.pop()) + t.is(monitor.downloadStats.targetBlocks, 2) + t.is(monitor.downloadStats.targetBytes, bytes) + t.absent(monitor.uploadStats.blocks) }) } await mirror.drive.get(file) }) -test('monitor loads the local state on download', async (t) => { - t.plan(3) - const { corestore, drive, swarm, mirror } = await testenv(t.teardown) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - const file = '/example.md' - const bytes = 1234 - const buffer = Buffer.alloc(bytes, '0') - await drive.put(file, buffer) - - observe() - async function observe () { - for await (const _ of mirror.drive.watch()) { /* eslint-disable-line */ - await mirror.drive.get(file) - // Start monitoring after we've downloaded the file - const monitor = mirror.drive.monitor(file, { download: true }) - monitor.on('update', () => { - t.is(monitor.stats.percentage, 100) - t.is(monitor.stats.totalBytes, bytes) - t.is(monitor.stats.targetBytes, bytes) - }) - await monitor.ready() - } - } -}) - async function testenv (teardown) { const corestore = new Corestore(RAM) await corestore.ready()