From 6115b1a9be4de41f2c7cbb855405bfd60eff81fc Mon Sep 17 00:00:00 2001
From: Brian Bouterse <bmbouter@gmail.com>
Date: Tue, 9 Feb 2016 14:37:09 -0500
Subject: [PATCH] Adds asynchronous error handling to Qpid transport
Fixes #568
---
kombu/transport/qpid.py | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
index b458d32..081c6c6 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -1437,6 +1437,9 @@ def verify_runtime_environment(self):
def _qpid_session_ready(self):
os.write(self._w, '0')
+ def _qpid_exception(self, obj_with_exception):
+ os.write(self._w, 'e')
+
def on_readable(self, connection, loop):
"""Handle any messages associated with this Transport.
@@ -1594,6 +1597,12 @@ def establish_connection(self):
conn.client = self.client
self.session = conn.get_qpid_connection().session()
self.session.set_message_received_handler(self._qpid_session_ready)
+ conn.get_qpid_connection().set_exception_notify_handler(
+ self._qpid_exception
+ )
+ self.session.set_exception_notify_handler(
+ self._qpid_exception
+ )
return conn
def close_connection(self, connection):