Blob Blame History Raw
From: Simon MacMullen <simon@rabbitmq.com>
Date: Wed, 17 Sep 2014 15:03:24 +0100
Subject: [PATCH] rabbit_mirror_queue_coordinator: Ensure GM exited before
 terminating

If the coordinator exits before the GM has informed all other GMs (and
therefore all slaves) about the termination of the queue, another slave
could be promoted to master in the meantime. This causes the old master's
GM to wait forever for the other GMs to exit.

Fixes #465.

Also includes the following patch:
d174409fc0946668f4a8edad9ff342cf09b6df21

Rename GM:terminate/2 to GM:handle_terminate/2 so that it doesn't conflict with the same callback in gen_server.

diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl
index 245c23b..d8686cd 100644
--- a/include/gm_specs.hrl
+++ b/include/gm_specs.hrl
@@ -20,9 +20,9 @@
 -type(args() :: any()).
 -type(members() :: [pid()]).
 
--spec(joined/2          :: (args(), members())    -> callback_result()).
--spec(members_changed/3 :: (args(), members(), members()) -> callback_result()).
--spec(handle_msg/3      :: (args(), pid(), any()) -> callback_result()).
--spec(terminate/2       :: (args(), term())       -> any()).
+-spec(joined/2           :: (args(), members())    -> callback_result()).
+-spec(members_changed/3  :: (args(), members(),members()) -> callback_result()).
+-spec(handle_msg/3       :: (args(), pid(), any()) -> callback_result()).
+-spec(handle_terminate/2 :: (args(), term())       -> any()).
 
 -endif.
diff --git a/src/gm.erl b/src/gm.erl
index ff90748..d9ce51b 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -62,7 +62,7 @@
 %%
 %% leave/1
 %% Provide the Pid. Removes the Pid from the group. The callback
-%% terminate/2 function will be called.
+%% handle_terminate/2 function will be called.
 %%
 %% broadcast/2
 %% Provide the Pid and a Message. The message will be sent to all
@@ -491,13 +491,13 @@
 
 %% Called on gm member termination as per rules in gen_server, with
 %% the Args provided in start_link plus the termination Reason.
--callback terminate(Args :: term(), Reason :: term()) ->
+-callback handle_terminate(Args :: term(), Reason :: term()) ->
     ok | term().
 
 -else.
 
 behaviour_info(callbacks) ->
-    [{joined, 2}, {members_changed, 3}, {handle_msg, 3}, {terminate, 2}];
+    [{joined, 2}, {members_changed, 3}, {handle_msg, 3}, {handle_terminate, 2}];
 behaviour_info(_Other) ->
     undefined.
 
@@ -733,7 +733,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
 
 terminate(Reason, State = #state { module        = Module,
                                    callback_args = Args }) ->
-    Module:terminate(Args, Reason).
+    flush_broadcast_buffer(State),
+    Module:handle_terminate(Args, Reason).
 
 
 code_change(_OldVsn, State, _Extra) ->
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index c9a2552..90b94e4 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -17,7 +17,7 @@
 -module(gm_soak_test).
 
 -export([test/0]).
--export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+-export([joined/2, members_changed/3, handle_msg/3, handle_terminate/2]).
 
 -behaviour(gm).
 
@@ -94,7 +94,7 @@ handle_msg([], From, {test_msg, Num}) ->
       end),
     ok.
 
-terminate([], Reason) ->
+handle_terminate([], Reason) ->
     io:format("Left ~p (~p)~n", [self(), Reason]),
     ok.
 
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl
index 41be6dd..f2aedff 100644
--- a/src/gm_speed_test.erl
+++ b/src/gm_speed_test.erl
@@ -17,7 +17,7 @@
 -module(gm_speed_test).
 
 -export([test/3]).
--export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+-export([joined/2, members_changed/3, handle_msg/3, handle_terminate/2]).
 -export([wile_e_coyote/2]).
 
 -behaviour(gm).
@@ -37,7 +37,7 @@ handle_msg(Owner, _From, ping) ->
     Owner ! ping,
     ok.
 
-terminate(Owner, _Reason) ->
+handle_terminate(Owner, _Reason) ->
     Owner ! terminated,
     ok.
 
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
index cae2164..db552ce 100644
--- a/src/gm_tests.erl
+++ b/src/gm_tests.erl
@@ -22,7 +22,7 @@
          test_member_death/0,
          test_receive_in_order/0,
          all_tests/0]).
--export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+-export([joined/2, members_changed/3, handle_msg/3, handle_terminate/2]).
 
 -behaviour(gm).
 
@@ -48,7 +48,7 @@ handle_msg(Pid, From, Msg) ->
     Pid ! {msg, self(), From, Msg},
     ok.
 
-terminate(Pid, Reason) ->
+handle_terminate(Pid, Reason) ->
     Pid ! {termination, self(), Reason},
     ok.
 
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 23718da..0ccafee 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -21,7 +21,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          code_change/3]).
 
--export([joined/2, members_changed/3, handle_msg/3]).
+-export([joined/2, members_changed/3, handle_msg/3, handle_terminate/2]).
 
 -behaviour(gen_server2).
 -behaviour(gm).
@@ -368,6 +368,8 @@ handle_cast(request_depth, State = #state { depth_fun = DepthFun }) ->
 handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
     noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) });
 
+handle_cast({delete_and_terminate, {shutdown, ring_shutdown}}, State) ->
+    {stop, normal, State};
 handle_cast({delete_and_terminate, Reason}, State) ->
     {stop, Reason, State}.
 
@@ -384,10 +386,6 @@ handle_info(Msg, State) ->
     {stop, {unexpected_info, Msg}, State}.
 
 terminate(_Reason, #state{}) ->
-    %% gen_server case
-    ok;
-terminate([_CPid], _Reason) ->
-    %% gm case
     ok.
 
 code_change(_OldVsn, State, _Extra) ->
@@ -410,12 +408,24 @@ handle_msg([CPid], _From, request_depth = Msg) ->
     ok = gen_server2:cast(CPid, Msg);
 handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
     ok = gen_server2:cast(CPid, Msg);
-handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
-    ok = gen_server2:cast(CPid, Msg),
+handle_msg([_CPid], _From, {delete_and_terminate, _Reason}) ->
+    %% We tell GM to stop, but we don't instruct the coordinator to
+    %% stop yet. The GM will first make sure all pending messages were
+    %% actually delivered. Then it calls handle_terminate/2 below so the
+    %% coordinator is stopped.
+    %%
+    %% If we stop the coordinator right now, remote slaves could see the
+    %% coordinator DOWN before delete_and_terminate was delivered to all
+    %% GMs. One of those GM would be promoted as the master, and this GM
+    %% would hang forever, waiting for other GMs to stop.
     {stop, {shutdown, ring_shutdown}};
 handle_msg([_CPid], _From, _Msg) ->
     ok.
 
+handle_terminate([CPid], Reason) ->
+    ok = gen_server2:cast(CPid, {delete_and_terminate, Reason}),
+    ok.
+
 %% ---------------------------------------------------------------------------
 %% Others
 %% ---------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 11d6a79..853d2fe 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -30,7 +30,7 @@
          code_change/3, handle_pre_hibernate/1, prioritise_call/4,
          prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
 
--export([joined/2, members_changed/3, handle_msg/3]).
+-export([joined/2, members_changed/3, handle_msg/3, handle_terminate/2]).
 
 -behaviour(gen_server2).
 -behaviour(gm).
@@ -339,10 +339,7 @@ terminate({shutdown, _} = R, State) ->
 terminate(Reason, State = #state{backing_queue       = BQ,
                                  backing_queue_state = BQS}) ->
     terminate_common(State),
-    BQ:delete_and_terminate(Reason, BQS);
-terminate([_SPid], _Reason) ->
-    %% gm case
-    ok.
+    BQ:delete_and_terminate(Reason, BQS).
 
 %% If the Reason is shutdown, or {shutdown, _}, it is not the queue
 %% being deleted: it's just the node going down. Even though we're a
@@ -439,6 +436,9 @@ handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) ->
 handle_msg([SPid], _From, Msg) ->
     ok = gen_server2:cast(SPid, {gm, Msg}).
 
+handle_terminate([_SPid], _Reason) ->
+    ok.
+
 %% ---------------------------------------------------------------------------
 %% Others
 %% ---------------------------------------------------------------------------