Skip to content

Commit

Permalink
feat: support batching (#66)
Browse files Browse the repository at this point in the history
Can't add tests now since batching is a cloud only feature at the
moment.
Will need to come back and add it once extracted out into OSS

---------

Co-authored-by: Darwin D Wu <darwin67@users.noreply.github.com>
  • Loading branch information
darwin67 and darwin67 authored Nov 26, 2023
1 parent dcb6aef commit 85fca8c
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 57 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ jobs:
name: Test (Elixir ${{ matrix.elixir }} / OTP ${{ matrix.otp }})
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
# NOTE: best effort coverage
# https://hexdocs.pm/elixir/compatibility-and-deprecations.html#compatibility-between-elixir-and-erlang-otp
Expand Down
4 changes: 4 additions & 0 deletions lib/inngest/error.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ end
defmodule Inngest.InvalidDebounceConfigError do
defexception message: "a 'period' must be set for debounce"
end

defmodule Inngest.InvalidBatchEventConfigError do
defexception [:message]
end
78 changes: 21 additions & 57 deletions lib/inngest/function.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,16 @@ defmodule Inngest.Function do
)

@behaviour Inngest.Function
@default_retries 3

@impl true
def slug() do
__MODULE__.__info__(:attributes)
|> Keyword.get(:func)
|> List.first()
fn_opts()
|> Map.get(:id)
end

@impl true
def name() do
case __MODULE__.__info__(:attributes)
|> Keyword.get(:func)
|> List.first()
|> Map.get(:name) do
case fn_opts() |> Map.get(:name) do
nil -> slug()
name -> name
end
Expand All @@ -116,13 +110,13 @@ defmodule Inngest.Function do
end

def slugs() do
failure = if failure_handler_defined?(__MODULE__), do: [failure_slug()], else: []
failure = if failure_handler_defined?(), do: [failure_slug()], else: []
[slug()] ++ failure
end

def serve(path) do
handler =
if failure_handler_defined?(__MODULE__) do
if failure_handler_defined?() do
id = failure_slug()

[
Expand Down Expand Up @@ -172,38 +166,31 @@ defmodule Inngest.Function do
}
}
|> maybe_debounce()
|> maybe_batch_events()
] ++ handler
end

defp retries() do
case __MODULE__.__info__(:attributes)
|> Keyword.get(:func)
|> List.first()
|> Map.get(:retries) do
nil -> @default_retries
retry -> retry
end
end
defp retries(), do: fn_opts() |> Map.get(:retries)

defp maybe_debounce(config) do
case __MODULE__.__info__(:attributes)
|> Keyword.get(:func)
|> List.first()
|> Map.get(:debounce) do
nil ->
config

debounce ->
if Map.get(debounce, :period) == nil do
raise Inngest.InvalidDebounceConfigError
else
Map.put(config, :debounce, debounce)
end
fn_opts()
|> Inngest.FnOpts.validate_debounce(config)
end

defp maybe_batch_events(config) do
fn_opts()
|> Inngest.FnOpts.validate_batch_events(config)
end

defp fn_opts() do
case __MODULE__.__info__(:attributes) |> Keyword.get(:func) |> List.first() do
nil -> %Inngest.FnOpts{}
val -> val
end
end

defp failure_handler_defined?(mod) do
mod.__info__(:functions) |> Keyword.get(:handle_failure) == 2
defp failure_handler_defined?() do
__MODULE__.__info__(:functions) |> Keyword.get(:handle_failure) == 2
end

defp failure_slug(), do: "#{slug()}-failure"
Expand Down Expand Up @@ -246,26 +233,3 @@ defmodule Inngest.Function do

def validate_datetime(_), do: {:error, "Expect valid DateTime formatted input"}
end

defmodule Inngest.FnOpts do
@moduledoc false

defstruct [
:id,
:name,
:retries,
:debounce
]

@type t() :: %__MODULE__{
id: binary(),
name: binary(),
retries: number() | nil,
debounce: debounce() | nil
}

@type debounce() :: %{
key: nil | binary(),
period: binary()
}
end
103 changes: 103 additions & 0 deletions lib/inngest/function/config.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
defmodule Inngest.FnOpts do
@moduledoc false

defstruct [
:id,
:name,
:debounce,
:batch_events,
retries: 3
]

alias Inngest.Util

@type t() :: %__MODULE__{
id: binary(),
name: binary(),
retries: number() | nil,
debounce: debounce() | nil,
batch_events: batch_events() | nil
}

@type debounce() :: %{
key: nil | binary(),
period: binary()
}

@type batch_events() :: %{
max_size: number(),
timeout: binary()
}

@doc """
Validate the debounce configuration
"""
@spec validate_debounce(t(), map()) :: map()
def validate_debounce(fnopts, config) do
case fnopts |> Map.get(:debounce) do
nil ->
config

debounce ->
period = Map.get(debounce, :period)

if is_nil(period) do
raise Inngest.InvalidDebounceConfigError
end

case Util.parse_duration(period) do
{:error, error} ->
raise Inngest.InvalidDebounceConfigError, message: error

{:ok, seconds} ->
# credo:disable-for-next-line
if seconds > 7 * Util.day_in_seconds() do
raise Inngest.InvalidDebounceConfigError,
message: "cannot specify period for more than 7 days"
end
end

Map.put(config, :debounce, debounce)
end
end

@doc """
Validate the event batch config
"""
@spec validate_batch_events(t(), map()) :: map()
def validate_batch_events(fnopts, config) do
case fnopts |> Map.get(:batch_events) do
nil ->
config

batch ->
max_size = Map.get(batch, :max_size)
timeout = Map.get(batch, :timeout)

if is_nil(max_size) do
raise Inngest.InvalidBatchEventConfigError,
message: "'max_size' must be set for batch_events"
end

if is_nil(timeout) do
raise Inngest.InvalidBatchEventConfigError,
message: "'timeout' must be set for batch_events"
end

case Util.parse_duration(timeout) do
{:error, error} ->
raise Inngest.InvalidBatchEventConfigError, message: error

{:ok, seconds} ->
# credo:disable-for-next-line
if seconds < 1 || seconds > 60 do
raise Inngest.InvalidBatchEventConfigError,
message: "'timeout' duration set to '#{timeout}', needs to be 1s - 60s"
end
end

batch = batch |> Map.put(:maxSize, max_size) |> Map.drop([:max_size])
Map.put(config, :batchEvents, batch)
end
end
end
32 changes: 32 additions & 0 deletions lib/inngest/util.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule Inngest.Util do
@moduledoc """
Utility functions
"""

@doc """
Parse string duration that Inngest understands into seconds
"""
@spec parse_duration(binary()) :: {:ok, number()} | {:error, binary()}
def parse_duration(value) do
with [_, num, unit] <- Regex.run(~r/(\d+)(s|m|h|d)/i, value),
dur <- String.to_integer(num) do
case unit do
"d" -> {:ok, dur * day_in_seconds()}
"h" -> {:ok, dur * hour_in_seconds()}
"m" -> {:ok, dur * minute_in_seconds()}
"s" -> {:ok, dur}
_ -> {:error, "invalid time unit '#{unit}', must be d|h|m|s"}
end
else
nil ->
{:error, "invalid duration: '#{value}'"}

_ ->
{:error, "unknow error occurred when parsing duration"}
end
end

def day_in_seconds(), do: 60 * 60 * 24
def hour_in_seconds(), do: 60 * 60
def minute_in_seconds(), do: 60
end
8 changes: 8 additions & 0 deletions test/inngest/function/cases/batch_events_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
defmodule Inngest.Function.Cases.BatchEventsTest do
use ExUnit.Case, async: true

alias Inngest.Test.DevServer

Check warning on line 4 in test/inngest/function/cases/batch_events_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.14 / OTP 24.3)

unused alias DevServer

Check warning on line 4 in test/inngest/function/cases/batch_events_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.14 / OTP 25.3)

unused alias DevServer

Check warning on line 4 in test/inngest/function/cases/batch_events_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.15 / OTP 24.3)

unused alias DevServer

Check warning on line 4 in test/inngest/function/cases/batch_events_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.15 / OTP 25.3)

unused alias DevServer
import Inngest.Test.Helper

Check warning on line 5 in test/inngest/function/cases/batch_events_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.14 / OTP 24.3)

unused import Inngest.Test.Helper

Check warning on line 5 in test/inngest/function/cases/batch_events_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.14 / OTP 25.3)

unused import Inngest.Test.Helper

Check warning on line 5 in test/inngest/function/cases/batch_events_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.15 / OTP 24.3)

unused import Inngest.Test.Helper

Check warning on line 5 in test/inngest/function/cases/batch_events_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.15 / OTP 25.3)

unused import Inngest.Test.Helper

# TODO: Add test after moving batching logic to OSS
end
Loading

0 comments on commit 85fca8c

Please sign in to comment.