Skip to content

Commit

Permalink
Skip bitfield for now
Browse files Browse the repository at this point in the history
  • Loading branch information
MKPLKN committed Sep 3, 2024
1 parent 5e17e15 commit 9269748
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 85 deletions.
71 changes: 27 additions & 44 deletions lib/monitor.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const ReadyResource = require('ready-resource')
const debounce = require('debounceify')
const safetyCatch = require('safety-catch')

module.exports = class Monitor extends ReadyResource {
constructor (drive, opts = {}) {
Expand All @@ -9,15 +8,13 @@ module.exports = class Monitor extends ReadyResource {
this.blobs = null
this.name = opts.name
this.entry = opts.entry
this.isDownload = opts.download === true

this._boundOnAppend = debounce(this._onAppend.bind(this))
this._boundOnUpload = this._onUpload.bind(this)
this._boundOnDownload = this._onDownload.bind(this)
this.drive.on('close', () => this.close())

// Updated on each upload/download event
this.stats = {
const stats = {
startTime: 0,
percentage: 0,
peersCount: 0,
Expand All @@ -28,6 +25,10 @@ module.exports = class Monitor extends ReadyResource {
targetBytes: null,
targetBlocks: null
}

// Updated on each upload/download event
this.uploadStats = { ...stats }
this.downloadStats = { ...stats }
}

async _open () {
Expand All @@ -36,15 +37,6 @@ module.exports = class Monitor extends ReadyResource {
this.entry = await this.drive.entry(this.name)
if (this.entry) this._setEntryInfo()

// load the local state of the file.
// upload is a bit more tricky...
if (this.entry && this.isDownload) {
await this._loadLocalState().catch(safetyCatch).finally(() => {
this._calculateStats()
this.emit('update')
})
}

// Handlers
this.blobs.core.on('append', this._boundOnAppend)
this.blobs.core.on('upload', this._boundOnUpload)
Expand All @@ -65,53 +57,44 @@ module.exports = class Monitor extends ReadyResource {
}

_setEntryInfo () {
if (this.stats.targetBytes || this.stats.targetBlocks) return
this.stats.targetBytes = this.entry.value.blob.byteLength
this.stats.targetBlocks = this.entry.value.blob.blockLength
this.stats.blockOffset = this.entry.value.blob.blockOffset
this.stats.byteOffset = this.entry.value.blob.byteOffset
if (!this.downloadStats.targetBytes || !this.downloadStats.targetBlocks) {
this.downloadStats.targetBytes = this.entry.value.blob.byteLength
this.downloadStats.targetBlocks = this.entry.value.blob.blockLength
}

if (!this.uploadStats.targetBytes || !this.uploadStats.targetBlocks) {
this.uploadStats.targetBytes = this.entry.value.blob.byteLength
this.uploadStats.targetBlocks = this.entry.value.blob.blockLength
}
}

async _onUpload (index, bytes, from) {
this._updateStats(index, bytes, from)
this._updateStats(this.uploadStats, index, bytes, from)
}

async _onDownload (index, bytes, from) {
this._updateStats(index, bytes, from)
}

async _loadLocalState () {
// @TODO: There's a better way to do this?
let blockIdx = this.stats.blockOffset
while (blockIdx <= this.stats.targetBlocks) {
if (await this.blobs.core.core.bitfield.get(blockIdx)) {
const bytes = await this.blobs.core.core.blocks.get(blockIdx)
this.stats.totalBytes += bytes.length
this.stats.blocks++
}
blockIdx++
}
this._updateStats(this.downloadStats, index, bytes, from)
}

_updateStats (index, bytes, from) {
_updateStats (stats, index, bytes, from) {
if (!this.entry || this.closing) return
if (!isWithinRange(index, this.entry)) return

this.stats.peersCount = from.replicator.peers.length
this.stats.blocks++
this.stats.monitoringBytes += bytes
this.stats.totalBytes += bytes
stats.peersCount = from.replicator.peers.length
stats.blocks++
stats.monitoringBytes += bytes
stats.totalBytes += bytes

this._calculateStats()
this._calculateStats(stats)
this.emit('update')
}

_calculateStats () {
if (!this.stats.startTime) this.stats.startTime = Date.now()
this.stats.percentage = Number(((this.stats.totalBytes / this.stats.targetBytes) * 100).toFixed(2))
const timeElapsed = (Date.now() - this.stats.startTime) / 1000
_calculateStats (stats) {
if (!stats.startTime) stats.startTime = Date.now()
stats.percentage = Number(((stats.totalBytes / stats.targetBytes) * 100).toFixed(2))
const timeElapsed = (Date.now() - stats.startTime) / 1000
if (timeElapsed > 0) {
this.stats.speed = Math.floor(this.stats.monitoringBytes / timeElapsed) // Speed in bytes/sec
stats.speed = Math.floor(stats.monitoringBytes / timeElapsed) // bytes/sec
}
}
}
Expand Down
52 changes: 11 additions & 41 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1563,7 +1563,7 @@ test('drive.list (recursive false) ignore', async (t) => {
})

test('upload/download can be monitored', async (t) => {
t.plan(17)
t.plan(21)
const { corestore, drive, swarm, mirror } = await testenv(t.teardown)
swarm.on('connection', (conn) => corestore.replicate(conn))
swarm.join(drive.discoveryKey, { server: true, client: false })
Expand All @@ -1585,10 +1585,11 @@ test('upload/download can be monitored', async (t) => {
const expectedBlocks = [2, 1]
const expectedBytes = [bytes, 65536]
monitor.on('update', () => {
t.is(monitor.stats.blocks, expectedBlocks.pop())
t.is(monitor.stats.monitoringBytes, expectedBytes.pop())
t.is(monitor.stats.targetBlocks, 2)
t.is(monitor.stats.targetBytes, bytes)
t.is(monitor.uploadStats.blocks, expectedBlocks.pop())
t.is(monitor.uploadStats.monitoringBytes, expectedBytes.pop())
t.is(monitor.uploadStats.targetBlocks, 2)
t.is(monitor.uploadStats.targetBytes, bytes)
t.absent(monitor.downloadStats.blocks)
})
}

Expand All @@ -1601,48 +1602,17 @@ test('upload/download can be monitored', async (t) => {
const expectedBlocks = [2, 1]
const expectedBytes = [bytes, 65536]
monitor.on('update', () => {
t.is(monitor.stats.blocks, expectedBlocks.pop())
t.is(monitor.stats.monitoringBytes, expectedBytes.pop())
t.is(monitor.stats.targetBlocks, 2)
t.is(monitor.stats.targetBytes, bytes)
t.is(monitor.downloadStats.blocks, expectedBlocks.pop())
t.is(monitor.downloadStats.monitoringBytes, expectedBytes.pop())
t.is(monitor.downloadStats.targetBlocks, 2)
t.is(monitor.downloadStats.targetBytes, bytes)
t.absent(monitor.uploadStats.blocks)
})
}

await mirror.drive.get(file)
})

test('monitor loads the local state on download', async (t) => {
t.plan(3)
const { corestore, drive, swarm, mirror } = await testenv(t.teardown)
swarm.on('connection', (conn) => corestore.replicate(conn))
swarm.join(drive.discoveryKey, { server: true, client: false })
await swarm.flush()

mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn))
mirror.swarm.join(drive.discoveryKey, { server: false, client: true })
await mirror.swarm.flush()

const file = '/example.md'
const bytes = 1234
const buffer = Buffer.alloc(bytes, '0')
await drive.put(file, buffer)

observe()
async function observe () {
for await (const _ of mirror.drive.watch()) { /* eslint-disable-line */
await mirror.drive.get(file)
// Start monitoring after we've downloaded the file
const monitor = mirror.drive.monitor(file, { download: true })
monitor.on('update', () => {
t.is(monitor.stats.percentage, 100)
t.is(monitor.stats.totalBytes, bytes)
t.is(monitor.stats.targetBytes, bytes)
})
await monitor.ready()
}
}
})

async function testenv (teardown) {
const corestore = new Corestore(RAM)
await corestore.ready()
Expand Down

0 comments on commit 9269748

Please sign in to comment.