Skip to content

Commit

Permalink
add ability to pass gen_server start options to worker
Browse files Browse the repository at this point in the history
  • Loading branch information
dziaineka committed Jan 8, 2024
1 parent 53ed12d commit 5e8f033
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 20 deletions.
6 changes: 3 additions & 3 deletions include/fox.hrl
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
-include_lib("amqp_client/include/amqp_client.hrl").

-type(pool_name() :: binary() | string() | atom()).
-type(queue_name() :: binary()).
-type(subscribe_queue() :: queue_name() | #'basic.consume'{}).
-type pool_name() :: binary() | string() | atom().
-type queue_name() :: binary().
-type subscribe_queue() :: queue_name() | #'basic.consume'{}.

-record(conn_worker_state, {
connection :: pid() | undefined,
Expand Down
14 changes: 10 additions & 4 deletions src/fox.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
create_connection_pool/3,
close_connection_pool/1,
get_channel/1,
subscribe/3, subscribe/4, unsubscribe/2,
subscribe/3, subscribe/4, subscribe/5,
unsubscribe/2,
declare_exchange/2, declare_exchange/3,
delete_exchange/2, delete_exchange/3,
bind_exchange/4, bind_exchange/5,
Expand Down Expand Up @@ -77,15 +78,20 @@ get_channel(PoolName0) ->
-spec subscribe(pool_name(), subscribe_queue(), module()) ->
{ok, SubscriptionReference :: reference()} | {error, Reason :: term()}.
subscribe(PoolName, Queue, SubsModule) ->
subscribe(PoolName, Queue, SubsModule, []).
subscribe(PoolName, Queue, SubsModule, [], []).


-spec subscribe(pool_name(), subscribe_queue(), module(), list()) ->
{ok, SubscriptionReference :: reference()}.
subscribe(PoolName0, BasicConsumeOrQueueName, SubsModule, SubsArgs) ->
subscribe(PoolName0, BasicConsumeOrQueueName, SubsModule, SubsArgs, []).

-spec subscribe(pool_name(), subscribe_queue(), module(), list(), [gen_server:start_opt()]) ->
{ok, SubscriptionReference :: reference()}.
subscribe(PoolName0, BasicConsumeOrQueueName, SubsModule, SubsArgs, GenServerStartOptions) ->
PoolName = fox_utils:name_to_atom(PoolName0),
CPid = fox_conn_pool:get_conn_worker(PoolName),
BasicConsume =
BasicConsume =
case BasicConsumeOrQueueName of
#'basic.consume'{} = Consume -> Consume;
Name when is_binary(Name) -> #'basic.consume'{queue = Name}
Expand All @@ -99,7 +105,7 @@ subscribe(PoolName0, BasicConsumeOrQueueName, SubsModule, SubsArgs) ->
subs_module = SubsModule,
subs_args = SubsArgs
},
{ok, _} = fox_subs_sup:start_subscriber(PoolName, Subs),
{ok, _} = fox_subs_sup:start_subscriber(PoolName, Subs, GenServerStartOptions),
{ok, SubsRef}.


Expand Down
8 changes: 4 additions & 4 deletions src/subscription/fox_subs_sup.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-module(fox_subs_sup).
-behaviour(supervisor).

-export([start_link/1, start_subscriber/2, init/1]).
-export([start_link/1, start_subscriber/3, init/1]).

-include("otp_types.hrl").
-include("fox.hrl").
Expand All @@ -13,10 +13,10 @@ start_link(PoolName) ->
supervisor:start_link({local, RegName}, ?MODULE, no_args).


-spec start_subscriber(atom(), #subscription{}) -> startchild_ret().
start_subscriber(PoolName, Sub) ->
-spec start_subscriber(atom(), #subscription{}, [gen_server:start_opt()]) -> startchild_ret().
start_subscriber(PoolName, Subscription, GenServerStartOptions) ->
RegName = fox_utils:make_reg_name(?MODULE, PoolName),
supervisor:start_child(RegName, [Sub]).
supervisor:start_child(RegName, [Subscription, GenServerStartOptions]).


-spec(init(gs_args()) -> sup_init_reply()).
Expand Down
18 changes: 9 additions & 9 deletions src/subscription/fox_subs_worker.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-module(fox_subs_worker).
-behavior(gen_server).

-export([start_link/1, connection_established/2, stop/1]).
-export([start_link/2, connection_established/2, stop/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-include("otp_types.hrl").
Expand All @@ -14,9 +14,9 @@

%%% module API

-spec start_link(#subscription{}) -> gs_start_link_reply().
start_link(State) ->
gen_server:start_link(?MODULE, State, []).
-spec start_link(#subscription{}, [gen_server:start_opt()]) -> gs_start_link_reply().
start_link(State, StartOptions) ->
gen_server:start_link(?MODULE, State, StartOptions).


-spec connection_established(pid(), pid()) -> ok.
Expand Down Expand Up @@ -74,7 +74,7 @@ handle_cast({connection_established, Conn},
= State) ->
State2 = unsubscribe(State),
case amqp_connection:open_channel(Conn) of
{ok, Channel} ->
{ok, Channel} ->
logger:info("~s subscribe to queue", [worker_name(State)]),

Ref = erlang:monitor(process, Channel),
Expand All @@ -85,11 +85,11 @@ handle_cast({connection_established, Conn},

{noreply, State2#subscription{
connection = Conn,
channel = Channel,
channel_ref = Ref,
subs_state = SubsState,
channel = Channel,
channel_ref = Ref,
subs_state = SubsState,
subs_tag = Tag}};
Other ->
Other ->
logger:info("~s can't subscribe to queue, reason: ~w", [worker_name(State), Other]),
{noreply, State2}
end;
Expand Down

0 comments on commit 5e8f033

Please sign in to comment.