From 134cd670ed5c2773eec8ab6774ea0a6bdb2e7528 Mon Sep 17 00:00:00 2001 From: bbouters Date: Wed, 3 Aug 2016 19:23:28 +0000 Subject: [PATCH] Fixes Qpid file descriptor leak This is already fixed in upstream Kombu, so this is a port of the upstream fix to the version we carry. The Pulp issue is: https://pulp.plan.io/issues/2124 --- tests/transport/test_qpid.py | 48 +++++++++++++++----------------------------- transport/qpid.py | 19 +++++++++--------- 2 files changed, 26 insertions(+), 41 deletions(-) diff --git a/tests/transport/test_qpid.py b/tests/transport/test_qpid.py index e9272ff..fb7a831 100644 --- a/kombu/tests/transport/test_qpid.py +++ b/kombu/tests/transport/test_qpid.py @@ -1427,21 +1427,6 @@ class TestTransportInit(Case): Transport(m) self.mock_base_Transport__init__.assert_called_once_with(m) - def test_transport___init___calls_os_pipe(self): - Transport(Mock()) - self.mock_os.pipe.assert_called_once_with() - - def test_transport___init___saves_os_pipe_file_descriptors(self): - transport = Transport(Mock()) - self.assertIs(transport.r, self.mock_r) - self.assertIs(transport._w, self.mock_w) - - def test_transport___init___sets_non_blocking_behavior_on_r_fd(self): - Transport(Mock()) - self.mock_fcntl.fcntl.assert_called_once_with( - self.mock_r, self.mock_fcntl.F_SETFL, self.mock_os.O_NONBLOCK, - ) - @case_no_python3 @case_no_pypy @@ -1813,6 +1798,7 @@ class TestTransportOnReadable(Case): self.patch_b = patch.object(Transport, 'drain_events') self.mock_drain_events = self.patch_b.start() self.transport = Transport(Mock()) + self.transport.register_with_event_loop(Mock(), Mock()) def tearDown(self): self.patch_a.stop() @@ -1904,25 +1890,23 @@ class TestTransport(ExtraAssertionsMixin, Case): result_params = my_transport.default_connection_params self.assertDictEqual(correct_params, result_params) - @patch('os.close') - def test_del(self, close): + @patch(QPID_MODULE + '.os.close') + def test_del_sync(self, close): + my_transport = Transport(self.mock_client) + my_transport.__del__() + self.assertFalse(close.called) + + @patch(QPID_MODULE + '.os.close') + def test_del_async(self, close): my_transport = Transport(self.mock_client) + my_transport.register_with_event_loop(Mock(), Mock()) my_transport.__del__() - self.assertEqual( - close.call_args_list, - [ - ((my_transport.r,), {}), - ((my_transport._w,), {}), - ]) - - @patch('os.close') - def test_del_failed(self, close): + self.assertTrue(close.called) + + @patch(QPID_MODULE + '.os.close') + def test_del_async_failed(self, close): close.side_effect = OSError() my_transport = Transport(self.mock_client) + my_transport.register_with_event_loop(Mock(), Mock()) my_transport.__del__() - self.assertEqual( - close.call_args_list, - [ - ((my_transport.r,), {}), - ((my_transport._w,), {}), - ]) + self.assertTrue(close.called) diff --git a/transport/qpid.py b/transport/qpid.py index aa0d8e9..a8e78c4 100644 --- a/kombu/transport/qpid.py +++ b/kombu/transport/qpid.py @@ -1393,9 +1393,6 @@ class Transport(base.Transport): """ self.verify_runtime_environment() super(Transport, self).__init__(*args, **kwargs) - self.r, self._w = os.pipe() - if fcntl is not None: - fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK) self.use_async_interface = False def verify_runtime_environment(self): @@ -1522,6 +1519,9 @@ class Transport(base.Transport): :param loop: A reference to the external loop. :type loop: kombu.async.hub.Hub """ + self.r, self._w = os.pipe() + if fcntl is not None: + fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK) self.use_async_interface = True loop.add_reader(self.r, self.on_readable, connection, loop) @@ -1691,9 +1691,10 @@ class Transport(base.Transport): """ Ensure file descriptors opened in __init__() are closed. """ - for fd in (self.r, self._w): - try: - os.close(fd) - except OSError: - # ignored - pass + if self.use_async_interface: + for fd in (self.r, self._w): + try: + os.close(fd) + except OSError: + # ignored + pass -- 2.7.4