Blob Blame History Raw
From 134cd670ed5c2773eec8ab6774ea0a6bdb2e7528 Mon Sep 17 00:00:00 2001
From: bbouters <bbouters@redhat.com>
Date: Wed, 3 Aug 2016 19:23:28 +0000
Subject: [PATCH] Fixes Qpid file descriptor leak

This is already fixed in upstream Kombu, so this is a
port of the upstream fix to the version we carry.

The Pulp issue is:

https://pulp.plan.io/issues/2124
---
 tests/transport/test_qpid.py | 48 +++++++++++++++-----------------------------
 transport/qpid.py            | 19 +++++++++---------
 2 files changed, 26 insertions(+), 41 deletions(-)

diff --git a/tests/transport/test_qpid.py b/tests/transport/test_qpid.py
index e9272ff..fb7a831 100644
--- a/kombu/tests/transport/test_qpid.py
+++ b/kombu/tests/transport/test_qpid.py
@@ -1427,21 +1427,6 @@ class TestTransportInit(Case):
         Transport(m)
         self.mock_base_Transport__init__.assert_called_once_with(m)
 
-    def test_transport___init___calls_os_pipe(self):
-        Transport(Mock())
-        self.mock_os.pipe.assert_called_once_with()
-
-    def test_transport___init___saves_os_pipe_file_descriptors(self):
-        transport = Transport(Mock())
-        self.assertIs(transport.r, self.mock_r)
-        self.assertIs(transport._w, self.mock_w)
-
-    def test_transport___init___sets_non_blocking_behavior_on_r_fd(self):
-        Transport(Mock())
-        self.mock_fcntl.fcntl.assert_called_once_with(
-            self.mock_r,  self.mock_fcntl.F_SETFL,  self.mock_os.O_NONBLOCK,
-        )
-
 
 @case_no_python3
 @case_no_pypy
@@ -1813,6 +1798,7 @@ class TestTransportOnReadable(Case):
         self.patch_b = patch.object(Transport, 'drain_events')
         self.mock_drain_events = self.patch_b.start()
         self.transport = Transport(Mock())
+        self.transport.register_with_event_loop(Mock(), Mock())
 
     def tearDown(self):
         self.patch_a.stop()
@@ -1904,25 +1890,23 @@ class TestTransport(ExtraAssertionsMixin, Case):
         result_params = my_transport.default_connection_params
         self.assertDictEqual(correct_params, result_params)
 
-    @patch('os.close')
-    def test_del(self, close):
+    @patch(QPID_MODULE + '.os.close')
+    def test_del_sync(self, close):
+        my_transport = Transport(self.mock_client)
+        my_transport.__del__()
+        self.assertFalse(close.called)
+
+    @patch(QPID_MODULE + '.os.close')
+    def test_del_async(self, close):
         my_transport = Transport(self.mock_client)
+        my_transport.register_with_event_loop(Mock(), Mock())
         my_transport.__del__()
-        self.assertEqual(
-            close.call_args_list,
-            [
-                ((my_transport.r,), {}),
-                ((my_transport._w,), {}),
-            ])
-
-    @patch('os.close')
-    def test_del_failed(self, close):
+        self.assertTrue(close.called)
+
+    @patch(QPID_MODULE + '.os.close')
+    def test_del_async_failed(self, close):
         close.side_effect = OSError()
         my_transport = Transport(self.mock_client)
+        my_transport.register_with_event_loop(Mock(), Mock())
         my_transport.__del__()
-        self.assertEqual(
-            close.call_args_list,
-            [
-                ((my_transport.r,), {}),
-                ((my_transport._w,), {}),
-            ])
+        self.assertTrue(close.called)
diff --git a/transport/qpid.py b/transport/qpid.py
index aa0d8e9..a8e78c4 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -1393,9 +1393,6 @@ class Transport(base.Transport):
         """
         self.verify_runtime_environment()
         super(Transport, self).__init__(*args, **kwargs)
-        self.r, self._w = os.pipe()
-        if fcntl is not None:
-            fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK)
         self.use_async_interface = False
 
     def verify_runtime_environment(self):
@@ -1522,6 +1519,9 @@ class Transport(base.Transport):
         :param loop: A reference to the external loop.
         :type loop: kombu.async.hub.Hub
         """
+        self.r, self._w = os.pipe()
+        if fcntl is not None:
+            fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK)
         self.use_async_interface = True
         loop.add_reader(self.r, self.on_readable, connection, loop)
 
@@ -1691,9 +1691,10 @@ class Transport(base.Transport):
         """
         Ensure file descriptors opened in __init__() are closed.
         """
-        for fd in (self.r, self._w):
-            try:
-                os.close(fd)
-            except OSError:
-                # ignored
-                pass
+        if self.use_async_interface:
+            for fd in (self.r, self._w):
+                try:
+                    os.close(fd)
+                except OSError:
+                    # ignored
+                    pass
-- 
2.7.4