a7b729a
From d1ee735a4aacb80d9c3c4c34fc4317c6eef6718a Mon Sep 17 00:00:00 2001
a7b729a
From: "brian m. carlson" <bk2204@github.com>
a7b729a
Date: Wed, 28 Aug 2019 21:02:26 +0000
a7b729a
Subject: [PATCH] Avoid deadlock when transfer queue fails
a7b729a
a7b729a
In 1412d6e4 ("Don't fail if we lack objects the server has",
a7b729a
2019-04-30), we changed the code to abort later if a missing object
a7b729a
occurs.  In doing so, we had to consider the case where the transfer
a7b729a
queue aborts early for some reason and ensure that the sync.WaitGroup
a7b729a
does not unnecessarily block due to outstanding objects never getting
a7b729a
processed.
a7b729a
a7b729a
However, the approach we used, which was to explicitly add the number of
a7b729a
items we skipped processing, was error-prone and didn't cover all cases.
a7b729a
Notably, a DNS failure could randomly cause a hang during a push.  Solve
a7b729a
this by creating a type for a wait group which is abortable and simply
a7b729a
abort it if we encounter an error, preventing any deadlocks caused by
a7b729a
miscounting the number of items.
a7b729a
---
a7b729a
 tq/transfer_queue.go | 55 ++++++++++++++++++++++++++++++++++----------
a7b729a
 1 file changed, 43 insertions(+), 12 deletions(-)
a7b729a
a7b729a
diff --git a/tq/transfer_queue.go b/tq/transfer_queue.go
a7b729a
index 89296a646..7d39fe581 100644
a7b729a
--- a/tq/transfer_queue.go
a7b729a
+++ b/tq/transfer_queue.go
a7b729a
@@ -123,6 +123,43 @@ func (b batch) Len() int           { return len(b) }
a7b729a
 func (b batch) Less(i, j int) bool { return b[i].Size < b[j].Size }
a7b729a
 func (b batch) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
a7b729a
 
a7b729a
+type abortableWaitGroup struct {
a7b729a
+	wq      sync.WaitGroup
a7b729a
+	counter int
a7b729a
+	mu      sync.Mutex
a7b729a
+}
a7b729a
+
a7b729a
+func newAbortableWaitQueue() *abortableWaitGroup {
a7b729a
+	return &abortableWaitGroup{}
a7b729a
+}
a7b729a
+
a7b729a
+func (q *abortableWaitGroup) Add(delta int) {
a7b729a
+	q.mu.Lock()
a7b729a
+	defer q.mu.Unlock()
a7b729a
+
a7b729a
+	q.counter += delta
a7b729a
+	q.wq.Add(delta)
a7b729a
+}
a7b729a
+
a7b729a
+func (q *abortableWaitGroup) Done() {
a7b729a
+	q.mu.Lock()
a7b729a
+	defer q.mu.Unlock()
a7b729a
+
a7b729a
+	q.counter -= 1
a7b729a
+	q.wq.Done()
a7b729a
+}
a7b729a
+
a7b729a
+func (q *abortableWaitGroup) Abort() {
a7b729a
+	q.mu.Lock()
a7b729a
+	defer q.mu.Unlock()
a7b729a
+
a7b729a
+	q.wq.Add(-q.counter)
a7b729a
+}
a7b729a
+
a7b729a
+func (q *abortableWaitGroup) Wait() {
a7b729a
+	q.wq.Wait()
a7b729a
+}
a7b729a
+
a7b729a
 // TransferQueue organises the wider process of uploading and downloading,
a7b729a
 // including calling the API, passing the actual transfer request to transfer
a7b729a
 // adapters, and dealing with progress, errors and retries.
a7b729a
@@ -150,7 +187,7 @@ type TransferQueue struct {
a7b729a
 	// wait is used to keep track of pending transfers. It is incremented
a7b729a
 	// once per unique OID on Add(), and is decremented when that transfer
a7b729a
 	// is marked as completed or failed, but not retried.
a7b729a
-	wait     sync.WaitGroup
a7b729a
+	wait     *abortableWaitGroup
a7b729a
 	manifest *Manifest
a7b729a
 	rc       *retryCounter
a7b729a
 
a7b729a
@@ -250,6 +287,7 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options
a7b729a
 		trMutex:   &sync.Mutex{},
a7b729a
 		manifest:  manifest,
a7b729a
 		rc:        newRetryCounter(),
a7b729a
+		wait:      newAbortableWaitQueue(),
a7b729a
 	}
a7b729a
 
a7b729a
 	for _, opt := range options {
a7b729a
@@ -401,8 +439,11 @@ func (q *TransferQueue) collectBatches() {
a7b729a
 		collected, closing = q.collectPendingUntil(done)
a7b729a
 
a7b729a
 		// If we've encountered a serious error here, abort immediately;
a7b729a
-		// don't process further batches.
a7b729a
+		// don't process further batches.  Abort the wait queue so that
a7b729a
+		// we don't deadlock waiting for objects to complete when they
a7b729a
+		// never will.
a7b729a
 		if err != nil {
a7b729a
+			q.wait.Abort()
a7b729a
 			break
a7b729a
 		}
a7b729a
 
a7b729a
@@ -497,11 +538,6 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
a7b729a
 				}
a7b729a
 			}
a7b729a
 
a7b729a
-			if err != nil && bRes != nil {
a7b729a
-				// Avoid a hang if we return early.
a7b729a
-				q.wait.Add(-len(bRes.Objects))
a7b729a
-			}
a7b729a
-
a7b729a
 			return next, err
a7b729a
 		}
a7b729a
 	}
a7b729a
@@ -521,11 +557,6 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
a7b729a
 			// missing in that case, since we don't need to upload
a7b729a
 			// it.
a7b729a
 			if o.Missing && len(o.Actions) != 0 {
a7b729a
-				// Indicate that we've handled these objects, in
a7b729a
-				// this case by ignoring them and aborting
a7b729a
-				// early. Failing to do this means we deadlock
a7b729a
-				// on this WaitGroup.
a7b729a
-				q.wait.Add(-len(bRes.Objects))
a7b729a
 				return nil, errors.Errorf("Unable to find source for object %v (try running git lfs fetch --all)", o.Oid)
a7b729a
 			}
a7b729a
 		}