Blob Blame History Raw
From 6115b1a9be4de41f2c7cbb855405bfd60eff81fc Mon Sep 17 00:00:00 2001
From: Brian Bouterse <bmbouter@gmail.com>
Date: Tue, 9 Feb 2016 14:37:09 -0500
Subject: [PATCH] Adds asynchronous error handling to Qpid transport

Fixes #568
---
 kombu/transport/qpid.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
index b458d32..081c6c6 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -1437,6 +1437,9 @@ def verify_runtime_environment(self):
     def _qpid_session_ready(self):
         os.write(self._w, '0')
 
+    def _qpid_exception(self, obj_with_exception):
+        os.write(self._w, 'e')
+
     def on_readable(self, connection, loop):
         """Handle any messages associated with this Transport.
 
@@ -1594,6 +1597,12 @@ def establish_connection(self):
         conn.client = self.client
         self.session = conn.get_qpid_connection().session()
         self.session.set_message_received_handler(self._qpid_session_ready)
+        conn.get_qpid_connection().set_exception_notify_handler(
+            self._qpid_exception
+        )
+        self.session.set_exception_notify_handler(
+            self._qpid_exception
+        )
         return conn
 
     def close_connection(self, connection):