Node.js transform stream working at a constant pace and concurrency for object mode
- Work time at once can be specified (`workMS` option).
- Concurrent workers can be specified (`concurrency` option).
- Fires a `done` event after all workers have finished their asynchronous processes.
- Counting tag system: call `this.countTag(<tag>)` in `_workPromise` to get summarized results in `tagCounts`, grouped by tag.
- Requires Node.js 6.10 or later.
Typical use cases:

- API client that needs to respect a rate limit (see the sketch below)
- DB client that needs to stay within read/write capacity units, as with AWS DynamoDB
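As an illustration of the rate-limit use case, the sketch below paces calls to a hypothetical `fetchUser` API; the function and pacing values are assumptions for illustration, not part of this package.

```js
const es = require('event-stream');
const devnull = require('dev-null');
const PacedWorkStream = require('paced-work-stream');

// Hypothetical API call; replace with a real rate-limited client.
function fetchUser(id) {
  return Promise.resolve({ id, name: 'user-' + id });
}

// At most 2 requests in flight, a new batch every 500 ms
// (roughly 4 requests per second).
const apiStream = new PacedWorkStream({
  concurrency: 2,
  workMS: 500
}, function(id) {
  return fetchUser(id).then((user) => {
    this.countTag('fetched'); // `this` is the stream instance
    return user;
  });
});

es.readArray([1, 2, 3, 4])
  .pipe(apiStream)
  .pipe(devnull({ objectMode: true }));
```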
```
$ npm install --save paced-work-stream
```
`new PacedWorkStream(options, workPromise)`

- `options` `<Object>`
  - `concurrency` is the number of concurrent processes.
  - `workMS` is the work time in milliseconds for one batch, covering both process time and wait time.
  - `delay`, if `true`, starts the concurrent processes in order, each delayed by `workMS` divided by `concurrency`; the default is `false`. In this mode, `workPromise` must return functions that wrap each promise.
  - `highWaterMark` is the maximum object buffer size. If you use flowing mode, set it to at least `concurrency`.
- `workPromise` is a `function(item)` that must return either a Promise processing the item or a Function that returns such a Promise.
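The following is a minimal sketch of `delay` mode, with option values assumed for illustration; note that the work function returns a wrapping function rather than a bare Promise, as `delay` mode requires.

```js
const PacedWorkStream = require('paced-work-stream');

const stream = new PacedWorkStream({
  concurrency: 2,
  workMS: 1000,
  delay: true   // worker starts are staggered by 1000 / 2 = 500 ms
}, function(item) {
  // In delay mode, return a function that wraps the Promise.
  return () => {
    this.countTag('processed');
    return Promise.resolve(item);
  };
});
```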
Instead of passing `workPromise`, you can subclass `PacedWorkStream`. `super(options)` must be called in the constructor, and the `_workPromise(item)` method must be overridden to return a Promise processing the item or a Function that returns such a Promise.
```js
const PacedWorkStream = require('paced-work-stream');

class MyWorkStream extends PacedWorkStream {
  constructor(options) {
    super(options); // required
  }

  _workPromise(item) {
    // Returns a function wrapping the Promise that processes the item.
    return () => {
      this.countTag(item.tag); // count results per tag
      return Promise.resolve(item.value);
    };
  }
}
```
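As a usage sketch (the items and option values are assumed for illustration), the subclass above could be wired up like this:

```js
const es = require('event-stream');
const devnull = require('dev-null');

const myStream = new MyWorkStream({ concurrency: 2, workMS: 1000 })
  .on('done', function() {
    console.log(this.tagCounts); // e.g. { apple: 2, orange: 1 }
  });

es.readArray([
  { tag: 'apple', value: 1 },
  { tag: 'apple', value: 2 },
  { tag: 'orange', value: 3 }
]).pipe(myStream).pipe(devnull({ objectMode: true }));
```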
```js
const es = require('event-stream');
const devnull = require('dev-null');
const PacedWorkStream = require('paced-work-stream');

const pwStream = new PacedWorkStream({
  concurrency: 2,
  workMS: 1000,
  highWaterMark: 5
}, function(item) {
  console.log(new Date().toISOString(), 'Begin', item);
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      this.countTag('workDone');
      console.log(new Date().toISOString(), 'End', item);
      resolve();
    }, 600); // the 600 ms of work fit within workMS (1000 ms)
  });
})
.on('done', function() {
  console.log(this.tagCounts);
})
.on('error', (err) => {
  console.error(err);
});

const reader = es.readArray([11, 12, 21, 22, 31]);
reader.pipe(pwStream).pipe(devnull({ objectMode: true }));
```
- Handle the `done` event, not `finish`, to read the final `tagCounts`; when `finish` fires, the workers may not have processed all items yet.
- If the stream's output is not needed, it must still be piped to a sink such as dev-null.
```
$ node example.js
2016-09-11T03:17:50.000Z Begin 11
2016-09-11T03:17:50.003Z Begin 12
2016-09-11T03:17:50.605Z End 11
2016-09-11T03:17:50.605Z End 12
2016-09-11T03:17:51.009Z Begin 21
2016-09-11T03:17:51.009Z Begin 22
2016-09-11T03:17:51.606Z End 21
2016-09-11T03:17:51.606Z End 22
2016-09-11T03:17:52.004Z Begin 31
2016-09-11T03:17:52.607Z End 31
{ workDone: 5 }
```
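Note the pacing: with `concurrency: 2`, each batch of two items begins about `workMS` (1000 ms) after the previous batch began, even though the work itself takes only about 600 ms; the stream waits out the remainder to keep the pace.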
Promised Lifestream is useful for building stream pipelines. The following example produces the same result as above.
```js
'use strict';
const es = require('event-stream');
const PromisedLife = require('promised-lifestream');
const PacedWorkStream = require('paced-work-stream');

const pacedWorker = new PacedWorkStream({
  concurrency: 2,
  workMS: 1000,
  highWaterMark: 5
}, function(item) {
  console.log(new Date().toISOString(), 'Begin', item);
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      this.countTag('workDone');
      console.log(new Date().toISOString(), 'End', item);
      resolve();
    }, 600); // the 600 ms of work fit within workMS (1000 ms)
  });
});

PromisedLife([
  es.readArray([11, 12, 21, 22, 31]),
  pacedWorker
])
.then(() => {
  console.log(pacedWorker.tagCounts); // { workDone: 5 }
})
.catch((err) => {
  console.error(err);
});
```