a7b729
From d1ee735a4aacb80d9c3c4c34fc4317c6eef6718a Mon Sep 17 00:00:00 2001
a7b729
From: "brian m. carlson" <bk2204@github.com>
a7b729
Date: Wed, 28 Aug 2019 21:02:26 +0000
a7b729
Subject: [PATCH] Avoid deadlock when transfer queue fails
a7b729
a7b729
In 1412d6e4 ("Don't fail if we lack objects the server has",
a7b729
2019-04-30), we changed the code to abort later if a missing object
a7b729
occurs.  In doing so, we had to consider the case where the transfer
a7b729
queue aborts early for some reason and ensure that the sync.WaitGroup
a7b729
does not unnecessarily block due to outstanding objects never getting
a7b729
processed.
a7b729
a7b729
However, the approach we used, which was to explicitly add the number of
a7b729
items we skipped processing, was error prone and didn't cover all cases.
a7b729
Notably, a DNS failure could randomly cause a hang during a push.  Solve
a7b729
this by creating a class for a wait group which is abortable and simply
a7b729
abort it if we encounter an error, preventing any deadlocks caused by
a7b729
miscounting the number of items.
a7b729
---
a7b729
 tq/transfer_queue.go | 55 ++++++++++++++++++++++++++++++++++----------
a7b729
 1 file changed, 43 insertions(+), 12 deletions(-)
a7b729
a7b729
diff --git a/tq/transfer_queue.go b/tq/transfer_queue.go
a7b729
index 89296a646..7d39fe581 100644
a7b729
--- a/tq/transfer_queue.go
a7b729
+++ b/tq/transfer_queue.go
a7b729
@@ -123,6 +123,43 @@ func (b batch) Len() int           { return len(b) }
a7b729
 func (b batch) Less(i, j int) bool { return b[i].Size < b[j].Size }
a7b729
 func (b batch) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
a7b729
 
a7b729
+type abortableWaitGroup struct {
a7b729
+	wq      sync.WaitGroup
a7b729
+	counter int
a7b729
+	mu      sync.Mutex
a7b729
+}
a7b729
+
a7b729
+func newAbortableWaitQueue() *abortableWaitGroup {
a7b729
+	return &abortableWaitGroup{}
a7b729
+}
a7b729
+
a7b729
+func (q *abortableWaitGroup) Add(delta int) {
a7b729
+	q.mu.Lock()
a7b729
+	defer q.mu.Unlock()
a7b729
+
a7b729
+	q.counter += delta
a7b729
+	q.wq.Add(delta)
a7b729
+}
a7b729
+
a7b729
+func (q *abortableWaitGroup) Done() {
a7b729
+	q.mu.Lock()
a7b729
+	defer q.mu.Unlock()
a7b729
+
a7b729
+	q.counter -= 1
a7b729
+	q.wq.Done()
a7b729
+}
a7b729
+
a7b729
+func (q *abortableWaitGroup) Abort() {
a7b729
+	q.mu.Lock()
a7b729
+	defer q.mu.Unlock()
a7b729
+
a7b729
+	q.wq.Add(-q.counter)
a7b729
+}
a7b729
+
a7b729
+func (q *abortableWaitGroup) Wait() {
a7b729
+	q.wq.Wait()
a7b729
+}
a7b729
+
a7b729
 // TransferQueue organises the wider process of uploading and downloading,
a7b729
 // including calling the API, passing the actual transfer request to transfer
a7b729
 // adapters, and dealing with progress, errors and retries.
a7b729
@@ -150,7 +187,7 @@ type TransferQueue struct {
a7b729
 	// wait is used to keep track of pending transfers. It is incremented
a7b729
 	// once per unique OID on Add(), and is decremented when that transfer
a7b729
 	// is marked as completed or failed, but not retried.
a7b729
-	wait     sync.WaitGroup
a7b729
+	wait     *abortableWaitGroup
a7b729
 	manifest *Manifest
a7b729
 	rc       *retryCounter
a7b729
 
a7b729
@@ -250,6 +287,7 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options
a7b729
 		trMutex:   &sync.Mutex{},
a7b729
 		manifest:  manifest,
a7b729
 		rc:        newRetryCounter(),
a7b729
+		wait:      newAbortableWaitQueue(),
a7b729
 	}
a7b729
 
a7b729
 	for _, opt := range options {
a7b729
@@ -401,8 +439,11 @@ func (q *TransferQueue) collectBatches() {
a7b729
 		collected, closing = q.collectPendingUntil(done)
a7b729
 
a7b729
 		// If we've encountered a serious error here, abort immediately;
a7b729
-		// don't process further batches.
a7b729
+		// don't process further batches.  Abort the wait queue so that
a7b729
+		// we don't deadlock waiting for objects to complete when they
a7b729
+		// never will.
a7b729
 		if err != nil {
a7b729
+			q.wait.Abort()
a7b729
 			break
a7b729
 		}
a7b729
 
a7b729
@@ -497,11 +538,6 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
a7b729
 				}
a7b729
 			}
a7b729
 
a7b729
-			if err != nil && bRes != nil {
a7b729
-				// Avoid a hang if we return early.
a7b729
-				q.wait.Add(-len(bRes.Objects))
a7b729
-			}
a7b729
-
a7b729
 			return next, err
a7b729
 		}
a7b729
 	}
a7b729
@@ -521,11 +557,6 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
a7b729
 			// missing in that case, since we don't need to upload
a7b729
 			// it.
a7b729
 			if o.Missing && len(o.Actions) != 0 {
a7b729
-				// Indicate that we've handled these objects, in
a7b729
-				// this case by ignoring them and aborting
a7b729
-				// early. Failing to do this means we deadlock
a7b729
-				// on this WaitGroup.
a7b729
-				q.wait.Add(-len(bRes.Objects))
a7b729
 				return nil, errors.Errorf("Unable to find source for object %v (try running git lfs fetch --all)", o.Oid)
a7b729
 			}
a7b729
 		}