From ff4467ac1ba10aa44a0f6345273ed7c2e6d2eb3e Mon Sep 17 00:00:00 2001 From: Andrew MacMurray Date: Tue, 14 Nov 2023 17:53:46 +0000 Subject: [PATCH] Identify pool on init (#22) --- README.md | 12 +- runner/index.ts | 102 +++++---- .../Internal/ConcurrentTask.elm | 208 +++++++++++++++--- 3 files changed, 249 insertions(+), 73 deletions(-) diff --git a/README.md b/README.md index 794b17a..bcdb28d 100644 --- a/README.md +++ b/README.md @@ -199,9 +199,9 @@ npm install @andrewmacmurray/elm-concurrent-task ### 2. Add to your Elm app -Your Elm program needs +Your Elm program needs: -- A `ConcurrentTask.Pool` in your `Model` to keep track of each task attempt: +- A single `ConcurrentTask.Pool` in your `Model` to keep track of each task attempt: ```elm type alias Model = @@ -365,6 +365,14 @@ const tasks = { **NOTE**: for a more complete `localStorage` integration with proper error handling [check out the localstorage example](https://github.com/andrewMacmurray/elm-concurrent-task/blob/ba7c8af4b1afeff138ba839511d4411a0a40bbb1/examples/localstorage-fruit-trees/src/index.ts). +## Re-using ports + +Each `send` and `receive` port pair only support **one** `ConcurrentTask.Pool` subscribed at a time. +**Weird** things can happen if you have **two or more** `ConcurrentTask.Pool`s using the same ports at the same time. + +Generally this should not be needed, but if you have a use-case, please leave an [issue](https://github.com/andrewMacmurray/elm-concurrent-task/issues). + + ## Develop Locally Install Dependencies: diff --git a/runner/index.ts b/runner/index.ts index 2146791..0698904 100644 --- a/runner/index.ts +++ b/runner/index.ts @@ -14,11 +14,17 @@ export * from "./browser"; export interface ElmPorts { send: { - subscribe: (callback: (defs: TaskDefinition[]) => Promise) => void; + subscribe: ( + callback: (defs: TaskDefinition[] | Command) => Promise + ) => void; }; - receive: { send: (result: TaskResult[]) => void }; + receive: { send: (result: TaskResult[] | PoolId) => void }; } +export type Command = { command: "identify-pool" }; + +export type PoolId = { poolId: number }; + export type Tasks = { [fn: string]: (arg: any) => any }; export interface TaskDefinition { @@ -118,49 +124,59 @@ export function register(options: Options): void { const tasks = createTasks(options); const subscribe = options.ports.send.subscribe; const send = options.ports.receive.send; - - subscribe(async (defs) => { - const debouncedSend = debounce(send, debounceThreshold(defs)); - - for (const def of defs) { - if (!tasks[def.function]) { - return debouncedSend({ - attemptId: def.attemptId, - taskId: def.taskId, - result: { - error: { - reason: "missing_function", - message: `${def.function} is not registered`, + let poolId = 0; + + subscribe(async (payload) => { + if ("command" in payload) { + if (payload.command === "identify-pool") { + send({ poolId }); + poolId = cycleInt({ max: 1000 }, poolId); + } else { + throw new Error(`Unrecognised internal command: ${payload}`); + } + } else { + const debouncedSend = debounce(send, debounceThreshold(payload)); + + for (const def of payload) { + if (!tasks[def.function]) { + return debouncedSend({ + attemptId: def.attemptId, + taskId: def.taskId, + result: { + error: { + reason: "missing_function", + message: `${def.function} is not registered`, + }, }, - }, - }); + }); + } } - } - defs.map(async (def) => { - try { - logTaskStart(def, options); - const result = await tasks[def.function]?.(def.args); - logTaskFinish(def, options); - debouncedSend({ - attemptId: def.attemptId, - taskId: def.taskId, - result: { value: result }, - }); - } catch (e) { - debouncedSend({ - attemptId: def.attemptId, - taskId: def.taskId, - result: { - error: { - reason: "js_exception", - message: `${e.name}: ${e.message}`, - raw: e, + payload.map(async (def) => { + try { + logTaskStart(def, options); + const result = await tasks[def.function]?.(def.args); + logTaskFinish(def, options); + debouncedSend({ + attemptId: def.attemptId, + taskId: def.taskId, + result: { value: result }, + }); + } catch (e) { + debouncedSend({ + attemptId: def.attemptId, + taskId: def.taskId, + result: { + error: { + reason: "js_exception", + message: `${e.name}: ${e.message}`, + raw: e, + }, }, - }, - }); - } - }); + }); + } + }); + } }); } @@ -239,3 +255,7 @@ function prefixWith(prefix: string, tasks: Tasks): Tasks { Object.entries(tasks).map(([key, fn]) => [`${prefix}${key}`, fn]) ); } + +function cycleInt(options: { max: number }, i: number): number { + return i >= options.max ? 0 : i + 1; +} diff --git a/src/ConcurrentTask/Internal/ConcurrentTask.elm b/src/ConcurrentTask/Internal/ConcurrentTask.elm index 533278f..5a9054f 100644 --- a/src/ConcurrentTask/Internal/ConcurrentTask.elm +++ b/src/ConcurrentTask/Internal/ConcurrentTask.elm @@ -311,6 +311,15 @@ sequenceHelp tasks combined = -- Batch +{-| Dividing each batch into mini-batches for some reason makes this stack safe up to 10M+ Tasks. + +Without dividing into mini-batches, `batch` quickly falls over at much smaller numbers. +Because each individual task needs a unique Id the `Task` type is difficult to defunctionalize (), +which would help significantly with stack safety. + +A clear explanation of why this works (or an alternative method!) would be much appreciated! (An approach something like this would be ideal ). + +-} batch : List (ConcurrentTask x a) -> ConcurrentTask x (List a) batch tasks = tasks @@ -558,11 +567,34 @@ type Pool msg x a type alias Pool_ msg x a = - { attempts : Dict AttemptId (Progress msg x a) + { poolId : PoolId + , queued : List ( Array Todo, Progress msg x a ) + , attempts : Dict AttemptId (Progress msg x a) , attemptIds : Ids } +{-| Because Pools can be instantiated multiple times (think switching pages in a Single Page App), +without a unique identifier a Task Pool may end up receiving responses for a Task pool that was previously discarded. + +One example is a user switching back and forth between two pages: + + - Page one has a long running task on `init` + - The user switches to page 2, then switches back to page 1 + - A new long running task is started + - But the Task Pool can receive the response from the first long running task (which is unexpected behaviour) + +Adding a `PoolId` is a bit fiddly (internally there needs to be a random / external value present before any tasks start), but it solves this problem. + +The JS runner externally keeps track of the number of Pools that have been instantiated and sends that number back, so each new pool is unique. + +-} +type PoolId + = Unidentified + | Identifying + | Identified String + + type alias Progress msg x a = { inFlight : Set TaskId , task : ( Ids, ConcurrentTask x a ) @@ -608,43 +640,109 @@ attempt attempt_ task = ) ( _, Pending defs _ ) -> - ( startAttempt - { task = ( Ids.init, task ) - , inFlight = recordSent defs Set.empty - , onComplete = attempt_.onComplete - } - attempt_.pool - , attempt_.send (encodeDefinitions (currentAttemptId attempt_.pool) defs) - ) + let + progress : Progress msg x a + progress = + { task = ( Ids.init, task ) + , inFlight = recordSent defs Set.empty + , onComplete = attempt_.onComplete + } + in + case poolId attempt_.pool of + Unidentified -> + ( withPoolId Identifying attempt_.pool |> queueTask ( defs, progress ) + , attempt_.send identifyPoolCmd + ) + Identifying -> + ( attempt_.pool |> queueTask ( defs, progress ) + , Cmd.none + ) + + Identified _ -> + startTask + { progress = progress + , pool = attempt_.pool + , send = attempt_.send + , defs = defs + } + + +startTask : + { progress : Progress msg x a + , pool : Pool msg x a + , send : Encode.Value -> Cmd msg + , defs : Array Todo + } + -> ( Pool msg x a, Cmd msg ) +startTask options = + ( startAttempt options.progress options.pool + , options.send (encodeDefinitions (currentAttemptId options.pool) options.defs) + ) -currentAttemptId : Pool msg x a -> AttemptId -currentAttemptId (Pool pool_) = - Ids.get pool_.attemptIds + +identifyPoolCmd : Encode.Value +identifyPoolCmd = + Encode.object [ ( "command", Encode.string "identify-pool" ) ] + + +decodeIdentifyResponse : Decode.Value -> Result Decode.Error PoolId +decodeIdentifyResponse = + Decode.decodeValue + (Decode.map (String.fromInt >> Identified) + (Decode.field "poolId" Decode.int) + ) onProgress : OnProgress msg x a -> Pool msg x a -> Sub msg onProgress options pool_ = options.receive (\rawResults -> - toBatchResults rawResults - |> Dict.toList - |> List.foldl - (\( attempt_, results ) ( p, cmd ) -> - case findAttempt attempt_ p of - Nothing -> - ( p, cmd ) - - Just progress -> - progress - |> updateAttempt options p ( attempt_, results ) - |> Tuple.mapSecond (\c -> Cmd.batch [ c, cmd ]) - ) - ( pool_, Cmd.none ) - |> options.onProgress + case decodeIdentifyResponse rawResults of + Ok id -> + options.onProgress + (startQueuedTasks + { pool = withPoolId id pool_ + , send = options.send + } + ) + + Err _ -> + toBatchResults rawResults + |> Dict.toList + |> List.foldl + (\( attempt_, results ) ( p, cmd ) -> + case findAttempt attempt_ p of + Nothing -> + ( p, cmd ) + + Just progress -> + progress + |> updateAttempt options p ( attempt_, results ) + |> withCmd cmd + ) + ( pool_, Cmd.none ) + |> options.onProgress ) +startQueuedTasks : { send : Encode.Value -> Cmd msg, pool : Pool msg x a } -> ( Pool msg x a, Cmd msg ) +startQueuedTasks options = + queuedTasks options.pool + |> List.foldl + (\( defs, progress ) ( pool_, cmd ) -> + startTask + { progress = progress + , defs = defs + , pool = pool_ + , send = options.send + } + |> withCmd cmd + ) + ( options.pool, Cmd.none ) + |> Tuple.mapFirst clearQueue + + updateAttempt : OnProgress msg x a -> Pool msg x a -> ( AttemptId, Results ) -> Progress msg x a -> ( Pool msg x a, Cmd msg ) updateAttempt options pool_ ( attemptId, results ) progress = case stepTask results progress.task of @@ -899,20 +997,51 @@ encodeDefinition attemptId def = pool : Pool msg x a pool = Pool - { attempts = Dict.empty + { poolId = Unidentified + , queued = [] + , attempts = Dict.empty , attemptIds = Ids.init } startAttempt : Progress msg x a -> Pool msg x a -> Pool msg x a -startAttempt progress = +startAttempt progress p = mapPool (\pool_ -> { pool_ - | attempts = Dict.insert (Ids.get pool_.attemptIds) progress pool_.attempts + | attempts = Dict.insert (currentAttemptId p) progress pool_.attempts , attemptIds = Ids.next pool_.attemptIds } ) + p + + +currentAttemptId : Pool msg x a -> AttemptId +currentAttemptId (Pool pool_) = + case pool_.poolId of + Identified id -> + id ++ ":" ++ Ids.get pool_.attemptIds + + Unidentified -> + Ids.get pool_.attemptIds + + Identifying -> + Ids.get pool_.attemptIds + + +poolId : Pool msg x a -> PoolId +poolId (Pool pool_) = + pool_.poolId + + +withPoolId : PoolId -> Pool msg x a -> Pool msg x a +withPoolId id = + mapPool (\pool_ -> { pool_ | poolId = id }) + + +queueTask : ( Array Todo, Progress msg x a ) -> Pool msg x a -> Pool msg x a +queueTask progress = + mapPool (\pool_ -> { pool_ | queued = progress :: pool_.queued }) updateProgressFor : AttemptId -> Progress msg x a -> Pool msg x a -> Pool msg x a @@ -925,6 +1054,16 @@ removeFromPool attemptId = mapPool (\pool_ -> { pool_ | attempts = Dict.remove attemptId pool_.attempts }) +queuedTasks : Pool msg x a -> List ( Array Todo, Progress msg x a ) +queuedTasks (Pool p) = + p.queued + + +clearQueue : Pool msg x a -> Pool msg x a +clearQueue = + mapPool (\pool_ -> { pool_ | queued = [] }) + + findAttempt : AttemptId -> Pool msg x a -> Maybe (Progress msg x a) findAttempt attemptId (Pool p) = Dict.get attemptId p.attempts @@ -933,3 +1072,12 @@ findAttempt attemptId (Pool p) = mapPool : (Pool_ msg x a -> Pool_ msg x a) -> Pool msg x a -> Pool msg x a mapPool f (Pool p) = Pool (f p) + + + +-- Utils + + +withCmd : Cmd msg -> ( model, Cmd msg ) -> ( model, Cmd msg ) +withCmd cmd = + Tuple.mapSecond (\c -> Cmd.batch [ c, cmd ])