From 6115b1a9be4de41f2c7cbb855405bfd60eff81fc Mon Sep 17 00:00:00 2001 From: Brian Bouterse Date: Tue, 9 Feb 2016 14:37:09 -0500 Subject: [PATCH] Adds asynchronous error handling to Qpid transport Fixes #568 --- kombu/transport/qpid.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py index b458d32..081c6c6 100644 --- a/kombu/transport/qpid.py +++ b/kombu/transport/qpid.py @@ -1437,6 +1437,9 @@ def verify_runtime_environment(self): def _qpid_session_ready(self): os.write(self._w, '0') + def _qpid_exception(self, obj_with_exception): + os.write(self._w, 'e') + def on_readable(self, connection, loop): """Handle any messages associated with this Transport. @@ -1594,6 +1597,12 @@ def establish_connection(self): conn.client = self.client self.session = conn.get_qpid_connection().session() self.session.set_message_received_handler(self._qpid_session_ready) + conn.get_qpid_connection().set_exception_notify_handler( + self._qpid_exception + ) + self.session.set_exception_notify_handler( + self._qpid_exception + ) return conn def close_connection(self, connection):