Skip to content

Commit

Permalink
feat: idempotency support (#69)
Browse files Browse the repository at this point in the history
Co-authored-by: Darwin D Wu <darwin67@users.noreply.github.com>
  • Loading branch information
darwin67 and darwin67 authored Nov 27, 2023
1 parent 35a0852 commit 10df6e8
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 22 deletions.
4 changes: 4 additions & 0 deletions lib/inngest/error.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ end
defmodule Inngest.ConcurrencyConfigError do
defexception [:message]
end

defmodule Inngest.IdempotencyConfigError do
defexception [:message]
end
28 changes: 12 additions & 16 deletions lib/inngest/function.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,31 +168,27 @@ defmodule Inngest.Function do
|> maybe_debounce()
|> maybe_batch_events()
|> maybe_rate_limit()
|> maybe_idempotency()
|> maybe_concurrency()
] ++ handler
end

defp retries(), do: fn_opts() |> Map.get(:retries)

defp maybe_debounce(config) do
fn_opts()
|> Inngest.FnOpts.validate_debounce(config)
end
defp maybe_debounce(config),
do: fn_opts() |> Inngest.FnOpts.validate_debounce(config)

defp maybe_batch_events(config) do
fn_opts()
|> Inngest.FnOpts.validate_batch_events(config)
end
defp maybe_batch_events(config),
do: fn_opts() |> Inngest.FnOpts.validate_batch_events(config)

defp maybe_rate_limit(config) do
fn_opts()
|> Inngest.FnOpts.validate_rate_limit(config)
end
defp maybe_rate_limit(config),
do: fn_opts() |> Inngest.FnOpts.validate_rate_limit(config)

defp maybe_concurrency(config) do
fn_opts()
|> Inngest.FnOpts.validate_concurrency(config)
end
defp maybe_idempotency(config),
do: fn_opts() |> Inngest.FnOpts.validate_idempotency(config)

defp maybe_concurrency(config),
do: fn_opts() |> Inngest.FnOpts.validate_concurrency(config)

defp fn_opts() do
case __MODULE__.__info__(:attributes) |> Keyword.get(:func) |> List.first() do
Expand Down
30 changes: 26 additions & 4 deletions lib/inngest/function/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Inngest.FnOpts do
:debounce,
:batch_events,
:rate_limit,
:idempotency,
:concurrency,
retries: 3
]
Expand All @@ -22,6 +23,7 @@ defmodule Inngest.FnOpts do
debounce: debounce() | nil,
batch_events: batch_events() | nil,
rate_limit: rate_limit() | nil,
idempotency: idempotency() | nil,
concurrency: concurrency() | nil
}

Expand All @@ -41,6 +43,8 @@ defmodule Inngest.FnOpts do
key: binary() | nil
}

@type idempotency() :: binary()

@type concurrency() ::
number()
| concurrency_option()
Expand All @@ -54,7 +58,7 @@ defmodule Inngest.FnOpts do
@concurrency_scopes ["fn", "env", "account"]

@doc """
Validate the debounce configuration
Validate the debounce settings
"""
@spec validate_debounce(t(), map()) :: map()
def validate_debounce(fnopts, config) do
Expand Down Expand Up @@ -86,7 +90,7 @@ defmodule Inngest.FnOpts do
end

@doc """
Validate the event batch config
Validate the event batch settings
"""
@spec validate_batch_events(t(), map()) :: map()
def validate_batch_events(fnopts, config) do
Expand Down Expand Up @@ -121,7 +125,7 @@ defmodule Inngest.FnOpts do
end

@doc """
Validate the rate limit config
Validate the rate limit settings
"""
@spec validate_rate_limit(t(), map()) :: map()
def validate_rate_limit(fnopts, config) do
Expand Down Expand Up @@ -155,7 +159,25 @@ defmodule Inngest.FnOpts do
end

@doc """
Validate the concurrency config
Validate the idempotency settings
"""
def validate_idempotency(fnopts, config) do
# NOTE: nothing really to validate, just have this for the sake of consistency
case fnopts |> Map.get(:idempotency) do
nil ->
config

setting ->
if !is_binary(setting) do
raise Inngest.IdempotencyConfigError, message: "idempotency must be a CEL string"
end

Map.put(config, :idempotency, setting)
end
end

@doc """
Validate the concurrency settings
"""
@spec validate_concurrency(t(), map()) :: map()
def validate_concurrency(fnopts, config) do
Expand Down
53 changes: 53 additions & 0 deletions test/inngest/function/cases/idempotency_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
defmodule Inngest.Function.Cases.IdempotencyTest do
use ExUnit.Case, async: true

alias Inngest.Test.DevServer
import Inngest.Test.Helper

@default_sleep 3_000

@tag :integration
test "should only run 1 out of 10" do
event_ids =
Enum.map(1..10, fn _ -> send_test_event("test/plug.idempotency", %{foobar: false}) end)

Process.sleep(@default_sleep)

fn_runs =
event_ids
|> Enum.map(fn id ->
{:ok, %{"data" => data}} = DevServer.run_ids(id)

if Enum.count(data) == 1 do
assert [
%{
"output" => "Done",
"status" => "Completed",
"run_id" => run_id
}
] = data

run_id
else
nil
end
end)
|> Enum.filter(&(!is_nil(&1)))

assert Enum.count(fn_runs) == 1

# sending with a different value will run
other = send_test_event("test/plug.idempotency", %{foobar: "hello"})
Process.sleep(@default_sleep)

assert {:ok,
%{
"data" => [
%{
"output" => "Done",
"status" => "Completed"
}
]
}} = DevServer.run_ids(other)
end
end
4 changes: 3 additions & 1 deletion test/inngest/function/cases/rate_limit_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Inngest.Function.Cases.RateLimitTest do
@default_sleep 10_000

@tag :integration
test "should only run 2 out of 10" do
test "should <= 2 out of 10" do
event_ids = Enum.map(1..10, fn _ -> send_test_event("test/plug.ratelimit") end)

Process.sleep(@default_sleep)
Expand All @@ -25,6 +25,8 @@ defmodule Inngest.Function.Cases.RateLimitTest do
"run_id" => run_id
}
] = data

run_id
else
nil
end
Expand Down
24 changes: 24 additions & 0 deletions test/inngest/function/config_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,30 @@ defmodule Inngest.FnOptsTest do
end
end

describe "validate_idempotency/2" do
@fn_opts %FnOpts{
id: "foobar",
name: "Foobar",
idempotency: "event.data.foobar"
}

test "should succeed with valid settings" do
assert %{
idempotency: "event.data.foobar"
} = FnOpts.validate_idempotency(@fn_opts, @config)
end

test "should raise if value is not string" do
opts = %{@fn_opts | idempotency: false}

assert_raise Inngest.IdempotencyConfigError,
"idempotency must be a CEL string",
fn ->
FnOpts.validate_idempotency(opts, @config)
end
end
end

describe "validate_concurrency/2" do
@fn_opts %FnOpts{
id: "foobar",
Expand Down
1 change: 0 additions & 1 deletion test/support/cases/concurrency_fn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ defmodule Inngest.Test.Case.ConcurrencyFn do
@trigger %Trigger{event: "test/plug.throttle"}

@impl true
@spec exec(any, %{:step => atom, optional(any) => any}) :: {:ok, <<_::72>>}
def exec(ctx, %{step: step} = _args) do
_ =
step.run(ctx, "wait", fn ->
Expand Down
18 changes: 18 additions & 0 deletions test/support/cases/idempotent_fn.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Inngest.Test.Case.IdempotentFn do
@moduledoc false

use Inngest.Function
alias Inngest.{FnOpts, Trigger}

@func %FnOpts{
id: "idempotent-fn",
name: "Idempotent Function",
idempotency: "event.data.foobar"
}
@trigger %Trigger{event: "test/plug.idempotency"}

@impl true
def exec(_ctx, _args) do
{:ok, "Done"}
end
end

0 comments on commit 10df6e8

Please sign in to comment.