Blob Blame History Raw
From ba97fb792c248320edd6353593feaff91b1155ef Mon Sep 17 00:00:00 2001
From: Charles Kerr <charles@charleskerr.com>
Date: Tue, 14 Feb 2023 10:30:54 -0600
Subject: [PATCH 1/3] fix: processing of block fragments that arrive
 out-of-order from request

Fixes 4.0.0 regression on torrents whose pieces did not align on block boundaries.
---
 libtransmission/cache.cc     |  2 +-
 libtransmission/cache.h      |  2 +-
 libtransmission/peer-msgs.cc | 96 ++++++++++++++++++------------------
 libtransmission/torrent.cc   | 11 ++++-
 libtransmission/webseed.cc   |  2 +-
 5 files changed, 60 insertions(+), 53 deletions(-)

diff --git a/libtransmission/cache.cc b/libtransmission/cache.cc
index ef446b244c..f7ae03dcc1 100644
--- a/libtransmission/cache.cc
+++ b/libtransmission/cache.cc
@@ -140,7 +140,7 @@ Cache::Cache(tr_torrents& torrents, int64_t max_bytes)
 
 // ---
 
-int Cache::writeBlock(tr_torrent_id_t tor_id, tr_block_index_t block, std::unique_ptr<std::vector<uint8_t>>& writeme)
+int Cache::writeBlock(tr_torrent_id_t tor_id, tr_block_index_t block, std::unique_ptr<std::vector<uint8_t>> writeme)
 {
     auto const key = Key{ tor_id, block };
     auto iter = std::lower_bound(std::begin(blocks_), std::end(blocks_), key, CompareCacheBlockByKey{});
diff --git a/libtransmission/cache.h b/libtransmission/cache.h
index e25deb2328..9d6455610b 100644
--- a/libtransmission/cache.h
+++ b/libtransmission/cache.h
@@ -36,7 +36,7 @@ class Cache
     }
 
     // @return any error code from cacheTrim()
-    int writeBlock(tr_torrent_id_t tor, tr_block_index_t block, std::unique_ptr<std::vector<uint8_t>>& writeme);
+    int writeBlock(tr_torrent_id_t tor, tr_block_index_t block, std::unique_ptr<std::vector<uint8_t>> writeme);
 
     int readBlock(tr_torrent* torrent, tr_block_info::Location loc, uint32_t len, uint8_t* setme);
     int prefetchBlock(tr_torrent* torrent, tr_block_info::Location loc, uint32_t len);
diff --git a/libtransmission/peer-msgs.cc b/libtransmission/peer-msgs.cc
index 837dbb4b76..05864fea20 100644
--- a/libtransmission/peer-msgs.cc
+++ b/libtransmission/peer-msgs.cc
@@ -188,7 +188,20 @@ struct tr_incoming
     uint8_t id = 0; // the protocol message, e.g. BtPeerMsgs::Piece
     uint32_t length = 0; // the full message payload length. Includes the +1 for id length
     std::optional<peer_request> block_req; // metadata for incoming blocks
-    std::map<tr_block_index_t, std::unique_ptr<std::vector<uint8_t>>> block_buf; // piece data for incoming blocks
+
+    struct incoming_piece_data
+    {
+        explicit incoming_piece_data(uint32_t block_size)
+            : buf{ std::make_unique<std::vector<uint8_t>>(block_size) }
+        {
+        }
+
+        uint32_t n_bytes_received = 0;
+
+        std::unique_ptr<std::vector<uint8_t>> buf;
+    };
+
+    std::map<tr_block_index_t, incoming_piece_data> blocks;
 };
 
 class tr_peerMsgsImpl;
@@ -1397,7 +1410,7 @@ bool messageLengthIsCorrect(tr_peerMsgsImpl const* msg, uint8_t id, uint32_t len
     }
 }
 
-int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>>& block_data, tr_block_index_t block);
+int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>> block_data, tr_block_index_t block);
 
 ReadState readBtPiece(tr_peerMsgsImpl* msgs, size_t inlen, size_t* setme_piece_bytes_read)
 {
@@ -1406,7 +1419,8 @@ ReadState readBtPiece(tr_peerMsgsImpl* msgs, size_t inlen, size_t* setme_piece_b
     logtrace(msgs, "In readBtPiece");
 
     // If this is the first we've seen of the piece data, parse out the header
-    if (!msgs->incoming.block_req)
+    auto& incoming = msgs->incoming;
+    if (!incoming.block_req)
     {
         if (inlen < 8)
         {
@@ -1416,67 +1430,55 @@ ReadState readBtPiece(tr_peerMsgsImpl* msgs, size_t inlen, size_t* setme_piece_b
         auto req = peer_request{};
         msgs->io->read_uint32(&req.index);
         msgs->io->read_uint32(&req.offset);
-        req.length = msgs->incoming.length - 9;
+        req.length = incoming.length - 9;
         logtrace(msgs, fmt::format(FMT_STRING("got incoming block header {:d}:{:d}->{:d}"), req.index, req.offset, req.length));
-        msgs->incoming.block_req = req;
+        incoming.block_req = req;
         return READ_NOW;
     }
 
-    auto& req = msgs->incoming.block_req;
+    auto& req = incoming.block_req;
     auto const loc = msgs->torrent->pieceLoc(req->index, req->offset);
     auto const block = loc.block;
     auto const block_size = msgs->torrent->blockSize(block);
-    auto& block_buf = msgs->incoming.block_buf[block];
-    if (!block_buf)
-    {
-        block_buf = std::make_unique<std::vector<uint8_t>>();
-        block_buf->reserve(block_size);
-    }
 
-    // read in another chunk of data
-    auto const n_left_in_block = block_size - std::size(*block_buf);
-    auto const n_left_in_req = size_t{ req->length };
-    auto const n_to_read = std::min({ n_left_in_block, n_left_in_req, inlen });
-    auto const old_length = std::size(*block_buf);
-    block_buf->resize(old_length + n_to_read);
-    msgs->io->read_bytes(&((*block_buf)[old_length]), n_to_read);
+    auto const n_this_pass = std::min(size_t{ req->length }, inlen);
+    TR_ASSERT(loc.block_offset + n_this_pass <= block_size);
 
-    msgs->publish(tr_peer_event::GotPieceData(n_to_read));
-    *setme_piece_bytes_read += n_to_read;
-    logtrace(
-        msgs,
-        fmt::format(
-            FMT_STRING("got {:d} bytes for block {:d}:{:d}->{:d} ... {:d} remain in req, {:d} remain in block"),
-            n_to_read,
-            req->index,
-            req->offset,
-            req->length,
-            req->length,
-            block_size - std::size(*block_buf)));
-
-    // if we didn't read enough to finish off the request,
-    // update the table and wait for more
-    if (n_to_read < n_left_in_req)
-    {
-        auto new_loc = msgs->torrent->byteLoc(loc.byte + n_to_read);
+    auto& incoming_block = incoming.blocks.try_emplace(block, block_size).first->second;
+    msgs->io->read_bytes(std::data(*incoming_block.buf) + loc.block_offset, n_this_pass);
+
+    msgs->publish(tr_peer_event::GotPieceData(n_this_pass));
+    *setme_piece_bytes_read += n_this_pass;
+    incoming_block.n_bytes_received += n_this_pass;
+    TR_ASSERT(incoming_block.n_bytes_received <= block_size);
+    logtrace(msgs, fmt::format("got {:d} bytes for req {:d}:{:d}->{:d}", n_this_pass, req->index, req->offset, req->length));
+
+    // if we haven't gotten the full response yet,
+    // update what part of `req` is unfulfilled and wait for more
+    if (n_this_pass < req->length)
+    {
+        req->length -= n_this_pass;
+        auto const new_loc = msgs->torrent->byteLoc(loc.byte + n_this_pass);
         req->index = new_loc.piece;
         req->offset = new_loc.piece_offset;
-        req->length -= n_to_read;
         return READ_LATER;
     }
 
-    // we've fully read this message
+    // we've got the entire response message
     req.reset();
     msgs->state = AwaitingBt::Length;
 
-    // if we didn't read enough to finish off the block,
-    // update the table and wait for more
-    if (std::size(*block_buf) < block_size)
+    // if we haven't gotten the entire block yet, wait for more
+    if (incoming_block.n_bytes_received < block_size)
     {
         return READ_LATER;
     }
 
-    return clientGotBlock(msgs, block_buf, block) != 0 ? READ_ERR : READ_NOW;
+    // we've got the entire block, so send it along.
+    auto block_buf = std::move(incoming_block.buf);
+    incoming.blocks.erase(block); // note: invalidates `incoming_block` local
+    auto const ok = clientGotBlock(msgs, std::move(block_buf), block) == 0;
+    return ok ? READ_NOW : READ_ERR;
 }
 
 ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen)
@@ -1744,7 +1746,7 @@ ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen)
 }
 
 /* returns 0 on success, or an errno on failure */
-int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>>& block_data, tr_block_index_t const block)
+int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>> block_data, tr_block_index_t const block)
 {
     TR_ASSERT(msgs != nullptr);
 
@@ -1760,7 +1762,6 @@ int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>>&
     if (std::size(*block_data) != msgs->torrent->blockSize(block))
     {
         logdbg(msgs, fmt::format("wrong block size: expected {:d}, got {:d}", n_expected, std::size(*block_data)));
-        block_data->clear();
         return EMSGSIZE;
     }
 
@@ -1769,7 +1770,6 @@ int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>>&
     if (!tr_peerMgrDidPeerRequest(msgs->torrent, msgs, block))
     {
         logdbg(msgs, "we didn't ask for this message...");
-        block_data->clear();
         return 0;
     }
 
@@ -1777,19 +1777,17 @@ int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>>&
     if (msgs->torrent->hasPiece(loc.piece))
     {
         logtrace(msgs, "we did ask for this message, but the piece is already complete...");
-        block_data->clear();
         return 0;
     }
 
     // NB: if writeBlock() fails the torrent may be paused.
     // If this happens, `msgs` will be a dangling pointer and must no longer be used.
-    if (auto const err = msgs->session->cache->writeBlock(tor->id(), block, block_data); err != 0)
+    if (auto const err = msgs->session->cache->writeBlock(tor->id(), block, std::move(block_data)); err != 0)
     {
         return err;
     }
 
     msgs->blame.set(loc.piece);
-    msgs->incoming.block_buf.erase(block);
     msgs->publish(tr_peer_event::GotBlock(tor->blockInfo(), block));
 
     return 0;
diff --git a/libtransmission/torrent.cc b/libtransmission/torrent.cc
index 4b5a0b1dea..cd96bac2c3 100644
--- a/libtransmission/torrent.cc
+++ b/libtransmission/torrent.cc
@@ -2400,8 +2400,17 @@ void tr_torrentGotBlock(tr_torrent* tor, tr_block_index_t block)
     tor->setDirty();
 
     tor->completion.addBlock(block);
-    if (auto const piece = tor->blockLoc(block).piece; tor->hasPiece(piece))
+
+    auto const block_loc = tor->blockLoc(block);
+    auto const first_piece = block_loc.piece;
+    auto const last_piece = tor->byteLoc(block_loc.byte + tor->blockSize(block) - 1).piece;
+    for (auto piece = first_piece; piece <= last_piece; ++piece)
     {
+        if (!tor->hasPiece(piece))
+        {
+            continue;
+        }
+
         if (tor->checkPiece(piece))
         {
             onPieceCompleted(tor, piece);
diff --git a/libtransmission/webseed.cc b/libtransmission/webseed.cc
index 7f07dd149e..8136139d87 100644
--- a/libtransmission/webseed.cc
+++ b/libtransmission/webseed.cc
@@ -352,7 +352,7 @@ struct write_block_data
     {
         if (auto const* const tor = tr_torrentFindFromId(session_, tor_id_); tor != nullptr)
         {
-            session_->cache->writeBlock(tor_id_, block_, data_);
+            session_->cache->writeBlock(tor_id_, block_, std::move(data_));
             webseed_->publish(tr_peer_event::GotBlock(tor->blockInfo(), block_));
         }
 

From 8a3418aae23e37d446a86b11a34679db0bfa99f7 Mon Sep 17 00:00:00 2001
From: Charles Kerr <charles@charleskerr.com>
Date: Tue, 14 Feb 2023 10:59:16 -0600
Subject: [PATCH 2/3] fixup! fix: processing of block fragments that arrive
 out-of-order from request

refactor: be more paranoid about duplicate requests at endgame.
---
 libtransmission/peer-msgs.cc | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/libtransmission/peer-msgs.cc b/libtransmission/peer-msgs.cc
index 05864fea20..24e3359d65 100644
--- a/libtransmission/peer-msgs.cc
+++ b/libtransmission/peer-msgs.cc
@@ -22,6 +22,7 @@
 
 #include "transmission.h"
 
+#include "bitfield.h"
 #include "cache.h"
 #include "completion.h"
 #include "crypto-utils.h"
@@ -193,12 +194,12 @@ struct tr_incoming
     {
         explicit incoming_piece_data(uint32_t block_size)
             : buf{ std::make_unique<std::vector<uint8_t>>(block_size) }
+            , have{ block_size }
         {
         }
 
-        uint32_t n_bytes_received = 0;
-
         std::unique_ptr<std::vector<uint8_t>> buf;
+        tr_bitfield have;
     };
 
     std::map<tr_block_index_t, incoming_piece_data> blocks;
@@ -1443,19 +1444,22 @@ ReadState readBtPiece(tr_peerMsgsImpl* msgs, size_t inlen, size_t* setme_piece_b
 
     auto const n_this_pass = std::min(size_t{ req->length }, inlen);
     TR_ASSERT(loc.block_offset + n_this_pass <= block_size);
+    if (n_this_pass == 0)
+    {
+        return READ_LATER;
+    }
 
     auto& incoming_block = incoming.blocks.try_emplace(block, block_size).first->second;
     msgs->io->read_bytes(std::data(*incoming_block.buf) + loc.block_offset, n_this_pass);
 
     msgs->publish(tr_peer_event::GotPieceData(n_this_pass));
     *setme_piece_bytes_read += n_this_pass;
-    incoming_block.n_bytes_received += n_this_pass;
-    TR_ASSERT(incoming_block.n_bytes_received <= block_size);
+    incoming_block.have.setSpan(loc.block_offset, loc.block_offset + n_this_pass);
     logtrace(msgs, fmt::format("got {:d} bytes for req {:d}:{:d}->{:d}", n_this_pass, req->index, req->offset, req->length));
 
     // if we haven't gotten the full response yet,
     // update what part of `req` is unfulfilled and wait for more
-    if (n_this_pass < req->length)
+    if (req->length > n_this_pass)
     {
         req->length -= n_this_pass;
         auto const new_loc = msgs->torrent->byteLoc(loc.byte + n_this_pass);
@@ -1469,7 +1473,7 @@ ReadState readBtPiece(tr_peerMsgsImpl* msgs, size_t inlen, size_t* setme_piece_b
     msgs->state = AwaitingBt::Length;
 
     // if we haven't gotten the entire block yet, wait for more
-    if (incoming_block.n_bytes_received < block_size)
+    if (!incoming_block.have.hasAll())
     {
         return READ_LATER;
     }

From d75445758c3e7dc666aba781111275d62acbf3b8 Mon Sep 17 00:00:00 2001
From: Charles Kerr <charles@charleskerr.com>
Date: Tue, 14 Feb 2023 11:57:35 -0600
Subject: [PATCH 3/3] fixup! fix: processing of block fragments that arrive
 out-of-order from request

chore: sync tests
---
 tests/libtransmission/move-test.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/libtransmission/move-test.cc b/tests/libtransmission/move-test.cc
index cc5dc05ce6..e06d1daefd 100644
--- a/tests/libtransmission/move-test.cc
+++ b/tests/libtransmission/move-test.cc
@@ -84,7 +84,7 @@ TEST_P(IncompleteDirTest, incompleteDir)
 
     auto const test_incomplete_dir_threadfunc = [](TestIncompleteDirData* data) noexcept
     {
-        data->session->cache->writeBlock(data->tor->id(), data->block, data->buf);
+        data->session->cache->writeBlock(data->tor->id(), data->block, std::move(data->buf));
         tr_torrentGotBlock(data->tor, data->block);
         data->done = true;
     };