27f258
From 5ef3cba9682fd7b12493af6db4628ae2962d6998 Mon Sep 17 00:00:00 2001
27f258
From: Brian Bouterse <bmbouter@gmail.com>
27f258
Date: Fri, 22 Jan 2016 16:39:13 -0500
27f258
Subject: [PATCH 1/4] @acks_late usage in Qpid Transport now acks all messages
27f258
27f258
Implements a workaround for celery/celery#3019
27f258
---
27f258
 kombu/transport/qpid.py | 5 +++++
27f258
 1 file changed, 5 insertions(+)
27f258
27f258
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
27f258
index 639d837..8b6301e 100644
27f258
--- a/kombu/transport/qpid.py
27f258
+++ b/kombu/transport/qpid.py
27f258
@@ -74,6 +74,7 @@ Celery with Kombu, this can be accomplished by setting the
27f258
 from __future__ import absolute_import
27f258
 
27f258
 import os
27f258
+import random
27f258
 import select
27f258
 import socket
27f258
 import ssl
27f258
@@ -938,6 +939,10 @@ class Channel(base.StdChannel):
27f258
 
27f258
         def _callback(qpid_message):
27f258
             raw_message = qpid_message.content
27f258
+
27f258
+            # workaround for https://github.com/celery/celery/issues/3019
27f258
+            raw_message['properties']['delivery_tag'] = random.randint(1, 100000000000)
27f258
+
27f258
             message = self.Message(self, raw_message)
27f258
             delivery_tag = message.delivery_tag
27f258
             self.qos.append(qpid_message, delivery_tag)
27f258
-- 
27f258
2.4.3
27f258
27f258
27f258
From f7483a3cde70e488e308132295c23c39ee469092 Mon Sep 17 00:00:00 2001
27f258
From: Brian Bouterse <bmbouter@gmail.com>
27f258
Date: Mon, 25 Jan 2016 11:43:20 -0500
27f258
Subject: [PATCH 2/4] Revert "@acks_late usage in Qpid Transport now acks all
27f258
 messages"
27f258
27f258
This reverts commit 5ef3cba9682fd7b12493af6db4628ae2962d6998.
27f258
---
27f258
 kombu/transport/qpid.py | 5 -----
27f258
 1 file changed, 5 deletions(-)
27f258
27f258
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
27f258
index 8b6301e..639d837 100644
27f258
--- a/kombu/transport/qpid.py
27f258
+++ b/kombu/transport/qpid.py
27f258
@@ -74,7 +74,6 @@ Celery with Kombu, this can be accomplished by setting the
27f258
 from __future__ import absolute_import
27f258
 
27f258
 import os
27f258
-import random
27f258
 import select
27f258
 import socket
27f258
 import ssl
27f258
@@ -939,10 +938,6 @@ class Channel(base.StdChannel):
27f258
 
27f258
         def _callback(qpid_message):
27f258
             raw_message = qpid_message.content
27f258
-
27f258
-            # workaround for https://github.com/celery/celery/issues/3019
27f258
-            raw_message['properties']['delivery_tag'] = random.randint(1, 100000000000)
27f258
-
27f258
             message = self.Message(self, raw_message)
27f258
             delivery_tag = message.delivery_tag
27f258
             self.qos.append(qpid_message, delivery_tag)
27f258
-- 
27f258
2.4.3
27f258
27f258
27f258
From affa5f5e09c75c660f5ffbafd8aaedc7b8cdae5e Mon Sep 17 00:00:00 2001
27f258
From: Brian Bouterse <bmbouter@gmail.com>
27f258
Date: Mon, 25 Jan 2016 11:54:45 -0500
27f258
Subject: [PATCH 3/4] @acks_late usage in Qpid Transport now acks all messages
27f258
27f258
Implements a workaround for celery/celery#3019
27f258
---
27f258
 kombu/tests/transport/test_qpid.py | 4 ----
27f258
 kombu/transport/qpid.py            | 8 +++-----
27f258
 2 files changed, 3 insertions(+), 9 deletions(-)
27f258
27f258
diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py
27f258
index 4131193..a72077b 100644
27f258
--- a/kombu/tests/transport/test_qpid.py
27f258
+++ b/kombu/tests/transport/test_qpid.py
27f258
@@ -940,10 +940,6 @@ class TestChannel(ExtraAssertionsMixin, Case):
27f258
         self.assertIn('base64', Channel.codecs)
27f258
         self.assertIsInstance(Channel.codecs['base64'], Base64)
27f258
 
27f258
-    def test_delivery_tags(self):
27f258
-        """Test that _delivery_tags is using itertools"""
27f258
-        self.assertIsInstance(Channel._delivery_tags, count)
27f258
-
27f258
     def test_size(self):
27f258
         """Test getting the number of messages in a queue specified by
27f258
         name and returning them."""
27f258
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
27f258
index 639d837..6d9b006 100644
27f258
--- a/kombu/transport/qpid.py
27f258
+++ b/kombu/transport/qpid.py
27f258
@@ -74,6 +74,7 @@ Celery with Kombu, this can be accomplished by setting the
27f258
 from __future__ import absolute_import
27f258
 
27f258
 import os
27f258
+import random
27f258
 import select
27f258
 import socket
27f258
 import ssl
27f258
@@ -368,9 +369,6 @@ class Channel(base.StdChannel):
27f258
     #: Binary <-> ASCII codecs.
27f258
     codecs = {'base64': Base64()}
27f258
 
27f258
-    #: counter used to generate delivery tags for this channel.
27f258
-    _delivery_tags = count(1)
27f258
-
27f258
     def __init__(self, connection, transport):
27f258
         """Instantiate a Channel object.
27f258
 
27f258
@@ -1070,7 +1068,7 @@ class Channel(base.StdChannel):
27f258
         - wraps the body as a buffer object, so that
27f258
             :class:`qpid.messaging.endpoints.Sender` uses a content type
27f258
             that can support arbitrarily large messages.
27f258
-        - assigns a delivery_tag generated through self._delivery_tags
27f258
+        - assigns a random delivery_tag
27f258
         - sets the exchange and routing_key info as delivery_info
27f258
 
27f258
         Internally uses :meth:`_put` to send the message synchronously. This
27f258
@@ -1096,7 +1094,7 @@ class Channel(base.StdChannel):
27f258
         props = message['properties']
27f258
         props.update(
27f258
             body_encoding=body_encoding,
27f258
-            delivery_tag=next(self._delivery_tags),
27f258
+            delivery_tag=random.randint(1, sys.maxint),
27f258
         )
27f258
         props['delivery_info'].update(
27f258
             exchange=exchange,
27f258
-- 
27f258
2.4.3
27f258
27f258
27f258
From 7d6af48c06002deffc135c7fad506909fbf840e6 Mon Sep 17 00:00:00 2001
27f258
From: Brian Bouterse <bmbouter@gmail.com>
27f258
Date: Mon, 25 Jan 2016 16:07:53 -0500
27f258
Subject: [PATCH 4/4] Switches delivery_tag to uuid.uuid4() for Qpid transport
27f258
27f258
celery/kombu#563
27f258
---
27f258
 kombu/tests/transport/test_qpid.py |  3 ++-
27f258
 kombu/transport/qpid.py            | 43 +++++++++++++++++---------------------
27f258
 2 files changed, 21 insertions(+), 25 deletions(-)
27f258
27f258
diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py
27f258
index a72077b..a3894e4 100644
27f258
--- a/kombu/tests/transport/test_qpid.py
27f258
+++ b/kombu/tests/transport/test_qpid.py
27f258
@@ -5,6 +5,7 @@ import ssl
27f258
 import socket
27f258
 import sys
27f258
 import time
27f258
+import uuid
27f258
 
27f258
 from collections import Callable
27f258
 from itertools import count
27f258
@@ -1317,7 +1318,7 @@ class TestChannel(ExtraAssertionsMixin, Case):
27f258
             mock_message['properties']['body_encoding'], mock_body_encoding,
27f258
         )
27f258
         self.assertIsInstance(
27f258
-            mock_message['properties']['delivery_tag'], int,
27f258
+            mock_message['properties']['delivery_tag'], uuid.UUID,
27f258
         )
27f258
         self.assertIs(
27f258
             mock_message['properties']['delivery_info']['exchange'],
27f258
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
27f258
index 6d9b006..b458d32 100644
27f258
--- a/kombu/transport/qpid.py
27f258
+++ b/kombu/transport/qpid.py
27f258
@@ -74,14 +74,13 @@ Celery with Kombu, this can be accomplished by setting the
27f258
 from __future__ import absolute_import
27f258
 
27f258
 import os
27f258
-import random
27f258
 import select
27f258
 import socket
27f258
 import ssl
27f258
 import sys
27f258
 import time
27f258
+import uuid
27f258
 
27f258
-from itertools import count
27f258
 from gettext import gettext as _
27f258
 
27f258
 import amqp.protocol
27f258
@@ -160,7 +159,7 @@ class QoS(object):
27f258
     ACKed asynchronously through a call to :meth:`ack`. Messages that are
27f258
     received, but not ACKed will not be delivered by the broker to another
27f258
     consumer until an ACK is received, or the session is closed. Messages
27f258
-    are referred to using delivery_tag integers, which are unique per
27f258
+    are referred to using delivery_tag, which are unique per
27f258
     :class:`Channel`. Delivery tags are managed outside of this object and
27f258
     are passed in with a message to :meth:`append`. Un-ACKed messages can
27f258
     be looked up from QoS using :meth:`get` and can be rejected and
27f258
@@ -214,15 +213,15 @@ class QoS(object):
27f258
     def append(self, message, delivery_tag):
27f258
         """Append message to the list of un-ACKed messages.
27f258
 
27f258
-        Add a message, referenced by the integer delivery_tag, for ACKing,
27f258
+        Add a message, referenced by the delivery_tag, for ACKing,
27f258
         rejecting, or getting later. Messages are saved into an
27f258
         :class:`~kombu.utils.compat.OrderedDict` by delivery_tag.
27f258
 
27f258
         :param message: A received message that has not yet been ACKed.
27f258
         :type message: qpid.messaging.Message
27f258
-        :param delivery_tag: An integer number to refer to this message by
27f258
+        :param delivery_tag: A UUID to refer to this message by
27f258
             upon receipt.
27f258
-        :type delivery_tag: int
27f258
+        :type delivery_tag: uuid.UUID
27f258
         """
27f258
         self._not_yet_acked[delivery_tag] = message
27f258
 
27f258
@@ -233,7 +232,7 @@ class QoS(object):
27f258
 
27f258
         :param delivery_tag: The delivery tag associated with the message
27f258
             to be returned.
27f258
-        :type delivery_tag: int
27f258
+        :type delivery_tag: uuid.UUID
27f258
 
27f258
         :return: An un-ACKed message that is looked up by delivery_tag.
27f258
         :rtype: qpid.messaging.Message
27f258
@@ -248,7 +247,7 @@ class QoS(object):
27f258
 
27f258
         :param delivery_tag: the delivery tag associated with the message
27f258
             to be acknowledged.
27f258
-        :type delivery_tag: int
27f258
+        :type delivery_tag: uuid.UUID
27f258
         """
27f258
         message = self._not_yet_acked.pop(delivery_tag)
27f258
         self.session.acknowledge(message=message)
27f258
@@ -266,7 +265,7 @@ class QoS(object):
27f258
 
27f258
         :param delivery_tag: The delivery tag associated with the message
27f258
             to be rejected.
27f258
-        :type delivery_tag: int
27f258
+        :type delivery_tag: uuid.UUID
27f258
         :keyword requeue: If True, the broker will be notified to requeue
27f258
             the message. If False, the broker will be told to drop the
27f258
             message entirely. In both cases, the message will be removed
27f258
@@ -311,10 +310,9 @@ class Channel(base.StdChannel):
27f258
     Messages sent using this Channel are assigned a delivery_tag. The
27f258
     delivery_tag is generated for a message as they are prepared for
27f258
     sending by :meth:`basic_publish`. The delivery_tag is unique per
27f258
-    Channel instance using :meth:`~itertools.count`. The delivery_tag has
27f258
-    no meaningful context in other objects, and is only maintained in the
27f258
-    memory of this object, and the underlying :class:`QoS` object that
27f258
-    provides support.
27f258
+    Channel instance. The delivery_tag has no meaningful context in other
27f258
+    objects, and is only maintained in the memory of this object, and the
27f258
+    underlying :class:`QoS` object that provides support.
27f258
 
27f258
     Each Channel object instantiates exactly one :class:`QoS` object for
27f258
     prefetch limiting, and asynchronous ACKing. The :class:`QoS` object is
27f258
@@ -842,7 +840,7 @@ class Channel(base.StdChannel):
27f258
 
27f258
         :param delivery_tag: The delivery tag associated with the message
27f258
             to be acknowledged.
27f258
-        :type delivery_tag: int
27f258
+        :type delivery_tag: uuid.UUID
27f258
         """
27f258
         self.qos.ack(delivery_tag)
27f258
 
27f258
@@ -860,7 +858,7 @@ class Channel(base.StdChannel):
27f258
 
27f258
         :param delivery_tag: The delivery tag associated with the message
27f258
             to be rejected.
27f258
-        :type delivery_tag: int
27f258
+        :type delivery_tag: uuid.UUID
27f258
         :keyword requeue: If False, the rejected message will be dropped by
27f258
             the broker and not delivered to any other consumers. If True,
27f258
             then the rejected message will be requeued for delivery to
27f258
@@ -901,10 +899,9 @@ class Channel(base.StdChannel):
27f258
         handled by the caller of :meth:`~Transport.drain_events`. Messages
27f258
         can be ACKed after being received through a call to :meth:`basic_ack`.
27f258
 
27f258
-        If no_ack is True, the messages are immediately ACKed to avoid a
27f258
-        memory leak in qpid.messaging when messages go un-ACKed. The no_ack
27f258
-        flag indicates that the receiver of the message does not intent to
27f258
-        call :meth:`basic_ack`.
27f258
+        If no_ack is True, The no_ack flag indicates that the receiver of
27f258
+        the message will not call :meth:`basic_ack` later. Since the
27f258
+        message will not be ACKed later, it is ACKed immediately.
27f258
 
27f258
         :meth:`basic_consume` transforms the message object type prior to
27f258
         calling the callback. Initially the message comes in as a
27f258
@@ -940,9 +937,7 @@ class Channel(base.StdChannel):
27f258
             delivery_tag = message.delivery_tag
27f258
             self.qos.append(qpid_message, delivery_tag)
27f258
             if no_ack:
27f258
-                # Celery will not ack this message later, so we should to
27f258
-                # avoid a memory leak in qpid.messaging due to un-ACKed
27f258
-                # messages.
27f258
+                # Celery will not ack this message later, so we should ack now
27f258
                 self.basic_ack(delivery_tag)
27f258
             return callback(message)
27f258
 
27f258
@@ -1068,7 +1063,7 @@ class Channel(base.StdChannel):
27f258
         - wraps the body as a buffer object, so that
27f258
             :class:`qpid.messaging.endpoints.Sender` uses a content type
27f258
             that can support arbitrarily large messages.
27f258
-        - assigns a random delivery_tag
27f258
+        - sets delivery_tag to a random uuid.UUID
27f258
         - sets the exchange and routing_key info as delivery_info
27f258
 
27f258
         Internally uses :meth:`_put` to send the message synchronously. This
27f258
@@ -1094,7 +1089,7 @@ class Channel(base.StdChannel):
27f258
         props = message['properties']
27f258
         props.update(
27f258
             body_encoding=body_encoding,
27f258
-            delivery_tag=random.randint(1, sys.maxint),
27f258
+            delivery_tag=uuid.uuid4(),
27f258
         )
27f258
         props['delivery_info'].update(
27f258
             exchange=exchange,
27f258
-- 
27f258
2.4.3
27f258