Skip to content

Commit

Permalink
Merge pull request #71 from Milly/gather-stream
Browse files Browse the repository at this point in the history
Refactor gather source.
  • Loading branch information
Shougo authored Jul 27, 2023
2 parents 8c7a510 + 9b88201 commit aee0267
Showing 1 changed file with 27 additions and 35 deletions.
62 changes: 27 additions & 35 deletions denops/ddu/ddu.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,14 +438,29 @@ export class Ddu {
loader: Loader,
itemLevel: number,
parent?: DduItem,
): AsyncGenerator<DduItem[]> {
): AsyncGenerator<DduItem[], void, undefined> {
const { signal } = this.abortController;
if (signal.aborted) {
return;
}

const itemTransformer = new TransformStream<Item[], DduItem[]>({
transform: (chunk, controller) => {
const newItems = chunk.map((item: Item) =>
this.newDduItem(
index,
source,
sourceOptions,
item,
itemLevel,
)
);
controller.enqueue(newItems);
},
});

try {
const sourceItems = source.gather({
yield* source.gather({
denops,
context: this.context,
options: this.options,
Expand All @@ -454,40 +469,17 @@ export class Ddu {
input: this.input,
parent,
loader,
});
const reader = sourceItems.getReader();
const abort = () => reader.cancel(signal.reason);

try {
signal.addEventListener("abort", abort);

for (;;) {
const chunk = await reader.read();
if (chunk.done || signal.aborted) {
break;
}
const newItems = chunk.value.map((item: Item) =>
this.newDduItem(
index,
source,
sourceOptions,
item,
itemLevel,
)
);

yield newItems;
}
} finally {
signal.removeEventListener("abort", abort);
reader.releaseLock();
}
}).pipeThrough(itemTransformer, { signal });
} catch (e: unknown) {
await errorException(
denops,
e,
`source: ${source.name} "gather()" failed`,
);
if (signal.aborted && e === signal.reason) {
// Aborted by signal, so do nothing.
} else {
await errorException(
denops,
e,
`source: ${source.name} "gather()" failed`,
);
}
}
}

Expand Down

0 comments on commit aee0267

Please sign in to comment.