Blob Blame History Raw
--- /kombu/transport/qpid.py	2016-04-01 16:59:49.698901199 -0400
+++ /kombu/transport/qpid.py.modified	2016-04-01 16:59:43.643873453 -0400
@@ -1396,6 +1396,7 @@
         self.r, self._w = os.pipe()
         if fcntl is not None:
             fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK)
+        self.use_async_interface = False
 
     def verify_runtime_environment(self):
         """Verify that the runtime environment is acceptable.
@@ -1435,10 +1436,12 @@
                 'qpid-python`.')
 
     def _qpid_message_ready_handler(self, session):
-        os.write(self._w, '0')
+        if self.use_async_interface:
+            os.write(self._w, '0')
 
     def _qpid_async_exception_notify_handler(self, obj_with_exception, exc):
-        os.write(self._w, 'e')
+        if self.use_async_interface:
+            os.write(self._w, 'e')
 
     def on_readable(self, connection, loop):
         """Handle any messages associated with this Transport.
@@ -1519,6 +1522,7 @@
         :param loop: A reference to the external loop.
         :type loop: kombu.async.hub.Hub
         """
+        self.use_async_interface = True
         loop.add_reader(self.r, self.on_readable, connection, loop)
 
     def establish_connection(self):
--- /kombu/tests/transport/test_qpid.py	2016-04-01 17:10:59.140637210 -0400
+++ /kombu/tests/transport/test_qpid.py.modified	2016-04-01 17:13:40.708274142 -0400
@@ -1788,6 +1788,7 @@
         self.patch_a = patch(QPID_MODULE + '.os.write')
         self.mock_os_write = self.patch_a.start()
         self.transport = Transport(Mock())
+        self.transport.register_with_event_loop(Mock(), Mock())
 
     def tearDown(self):
         self.patch_a.stop()