Skip to content

Commit

Permalink
Mas i994 handoffsync (#995)
Browse files Browse the repository at this point in the history
* Initial tidy

Remove legacy code from handoff_sender.  The handoff_receiver must keep unused references, as during an upgrade an updated node could be a receiver to a non-updated node sending.

For non-batch not such an issue, as all supported versions for upgrade already supported batch.  However, receiver must still indicate it supports batch due to above problem.

All handoff receiver/sender code tidied down to 80-column width

* Change AckSync to every batch by default

Do away with timer based sync, and ack-sync based on threshold only.

Also log ongoing transfer progress every ack-log threshold.

Log at point of error reason for error - avoid generic {shutdown, timeout} error with no clue as to actual point of code origin.

* Standardise send_sync into function

Fix issue that configure message does not respond sync, and so another sync is now required.

* Receiver needs vnode module not master

* Make first log indicate batch_count of 0

Confusing that the first log on the sending of a batch will indicate a batch_count of 100 not 0

* Batch threshold can be either count or size

Clarify log text, as batch_size no longer fixed

* Reinstate keepalive of receiver

Need to distinguish between failed fold and slow fold - and so the keepalive of the receiver has value.

Now implemented by checking a keepalive_next time every visit item, rather than continuously entering and exiting selective receive

* Further comments

* Update after review

* Metadata exchange on join

Make a metadata exchange part of the join process.  This prevents the situation where a bucket type is active in a cluster, then a node joins (as part of cluster expansion, say), but the bucket properties are not known to joining node during handoff of  objects of that type.

Now, the join cannot be staged without a metadata exchange, so that all joining nodes know of cluster metadata (e.g. bucket types) before the join is committed and handoffs start.

* Attempt exchange on Join

It is only an attempt - failure (i.e. timeout) would be no different to the current state with a potential race, so we don't block joins.  Joins will normally be safer because of this.
  • Loading branch information
martinsumner authored Jan 18, 2023
1 parent 5da9f75 commit ad546ed
Show file tree
Hide file tree
Showing 5 changed files with 568 additions and 415 deletions.
29 changes: 16 additions & 13 deletions src/riak_core.erl
Original file line number Diff line number Diff line change
Expand Up @@ -140,25 +140,28 @@ standard_join(Node, Ring, Rejoin, Auto) ->
{error, different_ring_sizes};
_ ->
GossipVsn = riak_core_gossip:gossip_version(),
Ring2 = riak_core_ring:add_member(node(), Ring,
node()),
Ring2 =
riak_core_ring:add_member(
node(), Ring, node()),
Ring3 = riak_core_ring:set_owner(Ring2, node()),
Ring4 =
riak_core_ring:update_member_meta(node(),
Ring3,
node(),
gossip_vsn,
GossipVsn),
ParticipateInCoverage = app_helper:get_env(riak_core,participate_in_coverage),
riak_core_ring:update_member_meta(
node(), Ring3, node(), gossip_vsn, GossipVsn),
ParticipateInCoverage =
app_helper:get_env(riak_core,participate_in_coverage),
Ring4a =
riak_core_ring:update_member_meta(node(),
Ring4,
node(),
participate_in_coverage, ParticipateInCoverage),
riak_core_ring:update_member_meta(
node(),
Ring4,
node(),
participate_in_coverage,
ParticipateInCoverage),
{_, Ring5} = riak_core_capability:update_ring(Ring4a),
Ring6 = maybe_auto_join(Auto, node(), Ring5),
riak_core_ring_manager:set_my_ring(Ring6),
riak_core_gossip:send_ring(Node, node())
ok = riak_core_gossip:send_ring(Node, node()),
ok = riak_core_metadata_manager:attempt_exchange(Node)

end.

maybe_auto_join(false, _Node, Ring) ->
Expand Down
178 changes: 111 additions & 67 deletions src/riak_core_handoff_receiver.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,24 @@
[{gen_fsm, sync_send_all_state_event, 3}]}).

-export([start_link/0, % Don't use SSL
start_link/1, % SSL options list, empty=no SSL
set_socket/2,
supports_batching/0]).
start_link/1, % SSL options list, empty=no SSL
set_socket/2,
supports_batching/0,
get_handoff_timeout/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

-record(state, {sock :: port() | undefined,
peer :: term() | undefined,
ssl_opts :: [] | list(),
tcp_mod :: atom(),
recv_timeout_len :: non_neg_integer(),
vnode_timeout_len :: non_neg_integer(),
partition :: non_neg_integer() | undefined,
vnode_mod = riak_kv_vnode :: module(),
vnode :: pid() | undefined,
count = 0 :: non_neg_integer()}).
-record(state,
{sock :: port() | undefined,
peer :: term() | undefined,
ssl_opts :: [] | list(),
tcp_mod :: atom(),
recv_timeout_len :: non_neg_integer(),
vnode_timeout_len :: non_neg_integer(),
partition :: non_neg_integer() | undefined,
vnode_mod = riak_kv_vnode :: module(),
vnode :: pid() | undefined,
count = 0 :: non_neg_integer()}).

%% set the TCP receive timeout to five minutes to be conservative.
-define(RECV_TIMEOUT, 300000).
Expand Down Expand Up @@ -71,48 +73,71 @@ supports_batching() ->
true.

init([SslOpts]) ->
{ok, #state{ssl_opts = SslOpts,
tcp_mod = if SslOpts /= [] -> ssl;
true -> gen_tcp
end,
recv_timeout_len = app_helper:get_env(riak_core, handoff_receive_timeout, ?RECV_TIMEOUT),
vnode_timeout_len = app_helper:get_env(riak_core, handoff_receive_vnode_timeout, ?VNODE_TIMEOUT)}}.

handle_call({set_socket, Socket0}, _From, State = #state{ssl_opts = SslOpts}) ->
{ok,
#state{
ssl_opts = SslOpts,
tcp_mod =
if SslOpts /= [] ->
ssl;
true ->
gen_tcp
end,
recv_timeout_len = get_handoff_timeout(),
vnode_timeout_len =
app_helper:get_env(
riak_core, handoff_receive_vnode_timeout, ?VNODE_TIMEOUT)
}
}.

handle_call({set_socket, Socket0}, _From, State=#state{ssl_opts = SslOpts}) ->
SockOpts = [{active, once}, {packet, 4}, {header, 1}],
Socket = if SslOpts /= [] ->
{ok, Skt} = ssl_handshake(Socket0, SslOpts, 30*1000),
ok = ssl:setopts(Skt, SockOpts),
Peer = safe_peername(Skt, ssl),
Skt;
true ->
ok = inet:setopts(Socket0, SockOpts),
Peer = safe_peername(Socket0, inet),
Socket0
end,
Socket =
if SslOpts /= [] ->
{ok, Skt} = ssl_handshake(Socket0, SslOpts, 30*1000),
ok = ssl:setopts(Skt, SockOpts),
Peer = safe_peername(Skt, ssl),
Skt;
true ->
ok = inet:setopts(Socket0, SockOpts),
Peer = safe_peername(Socket0, inet),
Socket0
end,
{reply, ok, State#state { sock = Socket, peer = Peer }}.

handle_info({tcp_closed,_Socket},State=#state{partition=Partition,count=Count,
peer=Peer}) ->
lager:info("Handoff receiver for partition ~p exited after processing ~p"
" objects from ~p", [Partition, Count, Peer]),
handle_info(
{tcp_closed,_Socket},
State=#state{partition=Partition, count=Count, peer=Peer}) ->
lager:info(
"Handoff receiver for partition ~p exited after processing ~p"
" objects from ~p",
[Partition, Count, Peer]),
{stop, normal, State};
handle_info({tcp_error, _Socket, Reason}, State=#state{partition=Partition,count=Count,
peer=Peer}) ->
lager:info("Handoff receiver for partition ~p exited after processing ~p"
" objects from ~p: TCP error ~p", [Partition, Count, Peer, Reason]),
handle_info(
{tcp_error, _Socket, Reason},
State=#state{partition=Partition,count=Count, peer=Peer}) ->
lager:info(
"Handoff receiver for partition ~p exited after processing ~p"
" objects from ~p: TCP error ~p",
[Partition, Count, Peer, Reason]),
{stop, normal, State};
handle_info({tcp, Socket, Data}, State) ->
[MsgType|MsgData] = Data,
case catch(process_message(MsgType, MsgData, State)) of
{'EXIT', Reason} ->
lager:error("Handoff receiver for partition ~p exited abnormally after "
"processing ~p objects from ~p: ~p", [State#state.partition, State#state.count, State#state.peer, Reason]),
lager:error(
"Handoff receiver for partition ~p exited abnormally after "
"processing ~p objects from ~p: ~p",
[State#state.partition,
State#state.count,
State#state.peer, Reason]),
{stop, normal, State};
NewState when is_record(NewState, state) ->
InetMod = if NewState#state.ssl_opts /= [] -> ssl;
true -> inet
end,
InetMod =
if NewState#state.ssl_opts /= [] ->
ssl;
true ->
inet
end,
InetMod:setopts(Socket, [{active, once}]),
{noreply, NewState, State#state.recv_timeout_len}
end;
Expand All @@ -123,27 +148,38 @@ handle_info({ssl_error, Socket, Reason}, State) ->
handle_info({ssl, Socket, Data}, State) ->
handle_info({tcp, Socket, Data}, State);
handle_info(timeout, State) ->
lager:error("Handoff receiver for partition ~p timed out after "
"processing ~p objects from ~p.", [State#state.partition, State#state.count, State#state.peer]),
lager:error(
"Handoff receiver for partition ~p timed out after "
"processing ~p objects from ~p.",
[State#state.partition, State#state.count, State#state.peer]),
{stop, normal, State}.

process_message(?PT_MSG_INIT, MsgData, State=#state{vnode_mod=VNodeMod,
peer=Peer}) ->
process_message(
?PT_MSG_INIT,
MsgData,
State=#state{vnode_mod=VNodeMod, peer=Peer}) ->
<<Partition:160/integer>> = MsgData,
lager:info("Receiving handoff data for partition ~p:~p from ~p", [VNodeMod, Partition, Peer]),
lager:info(
"Receiving handoff data for partition ~p:~p from ~p",
[VNodeMod, Partition, Peer]),
{ok, VNode} = riak_core_vnode_master:get_vnode_pid(Partition, VNodeMod),
Data = [{mod_src_tgt, {VNodeMod, undefined, Partition}},
{vnode_pid, VNode}],
Data =
[{mod_src_tgt, {VNodeMod, undefined, Partition}}, {vnode_pid, VNode}],
riak_core_handoff_manager:set_recv_data(self(), Data),
State#state{partition=Partition, vnode=VNode};

process_message(?PT_MSG_BATCH, MsgData, State) ->
lists:foldl(fun(Obj, StateAcc) -> process_message(?PT_MSG_OBJ, Obj, StateAcc) end,
State,
binary_to_term(MsgData));

process_message(?PT_MSG_OBJ, MsgData, State=#state{vnode=VNode, count=Count,
vnode_timeout_len=VNodeTimeout}) ->
lists:foldl(
fun(Obj, StateAcc) ->
process_message(?PT_MSG_OBJ, Obj, StateAcc)
end,
State,
binary_to_term(MsgData));
process_message(
?PT_MSG_OBJ,
MsgData,
State =
#state{
vnode=VNode, count=Count, vnode_timeout_len=VNodeTimeout}) ->
Msg = {handoff_data, MsgData},
try gen_fsm:sync_send_all_state_event(VNode, Msg, VNodeTimeout) of
ok ->
Expand All @@ -152,23 +188,28 @@ process_message(?PT_MSG_OBJ, MsgData, State=#state{vnode=VNode, count=Count,
exit(E)
catch
exit:{timeout, _} ->
exit({error, {vnode_timeout, VNodeTimeout, size(MsgData),
binary:part(MsgData, {0,min(size(MsgData),128)})}})
exit({error, {vnode_timeout, VNodeTimeout, size(MsgData)}})
end;
process_message(?PT_MSG_OLDSYNC, MsgData, State=#state{sock=Socket,
tcp_mod=TcpMod}) ->
process_message(
?PT_MSG_OLDSYNC, MsgData, State=#state{sock=Socket, tcp_mod=TcpMod}) ->
% Message still required for now, as when upgrading, may have a sender in
% the cluster which has not upgraded ... and so will still send OLDSYNC
TcpMod:send(Socket, <<?PT_MSG_OLDSYNC:8,"sync">>),
<<VNodeModBin/binary>> = MsgData,
VNodeMod = binary_to_atom(VNodeModBin, utf8),
State#state{vnode_mod=VNodeMod};
process_message(?PT_MSG_SYNC, _MsgData, State=#state{sock=Socket,
tcp_mod=TcpMod}) ->
process_message(
?PT_MSG_SYNC,
_MsgData,
State=#state{sock=Socket, tcp_mod=TcpMod}) ->
TcpMod:send(Socket, <<?PT_MSG_SYNC:8, "sync">>),
State;
process_message(?PT_MSG_CONFIGURE, MsgData, State) ->
ConfProps = binary_to_term(MsgData),
State#state{vnode_mod=proplists:get_value(vnode_mod, ConfProps),
partition=proplists:get_value(partition, ConfProps)};
% Partition used will be over-written by ?PT_MSG_INIT
State#state{
vnode_mod=proplists:get_value(vnode_mod, ConfProps),
partition=proplists:get_value(partition, ConfProps)};
process_message(_, _MsgData, State=#state{sock=Socket,
tcp_mod=TcpMod}) ->
TcpMod:send(Socket, <<255:8,"unknown_msg">>),
Expand All @@ -185,5 +226,8 @@ safe_peername(Skt, Mod) ->
{ok, {Host, Port}} ->
{inet_parse:ntoa(Host), Port};
_ ->
{unknown, unknown} % Real info is {Addr, Port}
{unknown, unknown} % Real info is {Addr, Port}
end.

get_handoff_timeout() ->
app_helper:get_env(riak_core, handoff_receive_timeout, ?RECV_TIMEOUT).
Loading

0 comments on commit ad546ed

Please sign in to comment.