--- /kombu/transport/qpid.py 2016-04-01 16:59:49.698901199 -0400 +++ /kombu/transport/qpid.py.modified 2016-04-01 16:59:43.643873453 -0400 @@ -1396,6 +1396,7 @@ self.r, self._w = os.pipe() if fcntl is not None: fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK) + self.use_async_interface = False def verify_runtime_environment(self): """Verify that the runtime environment is acceptable. @@ -1435,10 +1436,12 @@ 'qpid-python`.') def _qpid_message_ready_handler(self, session): - os.write(self._w, '0') + if self.use_async_interface: + os.write(self._w, '0') def _qpid_async_exception_notify_handler(self, obj_with_exception, exc): - os.write(self._w, 'e') + if self.use_async_interface: + os.write(self._w, 'e') def on_readable(self, connection, loop): """Handle any messages associated with this Transport. @@ -1519,6 +1522,7 @@ :param loop: A reference to the external loop. :type loop: kombu.async.hub.Hub """ + self.use_async_interface = True loop.add_reader(self.r, self.on_readable, connection, loop) def establish_connection(self): --- /kombu/tests/transport/test_qpid.py 2016-04-01 17:10:59.140637210 -0400 +++ /kombu/tests/transport/test_qpid.py.modified 2016-04-01 17:13:40.708274142 -0400 @@ -1788,6 +1788,7 @@ self.patch_a = patch(QPID_MODULE + '.os.write') self.mock_os_write = self.patch_a.start() self.transport = Transport(Mock()) + self.transport.register_with_event_loop(Mock(), Mock()) def tearDown(self): self.patch_a.stop()