Skip to content

Commit

Permalink
add retry policy using Exq (#4)
Browse files Browse the repository at this point in the history
* init docker

* add notifier logic (#2)

* add notifier logic

* remove debuggers

* fix the tests and whitelist  the apis endpoints

* add consul in run.sh

* remove newrelic var

* update

* add docker file INP

* working docker separately

* running docker composer

* docker working without assets YET

* add retry policy using Exq

* running docker image finally

* fix local.sh to work on dev environment

* make travis work on docker-compose

* remove elixir from travis and depend only on docker

* try to change install to script in travis

* try  a failed test [should be reverted]

* try to run mix test explicitly in the travis file

* remove build in install travis

* return the last exit code in test.sh

* add redis to travis

* run mix test inside travis script

* run test.sh

* read the correct envs inside travis configs

* Revert "try  a failed test [should be reverted]"

This reverts commit 83e10f2.

* push image on deployment

* deploy on merge to master

* isolate the push docker logic in another file

* fix the test.sh

* add a configurable retry policy

* travis

* add redis in docker compose and make the host configurable

* add tests

* edit docker -compose

* test.sh

* add redis in docker compose and make the host configurable

* refine tests
  • Loading branch information
AlaaNour94 authored and ma7modx committed Dec 30, 2018
1 parent 66b4c77 commit 7a6034d
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ before_install:
- curl -L https://github.com/docker/compose/releases/download/${DOCKER_COMPOSE_VERSION}/docker-compose-`uname -s`-`uname -m` > docker-compose
- chmod +x docker-compose
- sudo mv docker-compose /usr/local/bin

- docker-compose --version

- docker-compose build web

script:
Expand Down
8 changes: 8 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,11 @@ config :phoenix, :json_library, Jason
# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"

config :exq,
name: Exq,
host: System.get_env("REDIS_URL") || "127.0.0.1",
port: 6379,
namespace: "exq",
concurrency: 500,
queues: ["events"]
8 changes: 7 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ services:
environment:
MYSQL_ROOT_PASSWORD: "password"

redis:
image: redis

web:
build: .
command: /app/local.sh
Expand All @@ -18,9 +21,12 @@ services:
DB_NAME: eventful_dev
DB_HOST: db
MIX_ENV: dev
REDIS_URL: redis
volumes:
- ./:/app
links:
- db
- redis
depends_on:
- db
- db
- redis
14 changes: 12 additions & 2 deletions lib/eventful/helpers/notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ defmodule Eventful.Notifier do
require Logger
require HTTPoison
alias Eventful.Resources
alias Eventful.Repo
use Task

def fanout(event) do
Enum.each(Resources.get_subscriptions_for_topic(event.topic_id), fn(subscription) ->
Task.async(__MODULE__, :fire, [event, subscription])
max_retries = subscription.max_retries || System.get_env("max_retries")
Exq.enqueue(Exq, "events", __MODULE__, [event.id, subscription.id], max_retries: max_retries)
end)
end


def fire(event, subscription) do
def perform(event_id, subscription_id) do
event = Repo.get(Eventful.Resources.Event, event_id)
subscription = Repo.get(Eventful.Resources.Subscription, subscription_id)
Logger.info("fire event: #{event.id} for subscription: #{subscription.id}")
case HTTPoison.post(subscription.webhook, event.payload, %{"Content-Type" => "application/json"}) do
{:ok, %{status_code: 200}} ->
Expand All @@ -20,6 +24,12 @@ defmodule Eventful.Notifier do
{status, response} ->
Resources.create_event_log(%{status: "failed", event_id: event.id, subscription_id: subscription.id})
Logger.error("status: #{status}, response: #{inspect(response)}")
raise TaskError, "error: #{inspect(response)} for webhook #{subscription.webhook}"
end
end
end


defmodule TaskError do
defexception message: "Task has failed"
end
1 change: 0 additions & 1 deletion lib/eventful/resources/event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ defmodule Eventful.Resources.Event do
use Ecto.Schema
import Ecto.Changeset


schema "events" do
field :payload, :string
field :sender_info, :string
Expand Down
4 changes: 2 additions & 2 deletions lib/eventful/resources/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ defmodule Eventful.Resources.Subscription do
use Ecto.Schema
import Ecto.Changeset


schema "subscriptions" do
field :webhook, :string
field :max_retries, :integer, default: nil

timestamps()

Expand All @@ -15,7 +15,7 @@ defmodule Eventful.Resources.Subscription do
@doc false
def changeset(subscription, attrs) do
subscription
|> cast(attrs, [:webhook, :topic_id])
|> cast(attrs, [:webhook, :topic_id, :max_retries])
|> validate_required([:webhook])
end
end
4 changes: 4 additions & 0 deletions lib/eventful_web/templates/subscription/form.html.eex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
<%= text_input f, :topic_id %>
<%= error_tag f, :topic_id %>

<%= label f, :max_retries %>
<%= text_input f, :max_retries %>
<%= error_tag f, :max_retries %>

<div>
<%= submit "Save" %>
</div>
Expand Down
6 changes: 4 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule Eventful.MixProject do
def application do
[
mod: {Eventful.Application, []},
extra_applications: [:logger, :runtime_tools]
extra_applications: [:logger, :runtime_tools, :exq]
]
end

Expand All @@ -44,7 +44,9 @@ defmodule Eventful.MixProject do
{:jason, "~> 1.0"},
{:plug_cowboy, "~> 2.0"},
{:httpoison, "~> 1.4"},
{:poison, "~> 3.1"}
{:poison, "~> 3.1"},
{:exq, "~> 0.12.2"},
{:mock, "~> 0.3.0", only: :test}
]
end

Expand Down
6 changes: 6 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@
"decimal": {:hex, :decimal, "1.6.0", "bfd84d90ff966e1f5d4370bdd3943432d8f65f07d3bab48001aebd7030590dcc", [:mix], [], "hexpm"},
"ecto": {:hex, :ecto, "3.0.3", "018a3df0956636f84eb3033d807485a7d3dea8474f47b90da5cb8073444c4384", [:mix], [{:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:poison, "~> 2.2 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: true]}], "hexpm"},
"ecto_sql": {:hex, :ecto_sql, "3.0.2", "0e04cbc183b91ea0085c502226befcd237a4ac31c204fd4be8d4db6676b5f10d", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.0.3", [hex: :ecto, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.9.1", [hex: :mariaex, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.14.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.2.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.0", "ff26e938f95830b1db152cb6e594d711c10c02c6391236900ddd070a6b01271d", [:mix], [], "hexpm"},
"exq": {:hex, :exq, "0.12.2", "7f4557a13a7fcfb159b318ee3275a1f2c38a572847d4cf8d47e303a15d0807b4", [:mix], [{:elixir_uuid, ">= 1.2.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:poison, ">= 1.2.0 or ~> 2.0", [hex: :poison, repo: "hexpm", optional: false]}, {:redix, ">= 0.5.0 and < 0.8.0", [hex: :redix, repo: "hexpm", optional: false]}], "hexpm"},
"file_system": {:hex, :file_system, "0.2.6", "fd4dc3af89b9ab1dc8ccbcc214a0e60c41f34be251d9307920748a14bf41f1d3", [:mix], [], "hexpm"},
"gettext": {:hex, :gettext, "0.16.1", "e2130b25eebcbe02bb343b119a07ae2c7e28bd4b146c4a154da2ffb2b3507af2", [:mix], [], "hexpm"},
"hackney": {:hex, :hackney, "1.14.3", "b5f6f5dcc4f1fba340762738759209e21914516df6be440d85772542d4a5e412", [:rebar3], [{:certifi, "2.4.2", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.4", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
"httpoison": {:hex, :httpoison, "1.4.0", "e0b3c2ad6fa573134e42194d13e925acfa8f89d138bc621ffb7b1989e6d22e73", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
"idna": {:hex, :idna, "6.0.0", "689c46cbcdf3524c44d5f3dde8001f364cd7608a99556d8fbd8239a5798d4c10", [:rebar3], [{:unicode_util_compat, "0.4.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
"mariaex": {:hex, :mariaex, "0.9.1", "83266fec657ea68dd426f4bbc12594be45ee91fe162ebf1bf017ce3cfa098ddd", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.2", [hex: :decimal, repo: "hexpm", optional: false]}, {:poison, ">= 0.0.0", [hex: :poison, repo: "hexpm", optional: true]}], "hexpm"},
"meck": {:hex, :meck, "0.8.12", "1f7b1a9f5d12c511848fec26bbefd09a21e1432eadb8982d9a8aceb9891a3cf2", [:rebar3], [], "hexpm"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"},
"mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm"},
"mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], [], "hexpm"},
"mock": {:hex, :mock, "0.3.2", "e98e998fd76c191c7e1a9557c8617912c53df3d4a6132f561eb762b699ef59fa", [:mix], [{:meck, "~> 0.8.8", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm"},
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},
"phoenix": {:hex, :phoenix, "1.4.0", "56fe9a809e0e735f3e3b9b31c1b749d4b436e466d8da627b8d82f90eaae714d2", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm"},
"phoenix_ecto": {:hex, :phoenix_ecto, "4.0.0", "c43117a136e7399ea04ecaac73f8f23ee0ffe3e07acfcb8062fe5f4c9f0f6531", [:mix], [{:ecto, "~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.9", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
Expand All @@ -28,7 +32,9 @@
"plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
"ranch": {:hex, :ranch, "1.7.0", "9583f47160ca62af7f8d5db11454068eaa32b56eeadf984d4f46e61a076df5f2", [:rebar3], [], "hexpm"},
"redix": {:hex, :redix, "0.7.1", "25a6c1c0d9b2d12a35aef759f9e49bd9bca00e0cd857ce766412f08fdda72163", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.4", "f0eafff810d2041e93f915ef59899c923f4568f4585904d010387ed74988e77b", [:make, :mix, :rebar3], [], "hexpm"},
"telemetry": {:hex, :telemetry, "0.2.0", "5b40caa3efe4deb30fb12d7cd8ed4f556f6d6bd15c374c2366772161311ce377", [:mix], [], "hexpm"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm"},
"web_spell": {:hex, :web_spell, "0.1.1", "8122305ce8b78d3e172742858dea733bd712162603cac67dd4500a25646cd459", [:mix], [], "hexpm"},
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Eventful.Repo.Migrations.AddMaxRetriesToSubscriptions do
use Ecto.Migration

def change do
alter table(:subscriptions) do
add :max_retries, :integer, default: nil
end
end
end
18 changes: 17 additions & 1 deletion test/eventful/resources/resources_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Eventful.ResourcesTest do
use Eventful.DataCase

import Mock
alias Eventful.Resources

describe "topics" do
Expand Down Expand Up @@ -241,5 +241,21 @@ defmodule Eventful.ResourcesTest do
event_log = event_log_fixture()
assert %Ecto.Changeset{} = Resources.change_event_log(event_log)
end

test "perform/2 fire the task correctly" do
event = event_fixture()
with_mock HTTPoison, [post: fn(_url, _payload, _headers) -> {:ok, %{status_code: 200}} end] do
subscription = subscription_fixture()
assert {:ok, _} = Eventful.Notifier.perform(event.id, subscription.id)
end
end

test "perform/2 raise Error with wrong webhook" do
event = event_fixture()
subscription = subscription_fixture()
with_mock HTTPoison, [post: fn(_url, _payload, _headers) -> {:error, %{status_code: 500}} end] do
assert_raise TaskError, fn -> Eventful.Notifier.perform(event.id, subscription.id) end
end
end
end
end

0 comments on commit 7a6034d

Please sign in to comment.