From 5ef3cba9682fd7b12493af6db4628ae2962d6998 Mon Sep 17 00:00:00 2001 From: Brian Bouterse Date: Fri, 22 Jan 2016 16:39:13 -0500 Subject: [PATCH 1/4] @acks_late usage in Qpid Transport now acks all messages Implements a workaround for celery/celery#3019 --- kombu/transport/qpid.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py index 639d837..8b6301e 100644 --- a/kombu/transport/qpid.py +++ b/kombu/transport/qpid.py @@ -74,6 +74,7 @@ Celery with Kombu, this can be accomplished by setting the from __future__ import absolute_import import os +import random import select import socket import ssl @@ -938,6 +939,10 @@ class Channel(base.StdChannel): def _callback(qpid_message): raw_message = qpid_message.content + + # workaround for https://github.com/celery/celery/issues/3019 + raw_message['properties']['delivery_tag'] = random.randint(1, 100000000000) + message = self.Message(self, raw_message) delivery_tag = message.delivery_tag self.qos.append(qpid_message, delivery_tag) -- 2.4.3 From f7483a3cde70e488e308132295c23c39ee469092 Mon Sep 17 00:00:00 2001 From: Brian Bouterse Date: Mon, 25 Jan 2016 11:43:20 -0500 Subject: [PATCH 2/4] Revert "@acks_late usage in Qpid Transport now acks all messages" This reverts commit 5ef3cba9682fd7b12493af6db4628ae2962d6998. --- kombu/transport/qpid.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py index 8b6301e..639d837 100644 --- a/kombu/transport/qpid.py +++ b/kombu/transport/qpid.py @@ -74,7 +74,6 @@ Celery with Kombu, this can be accomplished by setting the from __future__ import absolute_import import os -import random import select import socket import ssl @@ -939,10 +938,6 @@ class Channel(base.StdChannel): def _callback(qpid_message): raw_message = qpid_message.content - - # workaround for https://github.com/celery/celery/issues/3019 - raw_message['properties']['delivery_tag'] = random.randint(1, 100000000000) - message = self.Message(self, raw_message) delivery_tag = message.delivery_tag self.qos.append(qpid_message, delivery_tag) -- 2.4.3 From affa5f5e09c75c660f5ffbafd8aaedc7b8cdae5e Mon Sep 17 00:00:00 2001 From: Brian Bouterse Date: Mon, 25 Jan 2016 11:54:45 -0500 Subject: [PATCH 3/4] @acks_late usage in Qpid Transport now acks all messages Implements a workaround for celery/celery#3019 --- kombu/tests/transport/test_qpid.py | 4 ---- kombu/transport/qpid.py | 8 +++----- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py index 4131193..a72077b 100644 --- a/kombu/tests/transport/test_qpid.py +++ b/kombu/tests/transport/test_qpid.py @@ -940,10 +940,6 @@ class TestChannel(ExtraAssertionsMixin, Case): self.assertIn('base64', Channel.codecs) self.assertIsInstance(Channel.codecs['base64'], Base64) - def test_delivery_tags(self): - """Test that _delivery_tags is using itertools""" - self.assertIsInstance(Channel._delivery_tags, count) - def test_size(self): """Test getting the number of messages in a queue specified by name and returning them.""" diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py index 639d837..6d9b006 100644 --- a/kombu/transport/qpid.py +++ b/kombu/transport/qpid.py @@ -74,6 +74,7 @@ Celery with Kombu, this can be accomplished by setting the from __future__ import absolute_import import os +import random import select import socket import ssl @@ -368,9 +369,6 @@ class Channel(base.StdChannel): #: Binary <-> ASCII codecs. codecs = {'base64': Base64()} - #: counter used to generate delivery tags for this channel. - _delivery_tags = count(1) - def __init__(self, connection, transport): """Instantiate a Channel object. @@ -1070,7 +1068,7 @@ class Channel(base.StdChannel): - wraps the body as a buffer object, so that :class:`qpid.messaging.endpoints.Sender` uses a content type that can support arbitrarily large messages. - - assigns a delivery_tag generated through self._delivery_tags + - assigns a random delivery_tag - sets the exchange and routing_key info as delivery_info Internally uses :meth:`_put` to send the message synchronously. This @@ -1096,7 +1094,7 @@ class Channel(base.StdChannel): props = message['properties'] props.update( body_encoding=body_encoding, - delivery_tag=next(self._delivery_tags), + delivery_tag=random.randint(1, sys.maxint), ) props['delivery_info'].update( exchange=exchange, -- 2.4.3 From 7d6af48c06002deffc135c7fad506909fbf840e6 Mon Sep 17 00:00:00 2001 From: Brian Bouterse Date: Mon, 25 Jan 2016 16:07:53 -0500 Subject: [PATCH 4/4] Switches delivery_tag to uuid.uuid4() for Qpid transport celery/kombu#563 --- kombu/tests/transport/test_qpid.py | 3 ++- kombu/transport/qpid.py | 43 +++++++++++++++++--------------------- 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py index a72077b..a3894e4 100644 --- a/kombu/tests/transport/test_qpid.py +++ b/kombu/tests/transport/test_qpid.py @@ -5,6 +5,7 @@ import ssl import socket import sys import time +import uuid from collections import Callable from itertools import count @@ -1317,7 +1318,7 @@ class TestChannel(ExtraAssertionsMixin, Case): mock_message['properties']['body_encoding'], mock_body_encoding, ) self.assertIsInstance( - mock_message['properties']['delivery_tag'], int, + mock_message['properties']['delivery_tag'], uuid.UUID, ) self.assertIs( mock_message['properties']['delivery_info']['exchange'], diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py index 6d9b006..b458d32 100644 --- a/kombu/transport/qpid.py +++ b/kombu/transport/qpid.py @@ -74,14 +74,13 @@ Celery with Kombu, this can be accomplished by setting the from __future__ import absolute_import import os -import random import select import socket import ssl import sys import time +import uuid -from itertools import count from gettext import gettext as _ import amqp.protocol @@ -160,7 +159,7 @@ class QoS(object): ACKed asynchronously through a call to :meth:`ack`. Messages that are received, but not ACKed will not be delivered by the broker to another consumer until an ACK is received, or the session is closed. Messages - are referred to using delivery_tag integers, which are unique per + are referred to using delivery_tag, which are unique per :class:`Channel`. Delivery tags are managed outside of this object and are passed in with a message to :meth:`append`. Un-ACKed messages can be looked up from QoS using :meth:`get` and can be rejected and @@ -214,15 +213,15 @@ class QoS(object): def append(self, message, delivery_tag): """Append message to the list of un-ACKed messages. - Add a message, referenced by the integer delivery_tag, for ACKing, + Add a message, referenced by the delivery_tag, for ACKing, rejecting, or getting later. Messages are saved into an :class:`~kombu.utils.compat.OrderedDict` by delivery_tag. :param message: A received message that has not yet been ACKed. :type message: qpid.messaging.Message - :param delivery_tag: An integer number to refer to this message by + :param delivery_tag: A UUID to refer to this message by upon receipt. - :type delivery_tag: int + :type delivery_tag: uuid.UUID """ self._not_yet_acked[delivery_tag] = message @@ -233,7 +232,7 @@ class QoS(object): :param delivery_tag: The delivery tag associated with the message to be returned. - :type delivery_tag: int + :type delivery_tag: uuid.UUID :return: An un-ACKed message that is looked up by delivery_tag. :rtype: qpid.messaging.Message @@ -248,7 +247,7 @@ class QoS(object): :param delivery_tag: the delivery tag associated with the message to be acknowledged. - :type delivery_tag: int + :type delivery_tag: uuid.UUID """ message = self._not_yet_acked.pop(delivery_tag) self.session.acknowledge(message=message) @@ -266,7 +265,7 @@ class QoS(object): :param delivery_tag: The delivery tag associated with the message to be rejected. - :type delivery_tag: int + :type delivery_tag: uuid.UUID :keyword requeue: If True, the broker will be notified to requeue the message. If False, the broker will be told to drop the message entirely. In both cases, the message will be removed @@ -311,10 +310,9 @@ class Channel(base.StdChannel): Messages sent using this Channel are assigned a delivery_tag. The delivery_tag is generated for a message as they are prepared for sending by :meth:`basic_publish`. The delivery_tag is unique per - Channel instance using :meth:`~itertools.count`. The delivery_tag has - no meaningful context in other objects, and is only maintained in the - memory of this object, and the underlying :class:`QoS` object that - provides support. + Channel instance. The delivery_tag has no meaningful context in other + objects, and is only maintained in the memory of this object, and the + underlying :class:`QoS` object that provides support. Each Channel object instantiates exactly one :class:`QoS` object for prefetch limiting, and asynchronous ACKing. The :class:`QoS` object is @@ -842,7 +840,7 @@ class Channel(base.StdChannel): :param delivery_tag: The delivery tag associated with the message to be acknowledged. - :type delivery_tag: int + :type delivery_tag: uuid.UUID """ self.qos.ack(delivery_tag) @@ -860,7 +858,7 @@ class Channel(base.StdChannel): :param delivery_tag: The delivery tag associated with the message to be rejected. - :type delivery_tag: int + :type delivery_tag: uuid.UUID :keyword requeue: If False, the rejected message will be dropped by the broker and not delivered to any other consumers. If True, then the rejected message will be requeued for delivery to @@ -901,10 +899,9 @@ class Channel(base.StdChannel): handled by the caller of :meth:`~Transport.drain_events`. Messages can be ACKed after being received through a call to :meth:`basic_ack`. - If no_ack is True, the messages are immediately ACKed to avoid a - memory leak in qpid.messaging when messages go un-ACKed. The no_ack - flag indicates that the receiver of the message does not intent to - call :meth:`basic_ack`. + If no_ack is True, The no_ack flag indicates that the receiver of + the message will not call :meth:`basic_ack` later. Since the + message will not be ACKed later, it is ACKed immediately. :meth:`basic_consume` transforms the message object type prior to calling the callback. Initially the message comes in as a @@ -940,9 +937,7 @@ class Channel(base.StdChannel): delivery_tag = message.delivery_tag self.qos.append(qpid_message, delivery_tag) if no_ack: - # Celery will not ack this message later, so we should to - # avoid a memory leak in qpid.messaging due to un-ACKed - # messages. + # Celery will not ack this message later, so we should ack now self.basic_ack(delivery_tag) return callback(message) @@ -1068,7 +1063,7 @@ class Channel(base.StdChannel): - wraps the body as a buffer object, so that :class:`qpid.messaging.endpoints.Sender` uses a content type that can support arbitrarily large messages. - - assigns a random delivery_tag + - sets delivery_tag to a random uuid.UUID - sets the exchange and routing_key info as delivery_info Internally uses :meth:`_put` to send the message synchronously. This @@ -1094,7 +1089,7 @@ class Channel(base.StdChannel): props = message['properties'] props.update( body_encoding=body_encoding, - delivery_tag=random.randint(1, sys.maxint), + delivery_tag=uuid.uuid4(), ) props['delivery_info'].update( exchange=exchange, -- 2.4.3