Blob Blame History Raw
From f88f7d1a06a41635d5308e59077b135d9f7353c2 Mon Sep 17 00:00:00 2001
From: Simon MacMullen <simon@rabbitmq.com>
Date: Thu, 21 Aug 2014 17:51:07 +0100
Subject: [PATCH] Make Mnesia tx worker pool jobs use a disposable process so
 that if mnesia_locker decides to randomly send a message there later it will
 just get dropped and not cause chaos.


diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index feed680..ac56d65 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -506,7 +506,7 @@ execute_mnesia_transaction(TxFun) ->
                                 end;
                        true  -> mnesia:sync_transaction(TxFun)
                    end
-           end) of
+           end, true) of
         {sync, {atomic,  Result}} -> mnesia_sync:sync(), Result;
         {sync, {aborted, Reason}} -> throw({error, Reason});
         {atomic,  Result}         -> Result;
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index b1dba5a..2646258 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -28,7 +28,8 @@
 
 -behaviour(gen_server2).
 
--export([start_link/0, submit/1, submit_async/1, ready/1, idle/1]).
+-export([start_link/0, submit/1, submit/2, submit_async/1, ready/1,
+         idle/1]).
 
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
@@ -41,6 +42,7 @@
 
 -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
 -spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
+-spec(submit/2 :: (fun (() -> A) | mfargs(), boolean()) -> A).
 -spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
 -spec(ready/1 :: (pid()) -> 'ok').
 -spec(idle/1 :: (pid()) -> 'ok').
@@ -61,10 +63,14 @@ start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
                                        [{timeout, infinity}]).
 
 submit(Fun) ->
+    submit(Fun, false).
+
+%% OneOffProcess =:= true is for working around the mnesia_locker bug.
+submit(Fun, OneOffProcess) ->
     case get(worker_pool_worker) of
         true -> worker_pool_worker:run(Fun);
         _    -> Pid = gen_server2:call(?SERVER, {next_free, self()}, infinity),
-                worker_pool_worker:submit(Pid, Fun)
+                worker_pool_worker:submit(Pid, Fun, OneOffProcess)
     end.
 
 submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}).
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index beb95bc..14b2df7 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -18,7 +18,7 @@
 
 -behaviour(gen_server2).
 
--export([start_link/0, next_job_from/2, submit/2, submit_async/2, run/1]).
+-export([start_link/0, next_job_from/2, submit/3, submit_async/2, run/1]).
 
 -export([set_maximum_since_use/2]).
 
@@ -33,7 +33,7 @@
 
 -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
 -spec(next_job_from/2 :: (pid(), pid()) -> 'ok').
--spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
+-spec(submit/3 :: (pid(), fun (() -> A) | mfargs(), boolean()) -> A).
 -spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
 -spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()).
 -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -53,8 +53,8 @@ start_link() ->
 next_job_from(Pid, CPid) ->
     gen_server2:cast(Pid, {next_job_from, CPid}).
 
-submit(Pid, Fun) ->
-    gen_server2:call(Pid, {submit, Fun, self()}, infinity).
+submit(Pid, Fun, OneOffProcess) ->
+    gen_server2:call(Pid, {submit, Fun, self(), OneOffProcess}, infinity).
 
 submit_async(Pid, Fun) ->
     gen_server2:cast(Pid, {submit_async, Fun}).
@@ -62,10 +62,22 @@ submit_async(Pid, Fun) ->
 set_maximum_since_use(Pid, Age) ->
     gen_server2:cast(Pid, {set_maximum_since_use, Age}).
 
-run({M, F, A}) ->
-    apply(M, F, A);
-run(Fun) ->
-    Fun().
+run({M, F, A}) -> apply(M, F, A);
+run(Fun)       -> Fun().
+
+run(Fun, false) ->
+    run(Fun);
+run(Fun, true) ->
+    Self = self(),
+    Ref = make_ref(),
+    spawn_link(fun () ->
+                       put(worker_pool_worker, true),
+                       Self ! {Ref, run(Fun)},
+                       unlink(Self)
+               end),
+    receive
+        {Ref, Res} -> Res
+    end.
 
 %%----------------------------------------------------------------------------
 
@@ -81,12 +93,12 @@ prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
 prioritise_cast({next_job_from, _CPid},        _Len, _State) -> 7;
 prioritise_cast(_Msg,                          _Len, _State) -> 0.
 
-handle_call({submit, Fun, CPid}, From, undefined) ->
-    {noreply, {job, CPid, From, Fun}, hibernate};
+handle_call({submit, Fun, CPid, OneOffProcess}, From, undefined) ->
+    {noreply, {job, CPid, From, Fun, OneOffProcess}, hibernate};
 
-handle_call({submit, Fun, CPid}, From, {from, CPid, MRef}) ->
+handle_call({submit, Fun, CPid, OneOffProcess}, From, {from, CPid, MRef}) ->
     erlang:demonitor(MRef),
-    gen_server2:reply(From, run(Fun)),
+    gen_server2:reply(From, run(Fun, OneOffProcess)),
     ok = worker_pool:idle(self()),
     {noreply, undefined, hibernate};
 
@@ -97,8 +109,8 @@ handle_cast({next_job_from, CPid}, undefined) ->
     MRef = erlang:monitor(process, CPid),
     {noreply, {from, CPid, MRef}, hibernate};
 
-handle_cast({next_job_from, CPid}, {job, CPid, From, Fun}) ->
-    gen_server2:reply(From, run(Fun)),
+handle_cast({next_job_from, CPid}, {job, CPid, From, Fun, OneOffProcess}) ->
+    gen_server2:reply(From, run(Fun, OneOffProcess)),
     ok = worker_pool:idle(self()),
     {noreply, undefined, hibernate};