27f258f
From 5ef3cba9682fd7b12493af6db4628ae2962d6998 Mon Sep 17 00:00:00 2001
27f258f
From: Brian Bouterse <bmbouter@gmail.com>
27f258f
Date: Fri, 22 Jan 2016 16:39:13 -0500
27f258f
Subject: [PATCH 1/4] @acks_late usage in Qpid Transport now acks all messages
27f258f
27f258f
Implements a workaround for celery/celery#3019
27f258f
---
27f258f
 kombu/transport/qpid.py | 5 +++++
27f258f
 1 file changed, 5 insertions(+)
27f258f
27f258f
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
27f258f
index 639d837..8b6301e 100644
27f258f
--- a/kombu/transport/qpid.py
27f258f
+++ b/kombu/transport/qpid.py
27f258f
@@ -74,6 +74,7 @@ Celery with Kombu, this can be accomplished by setting the
27f258f
 from __future__ import absolute_import
27f258f
 
27f258f
 import os
27f258f
+import random
27f258f
 import select
27f258f
 import socket
27f258f
 import ssl
27f258f
@@ -938,6 +939,10 @@ class Channel(base.StdChannel):
27f258f
 
27f258f
         def _callback(qpid_message):
27f258f
             raw_message = qpid_message.content
27f258f
+
27f258f
+            # workaround for https://github.com/celery/celery/issues/3019
27f258f
+            raw_message['properties']['delivery_tag'] = random.randint(1, 100000000000)
27f258f
+
27f258f
             message = self.Message(self, raw_message)
27f258f
             delivery_tag = message.delivery_tag
27f258f
             self.qos.append(qpid_message, delivery_tag)
27f258f
-- 
27f258f
2.4.3
27f258f
27f258f
27f258f
From f7483a3cde70e488e308132295c23c39ee469092 Mon Sep 17 00:00:00 2001
27f258f
From: Brian Bouterse <bmbouter@gmail.com>
27f258f
Date: Mon, 25 Jan 2016 11:43:20 -0500
27f258f
Subject: [PATCH 2/4] Revert "@acks_late usage in Qpid Transport now acks all
27f258f
 messages"
27f258f
27f258f
This reverts commit 5ef3cba9682fd7b12493af6db4628ae2962d6998.
27f258f
---
27f258f
 kombu/transport/qpid.py | 5 -----
27f258f
 1 file changed, 5 deletions(-)
27f258f
27f258f
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
27f258f
index 8b6301e..639d837 100644
27f258f
--- a/kombu/transport/qpid.py
27f258f
+++ b/kombu/transport/qpid.py
27f258f
@@ -74,7 +74,6 @@ Celery with Kombu, this can be accomplished by setting the
27f258f
 from __future__ import absolute_import
27f258f
 
27f258f
 import os
27f258f
-import random
27f258f
 import select
27f258f
 import socket
27f258f
 import ssl
27f258f
@@ -939,10 +938,6 @@ class Channel(base.StdChannel):
27f258f
 
27f258f
         def _callback(qpid_message):
27f258f
             raw_message = qpid_message.content
27f258f
-
27f258f
-            # workaround for https://github.com/celery/celery/issues/3019
27f258f
-            raw_message['properties']['delivery_tag'] = random.randint(1, 100000000000)
27f258f
-
27f258f
             message = self.Message(self, raw_message)
27f258f
             delivery_tag = message.delivery_tag
27f258f
             self.qos.append(qpid_message, delivery_tag)
27f258f
-- 
27f258f
2.4.3
27f258f
27f258f
27f258f
From affa5f5e09c75c660f5ffbafd8aaedc7b8cdae5e Mon Sep 17 00:00:00 2001
27f258f
From: Brian Bouterse <bmbouter@gmail.com>
27f258f
Date: Mon, 25 Jan 2016 11:54:45 -0500
27f258f
Subject: [PATCH 3/4] @acks_late usage in Qpid Transport now acks all messages
27f258f
27f258f
Implements a workaround for celery/celery#3019
27f258f
---
27f258f
 kombu/tests/transport/test_qpid.py | 4 ----
27f258f
 kombu/transport/qpid.py            | 8 +++-----
27f258f
 2 files changed, 3 insertions(+), 9 deletions(-)
27f258f
27f258f
diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py
27f258f
index 4131193..a72077b 100644
27f258f
--- a/kombu/tests/transport/test_qpid.py
27f258f
+++ b/kombu/tests/transport/test_qpid.py
27f258f
@@ -940,10 +940,6 @@ class TestChannel(ExtraAssertionsMixin, Case):
27f258f
         self.assertIn('base64', Channel.codecs)
27f258f
         self.assertIsInstance(Channel.codecs['base64'], Base64)
27f258f
 
27f258f
-    def test_delivery_tags(self):
27f258f
-        """Test that _delivery_tags is using itertools"""
27f258f
-        self.assertIsInstance(Channel._delivery_tags, count)
27f258f
-
27f258f
     def test_size(self):
27f258f
         """Test getting the number of messages in a queue specified by
27f258f
         name and returning them."""
27f258f
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
27f258f
index 639d837..6d9b006 100644
27f258f
--- a/kombu/transport/qpid.py
27f258f
+++ b/kombu/transport/qpid.py
27f258f
@@ -74,6 +74,7 @@ Celery with Kombu, this can be accomplished by setting the
27f258f
 from __future__ import absolute_import
27f258f
 
27f258f
 import os
27f258f
+import random
27f258f
 import select
27f258f
 import socket
27f258f
 import ssl
27f258f
@@ -368,9 +369,6 @@ class Channel(base.StdChannel):
27f258f
     #: Binary <-> ASCII codecs.
27f258f
     codecs = {'base64': Base64()}
27f258f
 
27f258f
-    #: counter used to generate delivery tags for this channel.
27f258f
-    _delivery_tags = count(1)
27f258f
-
27f258f
     def __init__(self, connection, transport):
27f258f
         """Instantiate a Channel object.
27f258f
 
27f258f
@@ -1070,7 +1068,7 @@ class Channel(base.StdChannel):
27f258f
         - wraps the body as a buffer object, so that
27f258f
             :class:`qpid.messaging.endpoints.Sender` uses a content type
27f258f
             that can support arbitrarily large messages.
27f258f
-        - assigns a delivery_tag generated through self._delivery_tags
27f258f
+        - assigns a random delivery_tag
27f258f
         - sets the exchange and routing_key info as delivery_info
27f258f
 
27f258f
         Internally uses :meth:`_put` to send the message synchronously. This
27f258f
@@ -1096,7 +1094,7 @@ class Channel(base.StdChannel):
27f258f
         props = message['properties']
27f258f
         props.update(
27f258f
             body_encoding=body_encoding,
27f258f
-            delivery_tag=next(self._delivery_tags),
27f258f
+            delivery_tag=random.randint(1, sys.maxint),
27f258f
         )
27f258f
         props['delivery_info'].update(
27f258f
             exchange=exchange,
27f258f
-- 
27f258f
2.4.3
27f258f
27f258f
27f258f
From 7d6af48c06002deffc135c7fad506909fbf840e6 Mon Sep 17 00:00:00 2001
27f258f
From: Brian Bouterse <bmbouter@gmail.com>
27f258f
Date: Mon, 25 Jan 2016 16:07:53 -0500
27f258f
Subject: [PATCH 4/4] Switches delivery_tag to uuid.uuid4() for Qpid transport
27f258f
27f258f
celery/kombu#563
27f258f
---
27f258f
 kombu/tests/transport/test_qpid.py |  3 ++-
27f258f
 kombu/transport/qpid.py            | 43 +++++++++++++++++---------------------
27f258f
 2 files changed, 21 insertions(+), 25 deletions(-)
27f258f
27f258f
diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py
27f258f
index a72077b..a3894e4 100644
27f258f
--- a/kombu/tests/transport/test_qpid.py
27f258f
+++ b/kombu/tests/transport/test_qpid.py
27f258f
@@ -5,6 +5,7 @@ import ssl
27f258f
 import socket
27f258f
 import sys
27f258f
 import time
27f258f
+import uuid
27f258f
 
27f258f
 from collections import Callable
27f258f
 from itertools import count
27f258f
@@ -1317,7 +1318,7 @@ class TestChannel(ExtraAssertionsMixin, Case):
27f258f
             mock_message['properties']['body_encoding'], mock_body_encoding,
27f258f
         )
27f258f
         self.assertIsInstance(
27f258f
-            mock_message['properties']['delivery_tag'], int,
27f258f
+            mock_message['properties']['delivery_tag'], uuid.UUID,
27f258f
         )
27f258f
         self.assertIs(
27f258f
             mock_message['properties']['delivery_info']['exchange'],
27f258f
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
27f258f
index 6d9b006..b458d32 100644
27f258f
--- a/kombu/transport/qpid.py
27f258f
+++ b/kombu/transport/qpid.py
27f258f
@@ -74,14 +74,13 @@ Celery with Kombu, this can be accomplished by setting the
27f258f
 from __future__ import absolute_import
27f258f
 
27f258f
 import os
27f258f
-import random
27f258f
 import select
27f258f
 import socket
27f258f
 import ssl
27f258f
 import sys
27f258f
 import time
27f258f
+import uuid
27f258f
 
27f258f
-from itertools import count
27f258f
 from gettext import gettext as _
27f258f
 
27f258f
 import amqp.protocol
27f258f
@@ -160,7 +159,7 @@ class QoS(object):
27f258f
     ACKed asynchronously through a call to :meth:`ack`. Messages that are
27f258f
     received, but not ACKed will not be delivered by the broker to another
27f258f
     consumer until an ACK is received, or the session is closed. Messages
27f258f
-    are referred to using delivery_tag integers, which are unique per
27f258f
+    are referred to using delivery_tag, which are unique per
27f258f
     :class:`Channel`. Delivery tags are managed outside of this object and
27f258f
     are passed in with a message to :meth:`append`. Un-ACKed messages can
27f258f
     be looked up from QoS using :meth:`get` and can be rejected and
27f258f
@@ -214,15 +213,15 @@ class QoS(object):
27f258f
     def append(self, message, delivery_tag):
27f258f
         """Append message to the list of un-ACKed messages.
27f258f
 
27f258f
-        Add a message, referenced by the integer delivery_tag, for ACKing,
27f258f
+        Add a message, referenced by the delivery_tag, for ACKing,
27f258f
         rejecting, or getting later. Messages are saved into an
27f258f
         :class:`~kombu.utils.compat.OrderedDict` by delivery_tag.
27f258f
 
27f258f
         :param message: A received message that has not yet been ACKed.
27f258f
         :type message: qpid.messaging.Message
27f258f
-        :param delivery_tag: An integer number to refer to this message by
27f258f
+        :param delivery_tag: A UUID to refer to this message by
27f258f
             upon receipt.
27f258f
-        :type delivery_tag: int
27f258f
+        :type delivery_tag: uuid.UUID
27f258f
         """
27f258f
         self._not_yet_acked[delivery_tag] = message
27f258f
 
27f258f
@@ -233,7 +232,7 @@ class QoS(object):
27f258f
 
27f258f
         :param delivery_tag: The delivery tag associated with the message
27f258f
             to be returned.
27f258f
-        :type delivery_tag: int
27f258f
+        :type delivery_tag: uuid.UUID
27f258f
 
27f258f
         :return: An un-ACKed message that is looked up by delivery_tag.
27f258f
         :rtype: qpid.messaging.Message
27f258f
@@ -248,7 +247,7 @@ class QoS(object):
27f258f
 
27f258f
         :param delivery_tag: the delivery tag associated with the message
27f258f
             to be acknowledged.
27f258f
-        :type delivery_tag: int
27f258f
+        :type delivery_tag: uuid.UUID
27f258f
         """
27f258f
         message = self._not_yet_acked.pop(delivery_tag)
27f258f
         self.session.acknowledge(message=message)
27f258f
@@ -266,7 +265,7 @@ class QoS(object):
27f258f
 
27f258f
         :param delivery_tag: The delivery tag associated with the message
27f258f
             to be rejected.
27f258f
-        :type delivery_tag: int
27f258f
+        :type delivery_tag: uuid.UUID
27f258f
         :keyword requeue: If True, the broker will be notified to requeue
27f258f
             the message. If False, the broker will be told to drop the
27f258f
             message entirely. In both cases, the message will be removed
27f258f
@@ -311,10 +310,9 @@ class Channel(base.StdChannel):
27f258f
     Messages sent using this Channel are assigned a delivery_tag. The
27f258f
     delivery_tag is generated for a message as they are prepared for
27f258f
     sending by :meth:`basic_publish`. The delivery_tag is unique per
27f258f
-    Channel instance using :meth:`~itertools.count`. The delivery_tag has
27f258f
-    no meaningful context in other objects, and is only maintained in the
27f258f
-    memory of this object, and the underlying :class:`QoS` object that
27f258f
-    provides support.
27f258f
+    Channel instance. The delivery_tag has no meaningful context in other
27f258f
+    objects, and is only maintained in the memory of this object, and the
27f258f
+    underlying :class:`QoS` object that provides support.
27f258f
 
27f258f
     Each Channel object instantiates exactly one :class:`QoS` object for
27f258f
     prefetch limiting, and asynchronous ACKing. The :class:`QoS` object is
27f258f
@@ -842,7 +840,7 @@ class Channel(base.StdChannel):
27f258f
 
27f258f
         :param delivery_tag: The delivery tag associated with the message
27f258f
             to be acknowledged.
27f258f
-        :type delivery_tag: int
27f258f
+        :type delivery_tag: uuid.UUID
27f258f
         """
27f258f
         self.qos.ack(delivery_tag)
27f258f
 
27f258f
@@ -860,7 +858,7 @@ class Channel(base.StdChannel):
27f258f
 
27f258f
         :param delivery_tag: The delivery tag associated with the message
27f258f
             to be rejected.
27f258f
-        :type delivery_tag: int
27f258f
+        :type delivery_tag: uuid.UUID
27f258f
         :keyword requeue: If False, the rejected message will be dropped by
27f258f
             the broker and not delivered to any other consumers. If True,
27f258f
             then the rejected message will be requeued for delivery to
27f258f
@@ -901,10 +899,9 @@ class Channel(base.StdChannel):
27f258f
         handled by the caller of :meth:`~Transport.drain_events`. Messages
27f258f
         can be ACKed after being received through a call to :meth:`basic_ack`.
27f258f
 
27f258f
-        If no_ack is True, the messages are immediately ACKed to avoid a
27f258f
-        memory leak in qpid.messaging when messages go un-ACKed. The no_ack
27f258f
-        flag indicates that the receiver of the message does not intent to
27f258f
-        call :meth:`basic_ack`.
27f258f
+        If no_ack is True, The no_ack flag indicates that the receiver of
27f258f
+        the message will not call :meth:`basic_ack` later. Since the
27f258f
+        message will not be ACKed later, it is ACKed immediately.
27f258f
 
27f258f
         :meth:`basic_consume` transforms the message object type prior to
27f258f
         calling the callback. Initially the message comes in as a
27f258f
@@ -940,9 +937,7 @@ class Channel(base.StdChannel):
27f258f
             delivery_tag = message.delivery_tag
27f258f
             self.qos.append(qpid_message, delivery_tag)
27f258f
             if no_ack:
27f258f
-                # Celery will not ack this message later, so we should to
27f258f
-                # avoid a memory leak in qpid.messaging due to un-ACKed
27f258f
-                # messages.
27f258f
+                # Celery will not ack this message later, so we should ack now
27f258f
                 self.basic_ack(delivery_tag)
27f258f
             return callback(message)
27f258f
 
27f258f
@@ -1068,7 +1063,7 @@ class Channel(base.StdChannel):
27f258f
         - wraps the body as a buffer object, so that
27f258f
             :class:`qpid.messaging.endpoints.Sender` uses a content type
27f258f
             that can support arbitrarily large messages.
27f258f
-        - assigns a random delivery_tag
27f258f
+        - sets delivery_tag to a random uuid.UUID
27f258f
         - sets the exchange and routing_key info as delivery_info
27f258f
 
27f258f
         Internally uses :meth:`_put` to send the message synchronously. This
27f258f
@@ -1094,7 +1089,7 @@ class Channel(base.StdChannel):
27f258f
         props = message['properties']
27f258f
         props.update(
27f258f
             body_encoding=body_encoding,
27f258f
-            delivery_tag=random.randint(1, sys.maxint),
27f258f
+            delivery_tag=uuid.uuid4(),
27f258f
         )
27f258f
         props['delivery_info'].update(
27f258f
             exchange=exchange,
27f258f
-- 
27f258f
2.4.3
27f258f