Blob Blame History Raw
From be4b3a8fdbe180af9ccb341bd8717989740eba04 Mon Sep 17 00:00:00 2001
From: Bryan Fink <bryan@basho.com>
Date: Tue, 19 Feb 2013 08:56:34 -0500
Subject: [PATCH 8/8] migrate mapred_test to riak_test

This suite had a fragile setup stage that would regularly cause the
automated testers to fail. It has been migrated to the following
riak_test modules, where it should be much more predictable:

mapred_notfound_failover
mapred_javascript
mapred_dead_pipe
mapred_buffer_prereduce
mapred_basic_compat
---
 src/riak_kv_mrc_pipe.erl |   3 +
 test/mapred_test.erl     | 761 -----------------------------------------------
 2 files changed, 3 insertions(+), 761 deletions(-)
 delete mode 100644 test/mapred_test.erl

diff --git a/src/riak_kv_mrc_pipe.erl b/src/riak_kv_mrc_pipe.erl
index 15f29be..0be2f30 100644
--- a/src/riak_kv_mrc_pipe.erl
+++ b/src/riak_kv_mrc_pipe.erl
@@ -959,6 +959,9 @@ example_setup() ->
 
 %% @doc Store some example data for the other example functions.
 %%
+%% WARNING: This function is used by riak_test mapred_*
+%% tests. Changing what it creates will break those tests.
+%%
 %% Objects stored:
 %% <dl>
 %%   <dt>`foo/bar'</dt>
diff --git a/test/mapred_test.erl b/test/mapred_test.erl
deleted file mode 100644
index 27a4a7c..0000000
--- a/test/mapred_test.erl
+++ /dev/null
@@ -1,761 +0,0 @@
-%% -------------------------------------------------------------------
-%%
-%% Copyright (c) 2011 Basho Technologies, Inc.
-%%
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-%%
-%%   http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-%%
-%% -------------------------------------------------------------------
-
--module(mapred_test).
-
--include_lib("eunit/include/eunit.hrl").
--include_lib("riak_pipe/include/riak_pipe.hrl").
--compile(export_all).
-
-setup() ->
-    riak_kv_test_util:common_setup(?MODULE, fun configure/1).
-
-cleanup() ->
-    riak_kv_test_util:common_cleanup(?MODULE, fun configure/1).
-
-configure(load) ->
-    KVSettings = [{storage_backend, riak_kv_memory_backend},
-                  {test, true},
-                  {vnode_vclocks, true},
-                  {pb_ip, "0.0.0.0"},
-                  {pb_port, 0}, % arbitrary #
-                  {map_js_vm_count, 4},
-                  {reduce_js_vm_count, 3}],
-    CoreSettings = [{handoff_ip, "0.0.0.0"},
-                     {handoff_port, 0},
-                     {ring_creation_size, 16}],
-    [ application:set_env(riak_core, K, V) || {K,V} <- CoreSettings ],
-    [ application:set_env(riak_kv, K, V) || {K,V} <- KVSettings ],
-    ok;
-
-configure(start) ->
-    riak_core:wait_for_service(riak_pipe);
-configure(_) ->
-    ok.
-
-inputs_gen_seq(Pipe, Max, _Timeout) ->
-    [riak_pipe:queue_work(Pipe, X) || X <- lists:seq(1, Max)],
-    riak_pipe:eoi(Pipe),
-    ok.
-
-inputs_gen_bkeys_1(Pipe, {Bucket, Start, End}, _Timeout) ->
-    BKeys = [{Bucket, list_to_binary("bar"++integer_to_list(X))} ||
-                 X <- lists:seq(Start, End)],
-    [riak_pipe:queue_work(Pipe, BK) || BK <- BKeys],
-    riak_pipe:eoi(Pipe),
-    ok.
-
-compat_basic1_test_() ->
-    IntsBucket = <<"foonum">>,
-    ReduceSumFun = fun(Inputs, _) -> [lists:sum(Inputs)] end,
-    LinkBucket = <<"link bucket">>,
-    LinkKey = <<"yo">>,
-
-    {setup,
-     setup(),
-     cleanup(),
-     fun(_) ->
-         [
-          ?_test(
-             %% The data created by this step is used by all/most of the
-             %% following tests.
-             begin
-                 ok = riak_kv_mrc_pipe:example_setup(),
-                 {ok, C} = riak:local_client(),
-                 Obj = riak_object:new(LinkBucket, LinkKey, <<"link val">>),
-                 MD = dict:store(<<"Links">>,
-                                 [{{LinkBucket, <<"nokey-1">>}, <<"link 1">>},
-                                  {{LinkBucket, <<"nokey-2">>}, <<"link 2">>}],
-                                 dict:new()),
-                 ok = C:put(riak_object:update_metadata(Obj, MD))
-             end
-            ),
-          ?_test(
-             %% Empty query
-             begin
-                 %% This will trigger a traversal of IntsBucket, but
-                 %% because the query is empty, the MapReduce will
-                 %% traverse the bucket and send BKeys down the pipe.
-                 {ok, BKeys} =
-                     riak_kv_mrc_pipe:mapred(IntsBucket, []),
-                 5 = length(BKeys),
-                 {IntsBucket, <<"bar1">>} = hd(lists:sort(BKeys))
-             end),
-          ?_test(
-             %% AZ 479: Reduce with zero inputs -> call reduce once w/empty list
-             begin
-                 Spec = [{reduce, {qfun, ReduceSumFun}, none, true}],
-                 {ok, [0]} = riak_kv_mrc_pipe:mapred([], Spec)
-             end),
-          ?_test(
-             %% Basic compatibility: keep both stages
-             begin
-                 Spec = 
-                     [{map, {modfun, riak_kv_mapreduce, map_object_value},
-                       none, true},
-                      {reduce, {qfun, ReduceSumFun},
-                       none, true}],
-                 {ok, [MapRs, [15]]} =
-                     riak_kv_mrc_pipe:mapred(IntsBucket, Spec),
-                 5 = length(MapRs)
-             end),
-          ?_test(
-             %% Basic compat: keep neither stages -> no output
-             begin
-                 Spec = 
-                     [{map, {modfun, riak_kv_mapreduce, map_object_value},
-                       none, false},
-                      {reduce, {qfun, ReduceSumFun},
-                       none, false}],
-                 %% "Crazy" semantics: if only 1 keeper stage, then
-                 %% return List instead of [List].
-                 {ok, []} = riak_kv_mrc_pipe:mapred(IntsBucket, Spec)
-             end),
-          ?_test(
-             %% Basic compat: keep first stage only, want 'crazy' result",
-             begin
-                 Spec = 
-                     [{map, {modfun, riak_kv_mapreduce, map_object_value},
-                       none, true},
-                      {reduce, {qfun, ReduceSumFun},
-                       none, false}],
-                 %% "Crazy" semantics: if only 1 keeper stage, then
-                 %% return List instead of [List].
-                 {ok, MapRs} = riak_kv_mrc_pipe:mapred(IntsBucket, Spec),
-                 5 = length(MapRs)
-             end),
-          ?_test(
-             %% Basic compat: keep second stage only, want 'crazy' result
-             begin
-                 Spec = 
-                     [{map, {modfun, riak_kv_mapreduce, map_object_value},
-                       none, false},
-                      {reduce, {qfun, ReduceSumFun},
-                       none, true}],
-                 %% "Crazy" semantics: if only 1 keeper stage, then
-                 %% return List instead of [List].
-                 {ok, [15]} = riak_kv_mrc_pipe:mapred(IntsBucket, Spec)
-             end),
-          ?_test(
-             %% Explicit rereduce
-             begin
-                 Spec = 
-                     [{map, {modfun, riak_kv_mapreduce, map_object_value},
-                       none, true}] ++
-                     lists:duplicate(
-                       5, {reduce, {qfun, ReduceSumFun}, none, true}),
-                 {ok, [_, [15],[15],[15],[15],[15]]} =
-                     riak_kv_mrc_pipe:mapred(IntsBucket, Spec)
-             end),
-          ?_test(
-             %% Make certain that {error, not_found} goes down the pipe
-             %% from a map phase.
-             begin
-                 Inputs = [{<<"no-such-bucket">>, <<"no-such-key!">>}],
-                 Spec = 
-                     [{map, {modfun, riak_kv_mapreduce, map_object_value},
-                       {struct,[{<<"sub">>,[<<"0">>]}]}, false},
-                      {reduce, {modfun, riak_kv_mapreduce,
-                                reduce_string_to_integer},none,true}],
-                 {ok, [0]} =
-                     riak_kv_mrc_pipe:mapred(Inputs, Spec)
-             end),
-          ?_test(
-             %% Basic link phase
-             begin
-                 %% Inputs = [{LinkBucket, LinkKey}],
-                 Inputs = LinkBucket,
-                 Spec = [{link, '_', <<"link 1">>, true}],
-                 {ok, [ [LinkBucket, <<"nokey-1">>, <<"link 1">>] ]} =
-                     riak_kv_mrc_pipe:mapred(Inputs, Spec)
-             end),
-          ?_test(
-             %% Link phase + notfound
-             begin
-                 Inputs = [{<<"no">>, K} || K <- [<<"no1">>, <<"no2">>]],
-                 Spec = [{link, '_', '_', true}],
-                 {ok, []} =
-                     riak_kv_mrc_pipe:mapred(Inputs, Spec)
-             end),
-          ?_test(
-             %% KeyData
-             begin
-                 UnMap = fun(O, undefined, _) ->
-                                 [{riak_object:bucket(O),
-                                   riak_object:key(O)}];
-                            (O, KeyData, _) ->
-                                 [{{riak_object:bucket(O),
-                                    riak_object:key(O)},
-                                   KeyData}]
-                         end,
-                 Normalize = fun({{B,K},D}) -> {{B,K},D};
-                                ({B,K})     -> {B,K};
-                                ([B,K])     -> {B,K};
-                                ([B,K,D])   -> {{B,K},D}
-                             end,
-                 Spec =
-                     [{map, {qfun, UnMap}, none, true}],
-                 Inputs = [{IntsBucket, <<"bar1">>},
-                           {{IntsBucket, <<"bar2">>}, <<"keydata works">>},
-                           [IntsBucket, <<"bar3">>],
-                           [IntsBucket, <<"bar4">>, <<"keydata still works">>]],
-                 {ok, Results} =
-                     riak_kv_mrc_pipe:mapred(Inputs, Spec),
-                 SortedNormal = lists:sort([ Normalize(I) || I <- Inputs ]),
-                 ?assertEqual(SortedNormal, lists:sort(Results))
-             end),
-          ?_test(
-             %% Key Filters
-             begin
-                 %% filter sould match only "bar4" key
-                 Inputs = {IntsBucket, [[<<"ends_with">>, <<"r4">>]]},
-                 Spec = [{map, {modfun, riak_kv_mapreduce, map_object_value},
-                          none, true}],
-                 {ok, [4]} = riak_kv_mrc_pipe:mapred(Inputs, Spec)
-             end),
-          ?_test(
-             %% modfun for inputs generator
-             begin
-                 Inputs = {modfun, ?MODULE, inputs_gen_seq, 6},
-                 Spec = [{reduce, {qfun, ReduceSumFun},none,true}],
-                 {ok, [21]} = riak_kv_mrc_pipe:mapred(Inputs, Spec)
-             end),
-          ?_test(
-             %% modfun for inputs generator: make BKeys for conventional phases
-             begin
-                 Inputs = {modfun, ?MODULE, inputs_gen_bkeys_1,
-                           {IntsBucket, 1, 5}},
-                 Spec = [{map, {modfun, riak_kv_mapreduce, map_object_value},
-                          none, false},
-                         {reduce, {modfun, riak_kv_mapreduce,
-                                   reduce_string_to_integer},none,false},
-                         {reduce, {qfun, ReduceSumFun},none,true}],
-                 {ok, [15]} = riak_kv_mrc_pipe:mapred(Inputs, Spec)
-             end)
-          ]
-     end}.
-
-compat_buffer_and_prereduce_test_() ->
-    IntsBucket = <<"foonum">>,
-    NumInts = 1000,
-    ReduceSumFun = fun(Inputs, _) -> [lists:sum(Inputs)] end,
-
-    {setup,
-     setup(),
-     cleanup(),
-     fun(_) ->
-         [
-          ?_test(
-             %% The data created by this step is used by all/most of the
-             %% following tests.
-             ok = riak_kv_mrc_pipe:example_setup(NumInts)
-            ),
-          ?_test(
-             %% Verify that example_setup/1 did what it was supposed to.
-             begin
-                 Spec = 
-                     [{map, {modfun, riak_kv_mapreduce, map_object_value},
-                       none, true},
-                      {reduce, {qfun, ReduceSumFun},
-                       none, true}],
-                 {ok, [MapRs, [500500]]} =
-                     riak_kv_mrc_pipe:mapred(IntsBucket, Spec),
-                 NumInts = length(MapRs)
-             end),
-          ?_test(
-             %% Test the {reduce_phase_batch_size, int()} option
-             begin
-                 Spec = 
-                     [{map, {modfun, riak_kv_mapreduce, map_object_value},
-                       none, true},
-                      {reduce, {qfun, ReduceSumFun},
-                       [{reduce_phase_batch_size, 10}], true}],
-                 {ok, [MapRs, [500500]]} =
-                     riak_kv_mrc_pipe:mapred(IntsBucket, Spec),
-                 NumInts = length(MapRs)
-             end),
-          ?_test(
-             %% Test degenerate {reduce_phase_batch_size, 0} option
-             begin
-                 Spec = 
-                     [{map, {modfun, riak_kv_mapreduce, map_object_value},
-                       none, true},
-                      {reduce, {qfun, ReduceSumFun},
-                       [{reduce_phase_batch_size, 0}], true}],
-                 {ok, [MapRs, [500500]]} =
-                     riak_kv_mrc_pipe:mapred(IntsBucket, Spec),
-                 NumInts = length(MapRs)
-             end),
-          ?_test(
-             %% Test degenerate reduce_phase_only_1 option
-             begin
-                 Spec = 
-                     [{map, {modfun, riak_kv_mapreduce, map_object_value},
-                       none, true},
-                      {reduce, {qfun, ReduceSumFun},
-                       [reduce_phase_only_1], true}],
-                 {ok, [MapRs, [500500]]} =
-                     riak_kv_mrc_pipe:mapred(IntsBucket, Spec),
-                 NumInts = length(MapRs)
-             end),
-          ?_test(
-             %% Prereduce+reduce_phase_only_1 (combined happily!)
-             %% and then reduce batch size = 7.
-             begin
-                 Spec = 
-                     [{map, {modfun, riak_kv_mapreduce, map_object_value},
-                       [do_prereduce, reduce_phase_only_1], true},
-                      {reduce, {qfun, ReduceSumFun},
-                       [{reduce_phase_batch_size, 7}], true}],
-                 {ok, [MapRs, [500500]]} =
-                     riak_kv_mrc_pipe:mapred(IntsBucket, Spec),
-                 NumInts = length(MapRs)
-             end)
-         ]
-     end}.
-
-compat_javascript_test_() ->
-    IntsBucket = <<"foonum">>,
-    NumInts = 5,
-    JSBucket = <<"jsfuns">>,
-    NotFoundBkey = {<<"does not">>, <<"exit">>},
-
-    {setup,
-     setup(),
-     cleanup(),
-     fun(_) ->
-         [
-          ?_test(
-             %% The data created by this step is used by all/most of the
-             %% following tests.
-             ok = riak_kv_mrc_pipe:example_setup(NumInts)
-            ),
-          ?_test(
-             begin
-                 %% map & reduce with jsanon-Source
-                 Spec =
-                     [{map,
-                       {jsanon, <<"function(v) {
-                                      return [v.values[0].data];
-                                   }">>},
-                       <<>>, true},
-                      {reduce,
-                       {jsanon, <<"function(v) {
-                                      Sum = function(A, B) { return A+B; };
-                                      return [ v.reduce(Sum) ];
-                                   }">>},
-                       <<>>, true}],
-                 {ok, [MapRs, [15]]} =
-                     riak_kv_mrc_pipe:mapred(IntsBucket, Spec),
-                 5 = length(MapRs)
-             end),
-          ?_test(
-             begin
-                 %% map & reduce with jsanon-Bucket/Key
-                 {ok, C} = riak:local_client(),
-                 ok = C:put(riak_object:new(
-                              JSBucket, <<"map">>,
-                              <<"function(v) {
-                                    return [v.values[0].data];
-                                 }">>),
-                            1),
-                 ok = C:put(riak_object:new(
-                              JSBucket, <<"reduce">>,
-                              <<"function(v) {
-                                    Sum = function(A, B) { return A+B; };
-                                    return [ v.reduce(Sum) ];
-                                 }">>),
-                            1),
-                 Spec =
-                     [{map,
-                       {jsanon, {JSBucket, <<"map">>}},
-                       <<>>, true},
-                      {reduce,
-                       {jsanon, {JSBucket, <<"reduce">>}},
-                       <<>>, true}],
-                 {ok, [MapRs, [15]]} =
-                     riak_kv_mrc_pipe:mapred(IntsBucket, Spec),
-                 5 = length(MapRs)
-             end),
-          ?_test(
-             begin
-                 %% map & reduce with jsfun
-                 Spec =
-                     [{map,
-                       {jsfun, <<"Riak.mapValues">>},
-                       <<>>, true},
-                      {reduce,
-                       {jsfun, <<"Riak.reduceSum">>},
-                       <<>>, true}],
-                 {ok, [MapRs, [15]]} =
-                     riak_kv_mrc_pipe:mapred(IntsBucket, Spec),
-                 5 = length(MapRs)
-             end),
-          ?_test(
-             begin
-                 %% objects not found for JS map turn into
-                 %% {not_found, {Bucket, Key}, KeyData} tuples
-                 Spec =
-                     [{map, {jsfun, <<"Riak.mapValues">>}, <<>>, true},
-                      {reduce,
-                       {jsanon, <<"function(v) {
-                                      F = function(O) {
-                                             if ((O[\"not_found\"] &&
-                                                  O.not_found[\"bucket\"]) ||
-                                                 O[\"mapred_test_pass\"])
-                                                return {mapred_test_pass:1};
-                                             else
-                                                return O;
-                                          }
-                                      return v.map(F);
-                                   }">>},
-                       <<>>, true}],
-                 {ok, [[{not_found,
-                         NotFoundBkey,
-                         undefined}],
-                       [{struct,[{<<"mapred_test_pass">>,1}]}]]} =
-                     riak_kv_mrc_pipe:mapred([NotFoundBkey], Spec)
-             end),
-          ?_test(
-             %% KeyData
-             begin
-                 UnMap = <<"function(O, KD) {
-                               R = {b:O.bucket, k:O.key};
-                               if (KD != \"undefined\")
-                                  R.d = KD;
-                               return [R];
-                            }">>,
-                 Normalize = fun({{B,K},D}) -> {struct, [{<<"b">>, B},
-                                                         {<<"k">>, K},
-                                                         {<<"d">>, D}]};
-                                ({B,K})     -> {struct, [{<<"b">>, B},
-                                                         {<<"k">>, K}]};
-                                ([B,K])     -> {struct, [{<<"b">>, B},
-                                                         {<<"k">>, K}]};
-                                ([B,K,D])   -> {struct, [{<<"b">>, B},
-                                                         {<<"k">>, K},
-                                                         {<<"d">>, D}]}
-                             end,
-                 Spec =
-                     [{map, {jsanon, UnMap}, none, true}],
-                 Inputs = [{IntsBucket, <<"bar1">>},
-                           {{IntsBucket, <<"bar2">>}, <<"keydata works">>},
-                           [IntsBucket, <<"bar3">>],
-                           [IntsBucket, <<"bar4">>, <<"keydata still works">>]],
-                 {ok, Results} =
-                     riak_kv_mrc_pipe:mapred(Inputs, Spec),
-                 SortedNormal = lists:sort([ Normalize(I) || I <- Inputs ]),
-                 ?assertEqual(SortedNormal, lists:sort(Results))
-             end)
-          ]
-     end}.
-
-dead_pipe_test_() ->
-    {setup,
-     setup(),
-     cleanup(),
-     fun(_) ->
-         [
-          ?_test(
-             %% Verify that sending inputs to a pipe that has already
-             %% stopped raises an error (synchronous send)
-             begin
-                 Spec = 
-                     [{map, {modfun, riak_kv_mapreduce, map_object_value},
-                       none, true}],
-                 {{ok, Pipe}, _NumKeeps} =
-                     riak_kv_mrc_pipe:mapred_stream(Spec),
-                 riak_pipe:destroy(Pipe),
-                 {error, Reason} = riak_kv_mrc_pipe:send_inputs(
-                                     Pipe, [{<<"foo">>, <<"bar">>}]),
-                 %% Each vnode should have received the input, but
-                 %% being unable to find the fitting process, returned
-                 %% `worker_startup_failed` (and probably also printed
-                 %% "fitting was gone before startup")
-                 ?assert(lists:member(worker_startup_failed, Reason))
-             end),
-          ?_test(
-             %% Verify that sending inputs to a pipe that has already
-             %% stopped raises an error (async send)
-             begin
-                 Spec = 
-                     [{map, {modfun, riak_kv_mapreduce, map_object_value},
-                       none, true}],
-                 {{ok, Pipe}, _NumKeeps} =
-                     riak_kv_mrc_pipe:mapred_stream(Spec),
-                 riak_pipe:destroy(Pipe),
-                 %% this is a hack to make sure that the async sender
-                 %% doesn't die immediately upon linking to the
-                 %% already-dead builder
-                 PipeB = Pipe#pipe{builder=spawn(fake_builder(self()))},
-                 {Sender, SenderRef} =
-                     riak_kv_mrc_pipe:send_inputs_async(
-                       PipeB, [{<<"foo">>, <<"bar">>}]),
-                 receive
-                     {'DOWN', SenderRef, process, Sender, Error} ->
-                         {error, Reason} = Error
-                 end,
-                 %% let the fake builder shut down now
-                 PipeB#pipe.builder ! test_over,
-                 %% Each vnode should have received the input, but
-                 %% being unable to find the fitting process, returned
-                 %% `worker_startup_failed` (and probably also printed
-                 %% "fitting was gone before startup")
-                 ?assert(lists:member(worker_startup_failed, Reason))
-             end)
-         ]
-     end}.
-
-fake_builder(TestProc) ->
-    fun() ->
-            Ref = erlang:monitor(process, TestProc),
-            receive
-                test_over ->
-                    ok;
-                {'DOWN',Ref,process,TestProc,_} ->
-                    ok
-            end
-    end.
-
-notfound_failover_test_() ->
-    IntsBucket = <<"foonum">>,
-    NumInts = 5,
-
-    {setup,
-     setup(),
-     cleanup(),
-     fun(_) ->
-         [
-          ?_test(
-             %% The data created by this step is used by all/most of the
-             %% following tests.
-             ok = riak_kv_mrc_pipe:example_setup(NumInts)
-            ),
-          ?_test(
-             %% check the condition that used to bring down a pipe in
-             %% https://github.com/basho/riak_kv/issues/290
-             %% this version checks it with an actual not-found
-             begin
-                 QLimit = 3,
-                 WaitRef = make_ref(),
-                 Spec =
-                     [{map,
-                       {modfun, riak_kv_mapreduce, map_object_value},
-                       <<"include_keydata">>, false},
-                      {reduce,
-                       {modfun, ?MODULE, reduce_wait_for_signal},
-                       [{reduce_phase_batch_size, 1},
-                        {wait, {self(), WaitRef}}],
-                       true}],
-                 PipeSpec = riak_kv_mrc_pipe:mapred_plan(Spec),
-                 %% make it easier to fill
-                 SmallPipeSpec = [ S#fitting_spec{q_limit=QLimit}
-                                   || S <- PipeSpec ],
-                 {ok, Pipe} = riak_pipe:exec(SmallPipeSpec,
-                                             [{log, sink},
-                                              {trace, [error, queue_full]}]),
-                 ExistingKey = {IntsBucket, <<"bar1">>},
-                 ChashFun = (hd(SmallPipeSpec))#fitting_spec.chashfun,
-                 MissingKey = find_adjacent_key(ChashFun, ExistingKey),
-                 %% get main workers spun up
-                 ok = riak_pipe:queue_work(Pipe, ExistingKey),
-                 receive {waiting, WaitRef, ReducePid} -> ok end,
-
-                 %% reduce is now blocking, fill its queue
-                 [ ok = riak_pipe:queue_work(Pipe, ExistingKey)
-                   || _ <- lists:seq(1, QLimit) ],
-
-                 {NValMod,NValFun} = (hd(SmallPipeSpec))#fitting_spec.nval,
-                 NVal = NValMod:NValFun(ExistingKey),
-
-                 %% each of N paths through the primary preflist
-                 [ fill_map_queue(Pipe, QLimit, ExistingKey)
-                   || _ <- lists:seq(1, NVal) ],
-
-                 %% check get queue actually full
-                 ExpectedTOs = lists:duplicate(NVal, timeout),
-                 {error, ExpectedTOs} =
-                     riak_pipe:queue_work(Pipe, ExistingKey, noblock),
-
-                 %% now inject a missing key that would need to
-                 %% failover to the full queue
-                 ok = riak_pipe:queue_work(Pipe, {MissingKey, test_passing}),
-                 %% and watch for it to block in the reduce queue
-                 %% *this* is when pre-patched code would fail:
-                 %% we'll receive an [error] trace from the kvget fitting's
-                 %% failure to forward the bkey along its preflist
-                 ok = consume_queue_full(Pipe, 1),
-
-                 %% let the pipe finish
-                 riak_pipe:eoi(Pipe),
-                 ReducePid ! {continue, WaitRef},
-
-                 {eoi, Results, Logs} = riak_pipe:collect_results(Pipe),
-                 %% the object does not exist, but we told the map
-                 %% phase to send on its keydata - check for it
-                 ?assert(lists:member({1, test_passing}, Results)),
-                 %% just to be a little extra cautious, check for
-                 %% other errors
-                 ?assertEqual([], [E || {_,{trace,[error],_}}=E <- Logs])
-             end),
-          ?_test(
-             %% check the condition that used to bring down a pipe in
-             %% https://github.com/basho/riak_kv/issues/290
-             %% this version checks with an object that is missing a replica
-             begin
-                 QLimit = 3,
-                 WaitRef = make_ref(),
-                 Spec =
-                     [{map,
-                       {modfun, riak_kv_mapreduce, map_object_value},
-                       none, false},
-                      {reduce,
-                       {modfun, ?MODULE, reduce_wait_for_signal},
-                       [{reduce_phase_batch_size, 1},
-                        {wait, {self(), WaitRef}}],
-                       true}],
-                 PipeSpec = riak_kv_mrc_pipe:mapred_plan(Spec),
-                 %% make it easier to fill
-                 SmallPipeSpec = [ S#fitting_spec{q_limit=QLimit}
-                                   || S <- PipeSpec ],
-                 {ok, Pipe} = riak_pipe:exec(SmallPipeSpec,
-                                             [{log, sink},
-                                              {trace, [error, queue_full]}]),
-                 ExistingKey = {IntsBucket, <<"bar1">>},
-                 ChashFun = (hd(SmallPipeSpec))#fitting_spec.chashfun,
-                 {MissingBucket, MissingKey} =
-                     find_adjacent_key(ChashFun, ExistingKey),
-
-                 %% create a value for the "missing" key
-                 {ok, C} = riak:local_client(),
-                 ok = C:put(riak_object:new(MissingBucket, MissingKey,
-                                            test_passing),
-                            3),
-                 %% and now kill the first replica;
-                 %% this will make the vnode local to the kvget pipe
-                 %% fitting return an error (because it's the memory
-                 %% backend), so it will have to look at another kv vnode
-                 [{{PrimaryIndex, _},_}] =
-                     riak_core_apl:get_primary_apl(
-                       ChashFun({MissingBucket, MissingKey}), 1, riak_kv),
-                 {ok, VnodePid} = riak_core_vnode_manager:get_vnode_pid(
-                                    PrimaryIndex, riak_kv_vnode),
-                 exit(VnodePid, kill),
-                 
-                 %% get main workers spun up
-                 ok = riak_pipe:queue_work(Pipe, ExistingKey),
-                 receive {waiting, WaitRef, ReducePid} -> ok end,
-
-                 %% reduce is now blocking, fill its queue
-                 [ ok = riak_pipe:queue_work(Pipe, ExistingKey)
-                   || _ <- lists:seq(1, QLimit) ],
-
-                 {NValMod,NValFun} = (hd(SmallPipeSpec))#fitting_spec.nval,
-                 NVal = NValMod:NValFun(ExistingKey),
-
-                 %% each of N paths through the primary preflist
-                 [ fill_map_queue(Pipe, QLimit, ExistingKey)
-                   || _ <- lists:seq(1, NVal) ],
-
-                 %% check get queue actually full
-                 ExpectedTOs = lists:duplicate(NVal, timeout),
-                 {error, ExpectedTOs} =
-                     riak_pipe:queue_work(Pipe, ExistingKey, noblock),
-
-                 %% now inject a missing key that would need to
-                 %% failover to the full queue
-                 ok = riak_pipe:queue_work(Pipe, {MissingBucket, MissingKey}),
-                 %% and watch for it to block in the reduce queue
-                 %% *this* is when pre-patched code would fail:
-                 %% we'll receive an [error] trace from the kvget fitting's
-                 %% failure to forward the bkey along its preflist
-                 ok = consume_queue_full(Pipe, 1),
-
-                 %% let the pipe finish
-                 riak_pipe:eoi(Pipe),
-                 ReducePid ! {continue, WaitRef},
-
-                 {eoi, Results, Logs} = riak_pipe:collect_results(Pipe),
-                 %% the object does not exist, but we told the map
-                 %% phase to send on its keydata - check for it
-                 ?assert(lists:member({1, test_passing}, Results)),
-                 %% just to be a little extra cautious, check for
-                 %% other errors
-                 ?assertEqual([], [E || {_,{trace,[error],_}}=E <- Logs])
-             end)
-         ]
-     end}.
-
-fill_map_queue(Pipe, QLimit, ExistingKey) ->
-    %% give the map worker one more to block on
-    ok = riak_pipe:queue_work(Pipe, ExistingKey, noblock),
-    consume_queue_full(Pipe, 1),
-    %% map is now blocking, fill its queue
-    [ ok = riak_pipe:queue_work(Pipe, ExistingKey, noblock)
-      || _ <- lists:seq(1, QLimit) ],
-    %% give the get worker one more to block on
-    ok = riak_pipe:queue_work(Pipe, ExistingKey, noblock),
-    consume_queue_full(Pipe, {xform_map, 0}),
-    %% get is now blocking, fill its queue
-    [ ok = riak_pipe:queue_work(Pipe, ExistingKey, noblock)
-      || _ <- lists:seq(1, QLimit) ],
-    ok.
-
-find_adjacent_key({Mod, Fun}, ExistingKey) ->
-    [ExistingHead|_] = riak_core_apl:get_primary_apl(
-                         Mod:Fun(ExistingKey), 2, riak_kv),
-    [K|_] = lists:dropwhile(
-              fun(N) ->
-                      K = {<<"foonum_missing">>,
-                           list_to_binary(integer_to_list(N))},
-                      [_,Second] = riak_core_apl:get_primary_apl(
-                                     Mod:Fun(K), 2, riak_kv),
-                      Second /= ExistingHead
-              end,
-              lists:seq(1, 1000)),
-    {<<"foonum_missing">>, list_to_binary(integer_to_list(K))}.
-
-consume_queue_full(Pipe, FittingName) ->
-    {log, {FittingName, {trace, [queue_full], _}}} =
-        riak_pipe:receive_result(Pipe, 5000),
-    ok.
-
-reduce_wait_for_signal(Inputs, Args) ->
-    case get(waited) of
-        true ->
-            Inputs;
-        _ ->
-            {TestProc, WaitRef} = proplists:get_value(wait, Args),
-            TestProc ! {waiting, WaitRef, self()},
-            receive {continue, WaitRef} -> ok end,
-            put(waited, true),
-            Inputs
-    end.
-
-wait_until_dead(Pid) when is_pid(Pid) ->
-    Ref = monitor(process, Pid),
-    receive
-        {'DOWN', Ref, process, _Obj, Info} ->
-            Info
-    after 10*1000 ->
-            exit({timeout_waiting_for, Pid})
-    end;
-wait_until_dead(_) ->
-    ok.
-- 
1.8.1.4