From 5ef3cba9682fd7b12493af6db4628ae2962d6998 Mon Sep 17 00:00:00 2001
From: Brian Bouterse <bmbouter@gmail.com>
Date: Fri, 22 Jan 2016 16:39:13 -0500
Subject: [PATCH 1/4] @acks_late usage in Qpid Transport now acks all messages
Implements a workaround for celery/celery#3019
---
kombu/transport/qpid.py | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
index 639d837..8b6301e 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -74,6 +74,7 @@ Celery with Kombu, this can be accomplished by setting the
from __future__ import absolute_import
import os
+import random
import select
import socket
import ssl
@@ -938,6 +939,10 @@ class Channel(base.StdChannel):
def _callback(qpid_message):
raw_message = qpid_message.content
+
+ # workaround for https://github.com/celery/celery/issues/3019
+ raw_message['properties']['delivery_tag'] = random.randint(1, 100000000000)
+
message = self.Message(self, raw_message)
delivery_tag = message.delivery_tag
self.qos.append(qpid_message, delivery_tag)
--
2.4.3
From f7483a3cde70e488e308132295c23c39ee469092 Mon Sep 17 00:00:00 2001
From: Brian Bouterse <bmbouter@gmail.com>
Date: Mon, 25 Jan 2016 11:43:20 -0500
Subject: [PATCH 2/4] Revert "@acks_late usage in Qpid Transport now acks all
messages"
This reverts commit 5ef3cba9682fd7b12493af6db4628ae2962d6998.
---
kombu/transport/qpid.py | 5 -----
1 file changed, 5 deletions(-)
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
index 8b6301e..639d837 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -74,7 +74,6 @@ Celery with Kombu, this can be accomplished by setting the
from __future__ import absolute_import
import os
-import random
import select
import socket
import ssl
@@ -939,10 +938,6 @@ class Channel(base.StdChannel):
def _callback(qpid_message):
raw_message = qpid_message.content
-
- # workaround for https://github.com/celery/celery/issues/3019
- raw_message['properties']['delivery_tag'] = random.randint(1, 100000000000)
-
message = self.Message(self, raw_message)
delivery_tag = message.delivery_tag
self.qos.append(qpid_message, delivery_tag)
--
2.4.3
From affa5f5e09c75c660f5ffbafd8aaedc7b8cdae5e Mon Sep 17 00:00:00 2001
From: Brian Bouterse <bmbouter@gmail.com>
Date: Mon, 25 Jan 2016 11:54:45 -0500
Subject: [PATCH 3/4] @acks_late usage in Qpid Transport now acks all messages
Implements a workaround for celery/celery#3019
---
kombu/tests/transport/test_qpid.py | 4 ----
kombu/transport/qpid.py | 8 +++-----
2 files changed, 3 insertions(+), 9 deletions(-)
diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py
index 4131193..a72077b 100644
--- a/kombu/tests/transport/test_qpid.py
+++ b/kombu/tests/transport/test_qpid.py
@@ -940,10 +940,6 @@ class TestChannel(ExtraAssertionsMixin, Case):
self.assertIn('base64', Channel.codecs)
self.assertIsInstance(Channel.codecs['base64'], Base64)
- def test_delivery_tags(self):
- """Test that _delivery_tags is using itertools"""
- self.assertIsInstance(Channel._delivery_tags, count)
-
def test_size(self):
"""Test getting the number of messages in a queue specified by
name and returning them."""
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
index 639d837..6d9b006 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -74,6 +74,7 @@ Celery with Kombu, this can be accomplished by setting the
from __future__ import absolute_import
import os
+import random
import select
import socket
import ssl
@@ -368,9 +369,6 @@ class Channel(base.StdChannel):
#: Binary <-> ASCII codecs.
codecs = {'base64': Base64()}
- #: counter used to generate delivery tags for this channel.
- _delivery_tags = count(1)
-
def __init__(self, connection, transport):
"""Instantiate a Channel object.
@@ -1070,7 +1068,7 @@ class Channel(base.StdChannel):
- wraps the body as a buffer object, so that
:class:`qpid.messaging.endpoints.Sender` uses a content type
that can support arbitrarily large messages.
- - assigns a delivery_tag generated through self._delivery_tags
+ - assigns a random delivery_tag
- sets the exchange and routing_key info as delivery_info
Internally uses :meth:`_put` to send the message synchronously. This
@@ -1096,7 +1094,7 @@ class Channel(base.StdChannel):
props = message['properties']
props.update(
body_encoding=body_encoding,
- delivery_tag=next(self._delivery_tags),
+ delivery_tag=random.randint(1, sys.maxint),
)
props['delivery_info'].update(
exchange=exchange,
--
2.4.3
From 7d6af48c06002deffc135c7fad506909fbf840e6 Mon Sep 17 00:00:00 2001
From: Brian Bouterse <bmbouter@gmail.com>
Date: Mon, 25 Jan 2016 16:07:53 -0500
Subject: [PATCH 4/4] Switches delivery_tag to uuid.uuid4() for Qpid transport
celery/kombu#563
---
kombu/tests/transport/test_qpid.py | 3 ++-
kombu/transport/qpid.py | 43 +++++++++++++++++---------------------
2 files changed, 21 insertions(+), 25 deletions(-)
diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py
index a72077b..a3894e4 100644
--- a/kombu/tests/transport/test_qpid.py
+++ b/kombu/tests/transport/test_qpid.py
@@ -5,6 +5,7 @@ import ssl
import socket
import sys
import time
+import uuid
from collections import Callable
from itertools import count
@@ -1317,7 +1318,7 @@ class TestChannel(ExtraAssertionsMixin, Case):
mock_message['properties']['body_encoding'], mock_body_encoding,
)
self.assertIsInstance(
- mock_message['properties']['delivery_tag'], int,
+ mock_message['properties']['delivery_tag'], uuid.UUID,
)
self.assertIs(
mock_message['properties']['delivery_info']['exchange'],
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
index 6d9b006..b458d32 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -74,14 +74,13 @@ Celery with Kombu, this can be accomplished by setting the
from __future__ import absolute_import
import os
-import random
import select
import socket
import ssl
import sys
import time
+import uuid
-from itertools import count
from gettext import gettext as _
import amqp.protocol
@@ -160,7 +159,7 @@ class QoS(object):
ACKed asynchronously through a call to :meth:`ack`. Messages that are
received, but not ACKed will not be delivered by the broker to another
consumer until an ACK is received, or the session is closed. Messages
- are referred to using delivery_tag integers, which are unique per
+ are referred to using delivery_tag, which are unique per
:class:`Channel`. Delivery tags are managed outside of this object and
are passed in with a message to :meth:`append`. Un-ACKed messages can
be looked up from QoS using :meth:`get` and can be rejected and
@@ -214,15 +213,15 @@ class QoS(object):
def append(self, message, delivery_tag):
"""Append message to the list of un-ACKed messages.
- Add a message, referenced by the integer delivery_tag, for ACKing,
+ Add a message, referenced by the delivery_tag, for ACKing,
rejecting, or getting later. Messages are saved into an
:class:`~kombu.utils.compat.OrderedDict` by delivery_tag.
:param message: A received message that has not yet been ACKed.
:type message: qpid.messaging.Message
- :param delivery_tag: An integer number to refer to this message by
+ :param delivery_tag: A UUID to refer to this message by
upon receipt.
- :type delivery_tag: int
+ :type delivery_tag: uuid.UUID
"""
self._not_yet_acked[delivery_tag] = message
@@ -233,7 +232,7 @@ class QoS(object):
:param delivery_tag: The delivery tag associated with the message
to be returned.
- :type delivery_tag: int
+ :type delivery_tag: uuid.UUID
:return: An un-ACKed message that is looked up by delivery_tag.
:rtype: qpid.messaging.Message
@@ -248,7 +247,7 @@ class QoS(object):
:param delivery_tag: the delivery tag associated with the message
to be acknowledged.
- :type delivery_tag: int
+ :type delivery_tag: uuid.UUID
"""
message = self._not_yet_acked.pop(delivery_tag)
self.session.acknowledge(message=message)
@@ -266,7 +265,7 @@ class QoS(object):
:param delivery_tag: The delivery tag associated with the message
to be rejected.
- :type delivery_tag: int
+ :type delivery_tag: uuid.UUID
:keyword requeue: If True, the broker will be notified to requeue
the message. If False, the broker will be told to drop the
message entirely. In both cases, the message will be removed
@@ -311,10 +310,9 @@ class Channel(base.StdChannel):
Messages sent using this Channel are assigned a delivery_tag. The
delivery_tag is generated for a message as they are prepared for
sending by :meth:`basic_publish`. The delivery_tag is unique per
- Channel instance using :meth:`~itertools.count`. The delivery_tag has
- no meaningful context in other objects, and is only maintained in the
- memory of this object, and the underlying :class:`QoS` object that
- provides support.
+ Channel instance. The delivery_tag has no meaningful context in other
+ objects, and is only maintained in the memory of this object, and the
+ underlying :class:`QoS` object that provides support.
Each Channel object instantiates exactly one :class:`QoS` object for
prefetch limiting, and asynchronous ACKing. The :class:`QoS` object is
@@ -842,7 +840,7 @@ class Channel(base.StdChannel):
:param delivery_tag: The delivery tag associated with the message
to be acknowledged.
- :type delivery_tag: int
+ :type delivery_tag: uuid.UUID
"""
self.qos.ack(delivery_tag)
@@ -860,7 +858,7 @@ class Channel(base.StdChannel):
:param delivery_tag: The delivery tag associated with the message
to be rejected.
- :type delivery_tag: int
+ :type delivery_tag: uuid.UUID
:keyword requeue: If False, the rejected message will be dropped by
the broker and not delivered to any other consumers. If True,
then the rejected message will be requeued for delivery to
@@ -901,10 +899,9 @@ class Channel(base.StdChannel):
handled by the caller of :meth:`~Transport.drain_events`. Messages
can be ACKed after being received through a call to :meth:`basic_ack`.
- If no_ack is True, the messages are immediately ACKed to avoid a
- memory leak in qpid.messaging when messages go un-ACKed. The no_ack
- flag indicates that the receiver of the message does not intent to
- call :meth:`basic_ack`.
+ If no_ack is True, The no_ack flag indicates that the receiver of
+ the message will not call :meth:`basic_ack` later. Since the
+ message will not be ACKed later, it is ACKed immediately.
:meth:`basic_consume` transforms the message object type prior to
calling the callback. Initially the message comes in as a
@@ -940,9 +937,7 @@ class Channel(base.StdChannel):
delivery_tag = message.delivery_tag
self.qos.append(qpid_message, delivery_tag)
if no_ack:
- # Celery will not ack this message later, so we should to
- # avoid a memory leak in qpid.messaging due to un-ACKed
- # messages.
+ # Celery will not ack this message later, so we should ack now
self.basic_ack(delivery_tag)
return callback(message)
@@ -1068,7 +1063,7 @@ class Channel(base.StdChannel):
- wraps the body as a buffer object, so that
:class:`qpid.messaging.endpoints.Sender` uses a content type
that can support arbitrarily large messages.
- - assigns a random delivery_tag
+ - sets delivery_tag to a random uuid.UUID
- sets the exchange and routing_key info as delivery_info
Internally uses :meth:`_put` to send the message synchronously. This
@@ -1094,7 +1089,7 @@ class Channel(base.StdChannel):
props = message['properties']
props.update(
body_encoding=body_encoding,
- delivery_tag=random.randint(1, sys.maxint),
+ delivery_tag=uuid.uuid4(),
)
props['delivery_info'].update(
exchange=exchange,
--
2.4.3