From 134cd670ed5c2773eec8ab6774ea0a6bdb2e7528 Mon Sep 17 00:00:00 2001
From: bbouters <bbouters@redhat.com>
Date: Wed, 3 Aug 2016 19:23:28 +0000
Subject: [PATCH] Fixes Qpid file descriptor leak
This is already fixed in upstream Kombu, so this is a
port of the upstream fix to the version we carry.
The Pulp issue is:
https://pulp.plan.io/issues/2124
---
tests/transport/test_qpid.py | 48 +++++++++++++++-----------------------------
transport/qpid.py | 19 +++++++++---------
2 files changed, 26 insertions(+), 41 deletions(-)
diff --git a/tests/transport/test_qpid.py b/tests/transport/test_qpid.py
index e9272ff..fb7a831 100644
--- a/kombu/tests/transport/test_qpid.py
+++ b/kombu/tests/transport/test_qpid.py
@@ -1427,21 +1427,6 @@ class TestTransportInit(Case):
Transport(m)
self.mock_base_Transport__init__.assert_called_once_with(m)
- def test_transport___init___calls_os_pipe(self):
- Transport(Mock())
- self.mock_os.pipe.assert_called_once_with()
-
- def test_transport___init___saves_os_pipe_file_descriptors(self):
- transport = Transport(Mock())
- self.assertIs(transport.r, self.mock_r)
- self.assertIs(transport._w, self.mock_w)
-
- def test_transport___init___sets_non_blocking_behavior_on_r_fd(self):
- Transport(Mock())
- self.mock_fcntl.fcntl.assert_called_once_with(
- self.mock_r, self.mock_fcntl.F_SETFL, self.mock_os.O_NONBLOCK,
- )
-
@case_no_python3
@case_no_pypy
@@ -1813,6 +1798,7 @@ class TestTransportOnReadable(Case):
self.patch_b = patch.object(Transport, 'drain_events')
self.mock_drain_events = self.patch_b.start()
self.transport = Transport(Mock())
+ self.transport.register_with_event_loop(Mock(), Mock())
def tearDown(self):
self.patch_a.stop()
@@ -1904,25 +1890,23 @@ class TestTransport(ExtraAssertionsMixin, Case):
result_params = my_transport.default_connection_params
self.assertDictEqual(correct_params, result_params)
- @patch('os.close')
- def test_del(self, close):
+ @patch(QPID_MODULE + '.os.close')
+ def test_del_sync(self, close):
+ my_transport = Transport(self.mock_client)
+ my_transport.__del__()
+ self.assertFalse(close.called)
+
+ @patch(QPID_MODULE + '.os.close')
+ def test_del_async(self, close):
my_transport = Transport(self.mock_client)
+ my_transport.register_with_event_loop(Mock(), Mock())
my_transport.__del__()
- self.assertEqual(
- close.call_args_list,
- [
- ((my_transport.r,), {}),
- ((my_transport._w,), {}),
- ])
-
- @patch('os.close')
- def test_del_failed(self, close):
+ self.assertTrue(close.called)
+
+ @patch(QPID_MODULE + '.os.close')
+ def test_del_async_failed(self, close):
close.side_effect = OSError()
my_transport = Transport(self.mock_client)
+ my_transport.register_with_event_loop(Mock(), Mock())
my_transport.__del__()
- self.assertEqual(
- close.call_args_list,
- [
- ((my_transport.r,), {}),
- ((my_transport._w,), {}),
- ])
+ self.assertTrue(close.called)
diff --git a/transport/qpid.py b/transport/qpid.py
index aa0d8e9..a8e78c4 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -1393,9 +1393,6 @@ class Transport(base.Transport):
"""
self.verify_runtime_environment()
super(Transport, self).__init__(*args, **kwargs)
- self.r, self._w = os.pipe()
- if fcntl is not None:
- fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK)
self.use_async_interface = False
def verify_runtime_environment(self):
@@ -1522,6 +1519,9 @@ class Transport(base.Transport):
:param loop: A reference to the external loop.
:type loop: kombu.async.hub.Hub
"""
+ self.r, self._w = os.pipe()
+ if fcntl is not None:
+ fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK)
self.use_async_interface = True
loop.add_reader(self.r, self.on_readable, connection, loop)
@@ -1691,9 +1691,10 @@ class Transport(base.Transport):
"""
Ensure file descriptors opened in __init__() are closed.
"""
- for fd in (self.r, self._w):
- try:
- os.close(fd)
- except OSError:
- # ignored
- pass
+ if self.use_async_interface:
+ for fd in (self.r, self._w):
+ try:
+ os.close(fd)
+ except OSError:
+ # ignored
+ pass
--
2.7.4