From 5e831683e688b308cc913a546b4de176a61276c4 Mon Sep 17 00:00:00 2001 From: "amano.kenji" Date: Sun, 18 Feb 2024 02:03:32 +0000 Subject: [PATCH] Replace spork/stream/lines-channel with channel/from-each --- spork/channel.janet | 37 +++++++++++++++++++++++++++++++++++++ spork/stream.janet | 36 ------------------------------------ 2 files changed, 37 insertions(+), 36 deletions(-) create mode 100644 spork/channel.janet diff --git a/spork/channel.janet b/spork/channel.janet new file mode 100644 index 0000000..87d3931 --- /dev/null +++ b/spork/channel.janet @@ -0,0 +1,37 @@ +(defn from-each + ``` + Returns a channel that gives each item from an iterable data type. `each` macro is used to iterate over all iterable + types. `supervisor` argument is passed to `ev/go` which launches two tasks that feed items to the channel. To finish + the tasks, drain all items from the channel, or close the channel. Otherwise, the tasks remain frozen. When the tasks + finish, the channel is closed. An error caused during iteration finishes the tasks with an error. Writing to the + channel finishes the tasks with an error or freezes the fiber that tries to write to the channel. + ``` + [iterable &named supervisor] + (def ch (ev/chan)) + (def iterable-ch (ev/chan)) + (def iterable-task (ev/go |(try + (defer (:close iterable-ch) + (each item iterable + (ev/give iterable-ch item))) + ([err f] + (unless (= err :cancel) + (propagate err f)))) + nil supervisor)) + (defn give-items [] + (match (ev/select ch iterable-ch) + [:take c item] + (if (= c iterable-ch) + (do + (ev/give ch item) + (give-items)) + (do + (ev/cancel iterable-task :cancel) + (error "Writing to the returned channel is prohibited."))) + [:close c] + # If iterable-ch is closed, give-items exits quietly. + (when (= c ch) + (ev/cancel iterable-task :cancel)))) + (ev/go |(defer (:close ch) + (give-items)) + nil supervisor) + ch) diff --git a/spork/stream.janet b/spork/stream.janet index ace1515..45a039b 100644 --- a/spork/stream.janet +++ b/spork/stream.janet @@ -26,39 +26,3 @@ (when (not (empty? chunk)) (yield chunk))))) (coro (fetch-lines @""))) - -(defn lines-channel - ``` - Returns a channel that gives each line from a core/stream value. If separator is not specified, the default separator - is `\n`. `supervisor` argument is passed to `ev/go` which launches two tasks that feed lines to the channel. To finish - the tasks, drain all lines from the channel, or close the channel. Otherwise, the tasks remain frozen. When the tasks - finish, the channel is closed. A stream error finishes the tasks with an error. Writing to the channel finishes the - tasks with an error or freezes the fiber that tries to write to the channel. - ``` - [stream &named separator supervisor] - (def ch (ev/chan)) - (def stream-ch (ev/chan)) - (def stream-task (ev/go |(try - (defer (:close stream-ch) - (each line (lines stream :separator separator) - (ev/give stream-ch line))) - ([err f] - (unless (= err :cancel) - (propagate err f)))) - nil supervisor)) - (defn give-lines [] - (match (ev/select ch stream-ch) - [:take c line] - (if (= c stream-ch) - (do - (ev/give ch line) - (give-lines)) - (error "Writing to the returned channel is prohibited.")) - [:close c] - # If stream-ch is closed, give-lines exits quietly. - (when (= c ch) - (ev/cancel stream-task :cancel)))) - (ev/go |(defer (:close ch) - (give-lines)) - nil supervisor) - ch)