Blob Blame History Raw
From 7f35ac622a7389a3b2077b245d7f322918b03515 Mon Sep 17 00:00:00 2001
From: Lubomír Sedlář <lsedlar@redhat.com>
Date: Nov 25 2019 13:28:34 +0000
Subject: gather: Collect and re-raise errors from gather method


When there is an exception in gathering (such as after seeing unsigned
packages in deps method), the exception was lost and the compose
continued to run until it tried to access the result and crashed on
KeyError.

Relates: https://pagure.io/releng/failed-composes/issue/587
JIRA: COMPOSE-3986
Signed-off-by: Lubomír Sedlář <lsedlar@redhat.com>

---

diff --git a/pungi/phases/gather/__init__.py b/pungi/phases/gather/__init__.py
index 444a8c2..9d54f15 100644
--- a/pungi/phases/gather/__init__.py
+++ b/pungi/phases/gather/__init__.py
@@ -443,6 +443,7 @@ def _gather_variants(result, compose, variant_type, package_sets, exclude_fulltr
             continue
         threads_list = []
         que = Queue()
+        errors = Queue()
         for arch in variant.arches:
             fulltree_excludes = set()
             if exclude_fulltree:
@@ -454,11 +455,17 @@ def _gather_variants(result, compose, variant_type, package_sets, exclude_fulltr
             # there.
             _update_lookaside_config(compose, variant, arch, result, package_sets)
 
+            def worker(que, errors, arch, *args, **kwargs):
+                try:
+                    que.put((arch, gather_packages(*args, **kwargs)))
+                except Exception as exc:
+                    errors.put(exc)
+
             # Run gather_packages() in parallel with multi threads and store
             # its return value in a Queue() for later use.
             t = threading.Thread(
-                target=lambda q, arch, *args, **kwargs: q.put((arch, gather_packages(*args, **kwargs))),
-                args=(que, arch, compose, arch, variant, package_sets),
+                target=worker,
+                args=(que, errors, arch, compose, arch, variant, package_sets),
                 kwargs={'fulltree_excludes': fulltree_excludes},
             )
             threads_list.append(t)
@@ -467,6 +474,10 @@ def _gather_variants(result, compose, variant_type, package_sets, exclude_fulltr
         for t in threads_list:
             t.join()
 
+        while not errors.empty():
+            exc = errors.get()
+            raise exc
+
         while not que.empty():
             arch, pkg_map = que.get()
             result.setdefault(arch, {})[variant.uid] = pkg_map