Blob Blame History Raw
From 5ef3cba9682fd7b12493af6db4628ae2962d6998 Mon Sep 17 00:00:00 2001
From: Brian Bouterse <bmbouter@gmail.com>
Date: Fri, 22 Jan 2016 16:39:13 -0500
Subject: [PATCH 1/4] @acks_late usage in Qpid Transport now acks all messages

Implements a workaround for celery/celery#3019
---
 kombu/transport/qpid.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
index 639d837..8b6301e 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -74,6 +74,7 @@ Celery with Kombu, this can be accomplished by setting the
 from __future__ import absolute_import
 
 import os
+import random
 import select
 import socket
 import ssl
@@ -938,6 +939,10 @@ class Channel(base.StdChannel):
 
         def _callback(qpid_message):
             raw_message = qpid_message.content
+
+            # workaround for https://github.com/celery/celery/issues/3019
+            raw_message['properties']['delivery_tag'] = random.randint(1, 100000000000)
+
             message = self.Message(self, raw_message)
             delivery_tag = message.delivery_tag
             self.qos.append(qpid_message, delivery_tag)
-- 
2.4.3


From f7483a3cde70e488e308132295c23c39ee469092 Mon Sep 17 00:00:00 2001
From: Brian Bouterse <bmbouter@gmail.com>
Date: Mon, 25 Jan 2016 11:43:20 -0500
Subject: [PATCH 2/4] Revert "@acks_late usage in Qpid Transport now acks all
 messages"

This reverts commit 5ef3cba9682fd7b12493af6db4628ae2962d6998.
---
 kombu/transport/qpid.py | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
index 8b6301e..639d837 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -74,7 +74,6 @@ Celery with Kombu, this can be accomplished by setting the
 from __future__ import absolute_import
 
 import os
-import random
 import select
 import socket
 import ssl
@@ -939,10 +938,6 @@ class Channel(base.StdChannel):
 
         def _callback(qpid_message):
             raw_message = qpid_message.content
-
-            # workaround for https://github.com/celery/celery/issues/3019
-            raw_message['properties']['delivery_tag'] = random.randint(1, 100000000000)
-
             message = self.Message(self, raw_message)
             delivery_tag = message.delivery_tag
             self.qos.append(qpid_message, delivery_tag)
-- 
2.4.3


From affa5f5e09c75c660f5ffbafd8aaedc7b8cdae5e Mon Sep 17 00:00:00 2001
From: Brian Bouterse <bmbouter@gmail.com>
Date: Mon, 25 Jan 2016 11:54:45 -0500
Subject: [PATCH 3/4] @acks_late usage in Qpid Transport now acks all messages

Implements a workaround for celery/celery#3019
---
 kombu/tests/transport/test_qpid.py | 4 ----
 kombu/transport/qpid.py            | 8 +++-----
 2 files changed, 3 insertions(+), 9 deletions(-)

diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py
index 4131193..a72077b 100644
--- a/kombu/tests/transport/test_qpid.py
+++ b/kombu/tests/transport/test_qpid.py
@@ -940,10 +940,6 @@ class TestChannel(ExtraAssertionsMixin, Case):
         self.assertIn('base64', Channel.codecs)
         self.assertIsInstance(Channel.codecs['base64'], Base64)
 
-    def test_delivery_tags(self):
-        """Test that _delivery_tags is using itertools"""
-        self.assertIsInstance(Channel._delivery_tags, count)
-
     def test_size(self):
         """Test getting the number of messages in a queue specified by
         name and returning them."""
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
index 639d837..6d9b006 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -74,6 +74,7 @@ Celery with Kombu, this can be accomplished by setting the
 from __future__ import absolute_import
 
 import os
+import random
 import select
 import socket
 import ssl
@@ -368,9 +369,6 @@ class Channel(base.StdChannel):
     #: Binary <-> ASCII codecs.
     codecs = {'base64': Base64()}
 
-    #: counter used to generate delivery tags for this channel.
-    _delivery_tags = count(1)
-
     def __init__(self, connection, transport):
         """Instantiate a Channel object.
 
@@ -1070,7 +1068,7 @@ class Channel(base.StdChannel):
         - wraps the body as a buffer object, so that
             :class:`qpid.messaging.endpoints.Sender` uses a content type
             that can support arbitrarily large messages.
-        - assigns a delivery_tag generated through self._delivery_tags
+        - assigns a random delivery_tag
         - sets the exchange and routing_key info as delivery_info
 
         Internally uses :meth:`_put` to send the message synchronously. This
@@ -1096,7 +1094,7 @@ class Channel(base.StdChannel):
         props = message['properties']
         props.update(
             body_encoding=body_encoding,
-            delivery_tag=next(self._delivery_tags),
+            delivery_tag=random.randint(1, sys.maxint),
         )
         props['delivery_info'].update(
             exchange=exchange,
-- 
2.4.3


From 7d6af48c06002deffc135c7fad506909fbf840e6 Mon Sep 17 00:00:00 2001
From: Brian Bouterse <bmbouter@gmail.com>
Date: Mon, 25 Jan 2016 16:07:53 -0500
Subject: [PATCH 4/4] Switches delivery_tag to uuid.uuid4() for Qpid transport

celery/kombu#563
---
 kombu/tests/transport/test_qpid.py |  3 ++-
 kombu/transport/qpid.py            | 43 +++++++++++++++++---------------------
 2 files changed, 21 insertions(+), 25 deletions(-)

diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py
index a72077b..a3894e4 100644
--- a/kombu/tests/transport/test_qpid.py
+++ b/kombu/tests/transport/test_qpid.py
@@ -5,6 +5,7 @@ import ssl
 import socket
 import sys
 import time
+import uuid
 
 from collections import Callable
 from itertools import count
@@ -1317,7 +1318,7 @@ class TestChannel(ExtraAssertionsMixin, Case):
             mock_message['properties']['body_encoding'], mock_body_encoding,
         )
         self.assertIsInstance(
-            mock_message['properties']['delivery_tag'], int,
+            mock_message['properties']['delivery_tag'], uuid.UUID,
         )
         self.assertIs(
             mock_message['properties']['delivery_info']['exchange'],
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
index 6d9b006..b458d32 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -74,14 +74,13 @@ Celery with Kombu, this can be accomplished by setting the
 from __future__ import absolute_import
 
 import os
-import random
 import select
 import socket
 import ssl
 import sys
 import time
+import uuid
 
-from itertools import count
 from gettext import gettext as _
 
 import amqp.protocol
@@ -160,7 +159,7 @@ class QoS(object):
     ACKed asynchronously through a call to :meth:`ack`. Messages that are
     received, but not ACKed will not be delivered by the broker to another
     consumer until an ACK is received, or the session is closed. Messages
-    are referred to using delivery_tag integers, which are unique per
+    are referred to using delivery_tag, which are unique per
     :class:`Channel`. Delivery tags are managed outside of this object and
     are passed in with a message to :meth:`append`. Un-ACKed messages can
     be looked up from QoS using :meth:`get` and can be rejected and
@@ -214,15 +213,15 @@ class QoS(object):
     def append(self, message, delivery_tag):
         """Append message to the list of un-ACKed messages.
 
-        Add a message, referenced by the integer delivery_tag, for ACKing,
+        Add a message, referenced by the delivery_tag, for ACKing,
         rejecting, or getting later. Messages are saved into an
         :class:`~kombu.utils.compat.OrderedDict` by delivery_tag.
 
         :param message: A received message that has not yet been ACKed.
         :type message: qpid.messaging.Message
-        :param delivery_tag: An integer number to refer to this message by
+        :param delivery_tag: A UUID to refer to this message by
             upon receipt.
-        :type delivery_tag: int
+        :type delivery_tag: uuid.UUID
         """
         self._not_yet_acked[delivery_tag] = message
 
@@ -233,7 +232,7 @@ class QoS(object):
 
         :param delivery_tag: The delivery tag associated with the message
             to be returned.
-        :type delivery_tag: int
+        :type delivery_tag: uuid.UUID
 
         :return: An un-ACKed message that is looked up by delivery_tag.
         :rtype: qpid.messaging.Message
@@ -248,7 +247,7 @@ class QoS(object):
 
         :param delivery_tag: the delivery tag associated with the message
             to be acknowledged.
-        :type delivery_tag: int
+        :type delivery_tag: uuid.UUID
         """
         message = self._not_yet_acked.pop(delivery_tag)
         self.session.acknowledge(message=message)
@@ -266,7 +265,7 @@ class QoS(object):
 
         :param delivery_tag: The delivery tag associated with the message
             to be rejected.
-        :type delivery_tag: int
+        :type delivery_tag: uuid.UUID
         :keyword requeue: If True, the broker will be notified to requeue
             the message. If False, the broker will be told to drop the
             message entirely. In both cases, the message will be removed
@@ -311,10 +310,9 @@ class Channel(base.StdChannel):
     Messages sent using this Channel are assigned a delivery_tag. The
     delivery_tag is generated for a message as they are prepared for
     sending by :meth:`basic_publish`. The delivery_tag is unique per
-    Channel instance using :meth:`~itertools.count`. The delivery_tag has
-    no meaningful context in other objects, and is only maintained in the
-    memory of this object, and the underlying :class:`QoS` object that
-    provides support.
+    Channel instance. The delivery_tag has no meaningful context in other
+    objects, and is only maintained in the memory of this object, and the
+    underlying :class:`QoS` object that provides support.
 
     Each Channel object instantiates exactly one :class:`QoS` object for
     prefetch limiting, and asynchronous ACKing. The :class:`QoS` object is
@@ -842,7 +840,7 @@ class Channel(base.StdChannel):
 
         :param delivery_tag: The delivery tag associated with the message
             to be acknowledged.
-        :type delivery_tag: int
+        :type delivery_tag: uuid.UUID
         """
         self.qos.ack(delivery_tag)
 
@@ -860,7 +858,7 @@ class Channel(base.StdChannel):
 
         :param delivery_tag: The delivery tag associated with the message
             to be rejected.
-        :type delivery_tag: int
+        :type delivery_tag: uuid.UUID
         :keyword requeue: If False, the rejected message will be dropped by
             the broker and not delivered to any other consumers. If True,
             then the rejected message will be requeued for delivery to
@@ -901,10 +899,9 @@ class Channel(base.StdChannel):
         handled by the caller of :meth:`~Transport.drain_events`. Messages
         can be ACKed after being received through a call to :meth:`basic_ack`.
 
-        If no_ack is True, the messages are immediately ACKed to avoid a
-        memory leak in qpid.messaging when messages go un-ACKed. The no_ack
-        flag indicates that the receiver of the message does not intent to
-        call :meth:`basic_ack`.
+        If no_ack is True, The no_ack flag indicates that the receiver of
+        the message will not call :meth:`basic_ack` later. Since the
+        message will not be ACKed later, it is ACKed immediately.
 
         :meth:`basic_consume` transforms the message object type prior to
         calling the callback. Initially the message comes in as a
@@ -940,9 +937,7 @@ class Channel(base.StdChannel):
             delivery_tag = message.delivery_tag
             self.qos.append(qpid_message, delivery_tag)
             if no_ack:
-                # Celery will not ack this message later, so we should to
-                # avoid a memory leak in qpid.messaging due to un-ACKed
-                # messages.
+                # Celery will not ack this message later, so we should ack now
                 self.basic_ack(delivery_tag)
             return callback(message)
 
@@ -1068,7 +1063,7 @@ class Channel(base.StdChannel):
         - wraps the body as a buffer object, so that
             :class:`qpid.messaging.endpoints.Sender` uses a content type
             that can support arbitrarily large messages.
-        - assigns a random delivery_tag
+        - sets delivery_tag to a random uuid.UUID
         - sets the exchange and routing_key info as delivery_info
 
         Internally uses :meth:`_put` to send the message synchronously. This
@@ -1094,7 +1089,7 @@ class Channel(base.StdChannel):
         props = message['properties']
         props.update(
             body_encoding=body_encoding,
-            delivery_tag=random.randint(1, sys.maxint),
+            delivery_tag=uuid.uuid4(),
         )
         props['delivery_info'].update(
             exchange=exchange,
-- 
2.4.3