From 8d997fdfa252844ff568fb3f4c2e6feba792c724 Mon Sep 17 00:00:00 2001 From: "M.C. van den Boogaart" Date: Fri, 25 Jun 2021 12:37:31 +0200 Subject: [PATCH] feat(queue): return the pipeline --- src/server/helpers/queue/task-runner.ts | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/server/helpers/queue/task-runner.ts b/src/server/helpers/queue/task-runner.ts index 9956636c..87546966 100644 --- a/src/server/helpers/queue/task-runner.ts +++ b/src/server/helpers/queue/task-runner.ts @@ -1,13 +1,14 @@ import type { Database, UpdateResult } from '../sql' +import type { DuplexOptions, Readable, Transform, Writable } from 'stream' import type { Queue, QueueRun, TaskRun } from '../../entities' import Ajv from 'ajv' -import type { DuplexOptions } from 'stream' import type { Logger } from 'pino' import type { ObjectSchema } from 'fluent-json-schema' import type { Queuer } from './queuer' import type { WrappedNodeRedisClient } from 'handy-redis' import { createNodeRedisClient } from 'handy-redis' import type { queue as fastq } from 'fastq' +import { pipeline } from '../stream' import { promise } from 'fastq' import { sql } from '../sql' import waitUntil from 'async-wait-until' @@ -261,6 +262,16 @@ export abstract class TaskRunner { return validator } + /** + * Handles a data stream as a Promise. + * + * @param streams - The streams + * @see {@link pipeline} + */ + public async pipeline (...streams: Array): Promise { + return pipeline(...streams) + } + /** * Sets up the task runner. *