|
|
27f258f |
From 5ef3cba9682fd7b12493af6db4628ae2962d6998 Mon Sep 17 00:00:00 2001
|
|
|
27f258f |
From: Brian Bouterse <bmbouter@gmail.com>
|
|
|
27f258f |
Date: Fri, 22 Jan 2016 16:39:13 -0500
|
|
|
27f258f |
Subject: [PATCH 1/4] @acks_late usage in Qpid Transport now acks all messages
|
|
|
27f258f |
|
|
|
27f258f |
Implements a workaround for celery/celery#3019
|
|
|
27f258f |
---
|
|
|
27f258f |
kombu/transport/qpid.py | 5 +++++
|
|
|
27f258f |
1 file changed, 5 insertions(+)
|
|
|
27f258f |
|
|
|
27f258f |
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
|
|
|
27f258f |
index 639d837..8b6301e 100644
|
|
|
27f258f |
--- a/kombu/transport/qpid.py
|
|
|
27f258f |
+++ b/kombu/transport/qpid.py
|
|
|
27f258f |
@@ -74,6 +74,7 @@ Celery with Kombu, this can be accomplished by setting the
|
|
|
27f258f |
from __future__ import absolute_import
|
|
|
27f258f |
|
|
|
27f258f |
import os
|
|
|
27f258f |
+import random
|
|
|
27f258f |
import select
|
|
|
27f258f |
import socket
|
|
|
27f258f |
import ssl
|
|
|
27f258f |
@@ -938,6 +939,10 @@ class Channel(base.StdChannel):
|
|
|
27f258f |
|
|
|
27f258f |
def _callback(qpid_message):
|
|
|
27f258f |
raw_message = qpid_message.content
|
|
|
27f258f |
+
|
|
|
27f258f |
+ # workaround for https://github.com/celery/celery/issues/3019
|
|
|
27f258f |
+ raw_message['properties']['delivery_tag'] = random.randint(1, 100000000000)
|
|
|
27f258f |
+
|
|
|
27f258f |
message = self.Message(self, raw_message)
|
|
|
27f258f |
delivery_tag = message.delivery_tag
|
|
|
27f258f |
self.qos.append(qpid_message, delivery_tag)
|
|
|
27f258f |
--
|
|
|
27f258f |
2.4.3
|
|
|
27f258f |
|
|
|
27f258f |
|
|
|
27f258f |
From f7483a3cde70e488e308132295c23c39ee469092 Mon Sep 17 00:00:00 2001
|
|
|
27f258f |
From: Brian Bouterse <bmbouter@gmail.com>
|
|
|
27f258f |
Date: Mon, 25 Jan 2016 11:43:20 -0500
|
|
|
27f258f |
Subject: [PATCH 2/4] Revert "@acks_late usage in Qpid Transport now acks all
|
|
|
27f258f |
messages"
|
|
|
27f258f |
|
|
|
27f258f |
This reverts commit 5ef3cba9682fd7b12493af6db4628ae2962d6998.
|
|
|
27f258f |
---
|
|
|
27f258f |
kombu/transport/qpid.py | 5 -----
|
|
|
27f258f |
1 file changed, 5 deletions(-)
|
|
|
27f258f |
|
|
|
27f258f |
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
|
|
|
27f258f |
index 8b6301e..639d837 100644
|
|
|
27f258f |
--- a/kombu/transport/qpid.py
|
|
|
27f258f |
+++ b/kombu/transport/qpid.py
|
|
|
27f258f |
@@ -74,7 +74,6 @@ Celery with Kombu, this can be accomplished by setting the
|
|
|
27f258f |
from __future__ import absolute_import
|
|
|
27f258f |
|
|
|
27f258f |
import os
|
|
|
27f258f |
-import random
|
|
|
27f258f |
import select
|
|
|
27f258f |
import socket
|
|
|
27f258f |
import ssl
|
|
|
27f258f |
@@ -939,10 +938,6 @@ class Channel(base.StdChannel):
|
|
|
27f258f |
|
|
|
27f258f |
def _callback(qpid_message):
|
|
|
27f258f |
raw_message = qpid_message.content
|
|
|
27f258f |
-
|
|
|
27f258f |
- # workaround for https://github.com/celery/celery/issues/3019
|
|
|
27f258f |
- raw_message['properties']['delivery_tag'] = random.randint(1, 100000000000)
|
|
|
27f258f |
-
|
|
|
27f258f |
message = self.Message(self, raw_message)
|
|
|
27f258f |
delivery_tag = message.delivery_tag
|
|
|
27f258f |
self.qos.append(qpid_message, delivery_tag)
|
|
|
27f258f |
--
|
|
|
27f258f |
2.4.3
|
|
|
27f258f |
|
|
|
27f258f |
|
|
|
27f258f |
From affa5f5e09c75c660f5ffbafd8aaedc7b8cdae5e Mon Sep 17 00:00:00 2001
|
|
|
27f258f |
From: Brian Bouterse <bmbouter@gmail.com>
|
|
|
27f258f |
Date: Mon, 25 Jan 2016 11:54:45 -0500
|
|
|
27f258f |
Subject: [PATCH 3/4] @acks_late usage in Qpid Transport now acks all messages
|
|
|
27f258f |
|
|
|
27f258f |
Implements a workaround for celery/celery#3019
|
|
|
27f258f |
---
|
|
|
27f258f |
kombu/tests/transport/test_qpid.py | 4 ----
|
|
|
27f258f |
kombu/transport/qpid.py | 8 +++-----
|
|
|
27f258f |
2 files changed, 3 insertions(+), 9 deletions(-)
|
|
|
27f258f |
|
|
|
27f258f |
diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py
|
|
|
27f258f |
index 4131193..a72077b 100644
|
|
|
27f258f |
--- a/kombu/tests/transport/test_qpid.py
|
|
|
27f258f |
+++ b/kombu/tests/transport/test_qpid.py
|
|
|
27f258f |
@@ -940,10 +940,6 @@ class TestChannel(ExtraAssertionsMixin, Case):
|
|
|
27f258f |
self.assertIn('base64', Channel.codecs)
|
|
|
27f258f |
self.assertIsInstance(Channel.codecs['base64'], Base64)
|
|
|
27f258f |
|
|
|
27f258f |
- def test_delivery_tags(self):
|
|
|
27f258f |
- """Test that _delivery_tags is using itertools"""
|
|
|
27f258f |
- self.assertIsInstance(Channel._delivery_tags, count)
|
|
|
27f258f |
-
|
|
|
27f258f |
def test_size(self):
|
|
|
27f258f |
"""Test getting the number of messages in a queue specified by
|
|
|
27f258f |
name and returning them."""
|
|
|
27f258f |
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
|
|
|
27f258f |
index 639d837..6d9b006 100644
|
|
|
27f258f |
--- a/kombu/transport/qpid.py
|
|
|
27f258f |
+++ b/kombu/transport/qpid.py
|
|
|
27f258f |
@@ -74,6 +74,7 @@ Celery with Kombu, this can be accomplished by setting the
|
|
|
27f258f |
from __future__ import absolute_import
|
|
|
27f258f |
|
|
|
27f258f |
import os
|
|
|
27f258f |
+import random
|
|
|
27f258f |
import select
|
|
|
27f258f |
import socket
|
|
|
27f258f |
import ssl
|
|
|
27f258f |
@@ -368,9 +369,6 @@ class Channel(base.StdChannel):
|
|
|
27f258f |
#: Binary <-> ASCII codecs.
|
|
|
27f258f |
codecs = {'base64': Base64()}
|
|
|
27f258f |
|
|
|
27f258f |
- #: counter used to generate delivery tags for this channel.
|
|
|
27f258f |
- _delivery_tags = count(1)
|
|
|
27f258f |
-
|
|
|
27f258f |
def __init__(self, connection, transport):
|
|
|
27f258f |
"""Instantiate a Channel object.
|
|
|
27f258f |
|
|
|
27f258f |
@@ -1070,7 +1068,7 @@ class Channel(base.StdChannel):
|
|
|
27f258f |
- wraps the body as a buffer object, so that
|
|
|
27f258f |
:class:`qpid.messaging.endpoints.Sender` uses a content type
|
|
|
27f258f |
that can support arbitrarily large messages.
|
|
|
27f258f |
- - assigns a delivery_tag generated through self._delivery_tags
|
|
|
27f258f |
+ - assigns a random delivery_tag
|
|
|
27f258f |
- sets the exchange and routing_key info as delivery_info
|
|
|
27f258f |
|
|
|
27f258f |
Internally uses :meth:`_put` to send the message synchronously. This
|
|
|
27f258f |
@@ -1096,7 +1094,7 @@ class Channel(base.StdChannel):
|
|
|
27f258f |
props = message['properties']
|
|
|
27f258f |
props.update(
|
|
|
27f258f |
body_encoding=body_encoding,
|
|
|
27f258f |
- delivery_tag=next(self._delivery_tags),
|
|
|
27f258f |
+ delivery_tag=random.randint(1, sys.maxint),
|
|
|
27f258f |
)
|
|
|
27f258f |
props['delivery_info'].update(
|
|
|
27f258f |
exchange=exchange,
|
|
|
27f258f |
--
|
|
|
27f258f |
2.4.3
|
|
|
27f258f |
|
|
|
27f258f |
|
|
|
27f258f |
From 7d6af48c06002deffc135c7fad506909fbf840e6 Mon Sep 17 00:00:00 2001
|
|
|
27f258f |
From: Brian Bouterse <bmbouter@gmail.com>
|
|
|
27f258f |
Date: Mon, 25 Jan 2016 16:07:53 -0500
|
|
|
27f258f |
Subject: [PATCH 4/4] Switches delivery_tag to uuid.uuid4() for Qpid transport
|
|
|
27f258f |
|
|
|
27f258f |
celery/kombu#563
|
|
|
27f258f |
---
|
|
|
27f258f |
kombu/tests/transport/test_qpid.py | 3 ++-
|
|
|
27f258f |
kombu/transport/qpid.py | 43 +++++++++++++++++---------------------
|
|
|
27f258f |
2 files changed, 21 insertions(+), 25 deletions(-)
|
|
|
27f258f |
|
|
|
27f258f |
diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py
|
|
|
27f258f |
index a72077b..a3894e4 100644
|
|
|
27f258f |
--- a/kombu/tests/transport/test_qpid.py
|
|
|
27f258f |
+++ b/kombu/tests/transport/test_qpid.py
|
|
|
27f258f |
@@ -5,6 +5,7 @@ import ssl
|
|
|
27f258f |
import socket
|
|
|
27f258f |
import sys
|
|
|
27f258f |
import time
|
|
|
27f258f |
+import uuid
|
|
|
27f258f |
|
|
|
27f258f |
from collections import Callable
|
|
|
27f258f |
from itertools import count
|
|
|
27f258f |
@@ -1317,7 +1318,7 @@ class TestChannel(ExtraAssertionsMixin, Case):
|
|
|
27f258f |
mock_message['properties']['body_encoding'], mock_body_encoding,
|
|
|
27f258f |
)
|
|
|
27f258f |
self.assertIsInstance(
|
|
|
27f258f |
- mock_message['properties']['delivery_tag'], int,
|
|
|
27f258f |
+ mock_message['properties']['delivery_tag'], uuid.UUID,
|
|
|
27f258f |
)
|
|
|
27f258f |
self.assertIs(
|
|
|
27f258f |
mock_message['properties']['delivery_info']['exchange'],
|
|
|
27f258f |
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
|
|
|
27f258f |
index 6d9b006..b458d32 100644
|
|
|
27f258f |
--- a/kombu/transport/qpid.py
|
|
|
27f258f |
+++ b/kombu/transport/qpid.py
|
|
|
27f258f |
@@ -74,14 +74,13 @@ Celery with Kombu, this can be accomplished by setting the
|
|
|
27f258f |
from __future__ import absolute_import
|
|
|
27f258f |
|
|
|
27f258f |
import os
|
|
|
27f258f |
-import random
|
|
|
27f258f |
import select
|
|
|
27f258f |
import socket
|
|
|
27f258f |
import ssl
|
|
|
27f258f |
import sys
|
|
|
27f258f |
import time
|
|
|
27f258f |
+import uuid
|
|
|
27f258f |
|
|
|
27f258f |
-from itertools import count
|
|
|
27f258f |
from gettext import gettext as _
|
|
|
27f258f |
|
|
|
27f258f |
import amqp.protocol
|
|
|
27f258f |
@@ -160,7 +159,7 @@ class QoS(object):
|
|
|
27f258f |
ACKed asynchronously through a call to :meth:`ack`. Messages that are
|
|
|
27f258f |
received, but not ACKed will not be delivered by the broker to another
|
|
|
27f258f |
consumer until an ACK is received, or the session is closed. Messages
|
|
|
27f258f |
- are referred to using delivery_tag integers, which are unique per
|
|
|
27f258f |
+ are referred to using delivery_tag, which are unique per
|
|
|
27f258f |
:class:`Channel`. Delivery tags are managed outside of this object and
|
|
|
27f258f |
are passed in with a message to :meth:`append`. Un-ACKed messages can
|
|
|
27f258f |
be looked up from QoS using :meth:`get` and can be rejected and
|
|
|
27f258f |
@@ -214,15 +213,15 @@ class QoS(object):
|
|
|
27f258f |
def append(self, message, delivery_tag):
|
|
|
27f258f |
"""Append message to the list of un-ACKed messages.
|
|
|
27f258f |
|
|
|
27f258f |
- Add a message, referenced by the integer delivery_tag, for ACKing,
|
|
|
27f258f |
+ Add a message, referenced by the delivery_tag, for ACKing,
|
|
|
27f258f |
rejecting, or getting later. Messages are saved into an
|
|
|
27f258f |
:class:`~kombu.utils.compat.OrderedDict` by delivery_tag.
|
|
|
27f258f |
|
|
|
27f258f |
:param message: A received message that has not yet been ACKed.
|
|
|
27f258f |
:type message: qpid.messaging.Message
|
|
|
27f258f |
- :param delivery_tag: An integer number to refer to this message by
|
|
|
27f258f |
+ :param delivery_tag: A UUID to refer to this message by
|
|
|
27f258f |
upon receipt.
|
|
|
27f258f |
- :type delivery_tag: int
|
|
|
27f258f |
+ :type delivery_tag: uuid.UUID
|
|
|
27f258f |
"""
|
|
|
27f258f |
self._not_yet_acked[delivery_tag] = message
|
|
|
27f258f |
|
|
|
27f258f |
@@ -233,7 +232,7 @@ class QoS(object):
|
|
|
27f258f |
|
|
|
27f258f |
:param delivery_tag: The delivery tag associated with the message
|
|
|
27f258f |
to be returned.
|
|
|
27f258f |
- :type delivery_tag: int
|
|
|
27f258f |
+ :type delivery_tag: uuid.UUID
|
|
|
27f258f |
|
|
|
27f258f |
:return: An un-ACKed message that is looked up by delivery_tag.
|
|
|
27f258f |
:rtype: qpid.messaging.Message
|
|
|
27f258f |
@@ -248,7 +247,7 @@ class QoS(object):
|
|
|
27f258f |
|
|
|
27f258f |
:param delivery_tag: the delivery tag associated with the message
|
|
|
27f258f |
to be acknowledged.
|
|
|
27f258f |
- :type delivery_tag: int
|
|
|
27f258f |
+ :type delivery_tag: uuid.UUID
|
|
|
27f258f |
"""
|
|
|
27f258f |
message = self._not_yet_acked.pop(delivery_tag)
|
|
|
27f258f |
self.session.acknowledge(message=message)
|
|
|
27f258f |
@@ -266,7 +265,7 @@ class QoS(object):
|
|
|
27f258f |
|
|
|
27f258f |
:param delivery_tag: The delivery tag associated with the message
|
|
|
27f258f |
to be rejected.
|
|
|
27f258f |
- :type delivery_tag: int
|
|
|
27f258f |
+ :type delivery_tag: uuid.UUID
|
|
|
27f258f |
:keyword requeue: If True, the broker will be notified to requeue
|
|
|
27f258f |
the message. If False, the broker will be told to drop the
|
|
|
27f258f |
message entirely. In both cases, the message will be removed
|
|
|
27f258f |
@@ -311,10 +310,9 @@ class Channel(base.StdChannel):
|
|
|
27f258f |
Messages sent using this Channel are assigned a delivery_tag. The
|
|
|
27f258f |
delivery_tag is generated for a message as they are prepared for
|
|
|
27f258f |
sending by :meth:`basic_publish`. The delivery_tag is unique per
|
|
|
27f258f |
- Channel instance using :meth:`~itertools.count`. The delivery_tag has
|
|
|
27f258f |
- no meaningful context in other objects, and is only maintained in the
|
|
|
27f258f |
- memory of this object, and the underlying :class:`QoS` object that
|
|
|
27f258f |
- provides support.
|
|
|
27f258f |
+ Channel instance. The delivery_tag has no meaningful context in other
|
|
|
27f258f |
+ objects, and is only maintained in the memory of this object, and the
|
|
|
27f258f |
+ underlying :class:`QoS` object that provides support.
|
|
|
27f258f |
|
|
|
27f258f |
Each Channel object instantiates exactly one :class:`QoS` object for
|
|
|
27f258f |
prefetch limiting, and asynchronous ACKing. The :class:`QoS` object is
|
|
|
27f258f |
@@ -842,7 +840,7 @@ class Channel(base.StdChannel):
|
|
|
27f258f |
|
|
|
27f258f |
:param delivery_tag: The delivery tag associated with the message
|
|
|
27f258f |
to be acknowledged.
|
|
|
27f258f |
- :type delivery_tag: int
|
|
|
27f258f |
+ :type delivery_tag: uuid.UUID
|
|
|
27f258f |
"""
|
|
|
27f258f |
self.qos.ack(delivery_tag)
|
|
|
27f258f |
|
|
|
27f258f |
@@ -860,7 +858,7 @@ class Channel(base.StdChannel):
|
|
|
27f258f |
|
|
|
27f258f |
:param delivery_tag: The delivery tag associated with the message
|
|
|
27f258f |
to be rejected.
|
|
|
27f258f |
- :type delivery_tag: int
|
|
|
27f258f |
+ :type delivery_tag: uuid.UUID
|
|
|
27f258f |
:keyword requeue: If False, the rejected message will be dropped by
|
|
|
27f258f |
the broker and not delivered to any other consumers. If True,
|
|
|
27f258f |
then the rejected message will be requeued for delivery to
|
|
|
27f258f |
@@ -901,10 +899,9 @@ class Channel(base.StdChannel):
|
|
|
27f258f |
handled by the caller of :meth:`~Transport.drain_events`. Messages
|
|
|
27f258f |
can be ACKed after being received through a call to :meth:`basic_ack`.
|
|
|
27f258f |
|
|
|
27f258f |
- If no_ack is True, the messages are immediately ACKed to avoid a
|
|
|
27f258f |
- memory leak in qpid.messaging when messages go un-ACKed. The no_ack
|
|
|
27f258f |
- flag indicates that the receiver of the message does not intent to
|
|
|
27f258f |
- call :meth:`basic_ack`.
|
|
|
27f258f |
+ If no_ack is True, The no_ack flag indicates that the receiver of
|
|
|
27f258f |
+ the message will not call :meth:`basic_ack` later. Since the
|
|
|
27f258f |
+ message will not be ACKed later, it is ACKed immediately.
|
|
|
27f258f |
|
|
|
27f258f |
:meth:`basic_consume` transforms the message object type prior to
|
|
|
27f258f |
calling the callback. Initially the message comes in as a
|
|
|
27f258f |
@@ -940,9 +937,7 @@ class Channel(base.StdChannel):
|
|
|
27f258f |
delivery_tag = message.delivery_tag
|
|
|
27f258f |
self.qos.append(qpid_message, delivery_tag)
|
|
|
27f258f |
if no_ack:
|
|
|
27f258f |
- # Celery will not ack this message later, so we should to
|
|
|
27f258f |
- # avoid a memory leak in qpid.messaging due to un-ACKed
|
|
|
27f258f |
- # messages.
|
|
|
27f258f |
+ # Celery will not ack this message later, so we should ack now
|
|
|
27f258f |
self.basic_ack(delivery_tag)
|
|
|
27f258f |
return callback(message)
|
|
|
27f258f |
|
|
|
27f258f |
@@ -1068,7 +1063,7 @@ class Channel(base.StdChannel):
|
|
|
27f258f |
- wraps the body as a buffer object, so that
|
|
|
27f258f |
:class:`qpid.messaging.endpoints.Sender` uses a content type
|
|
|
27f258f |
that can support arbitrarily large messages.
|
|
|
27f258f |
- - assigns a random delivery_tag
|
|
|
27f258f |
+ - sets delivery_tag to a random uuid.UUID
|
|
|
27f258f |
- sets the exchange and routing_key info as delivery_info
|
|
|
27f258f |
|
|
|
27f258f |
Internally uses :meth:`_put` to send the message synchronously. This
|
|
|
27f258f |
@@ -1094,7 +1089,7 @@ class Channel(base.StdChannel):
|
|
|
27f258f |
props = message['properties']
|
|
|
27f258f |
props.update(
|
|
|
27f258f |
body_encoding=body_encoding,
|
|
|
27f258f |
- delivery_tag=random.randint(1, sys.maxint),
|
|
|
27f258f |
+ delivery_tag=uuid.uuid4(),
|
|
|
27f258f |
)
|
|
|
27f258f |
props['delivery_info'].update(
|
|
|
27f258f |
exchange=exchange,
|
|
|
27f258f |
--
|
|
|
27f258f |
2.4.3
|
|
|
27f258f |
|