Blob Blame History Raw
From fca31f82b2ec561422e8f0d3f8295bc75cb99f2c Mon Sep 17 00:00:00 2001
From: Kenneth Giusti <kgiusti@gmail.com>
Date: Fri, 14 Nov 2014 17:06:58 -0500
Subject: [PATCH] Create a new connection when a process fork has been detected

This patch attempts to deal with applications that have forked the
process after connecting to the broker.  First, the creation of the
connection is delayed until the application attempts to perform a
messaging operation.  Second, each time the application performs a
messaging operation the current process id is checked against the id
of the process that created the connection.  If the process ids do not
match, the application has called os.fork().  The new child process
discards the existing connection and creates a new one.

Change-Id: I5455cb0f8d380d6b65f1268b34a91355cbb14aa2
Closes-Bug: #1392868

Conflicts:
	oslo/messaging/_drivers/protocols/amqp/controller.py
---
 .../_drivers/protocols/amqp/controller.py          | 16 ++++-----
 oslo/messaging/_drivers/protocols/amqp/driver.py   | 41 ++++++++++++++--------
 .../messaging/_drivers/protocols/amqp/eventloop.py | 35 +++++++++++-------
 3 files changed, 57 insertions(+), 35 deletions(-)

diff --git a/oslo/messaging/_drivers/protocols/amqp/controller.py b/oslo/messaging/_drivers/protocols/amqp/controller.py
index eddd9a3..0b1118c 100644
--- a/oslo/messaging/_drivers/protocols/amqp/controller.py
+++ b/oslo/messaging/_drivers/protocols/amqp/controller.py
@@ -102,8 +102,11 @@ class Replies(pyngus.ReceiverEventHandler):
         self._correlation = {}  # map of correlation-id to response queue
         self._ready = False
         self._on_ready = on_ready
+        rname = "Consumer-%s:src=[dynamic]:tgt=replies" % uuid.uuid4().hex
         self._receiver = connection.create_receiver("replies",
-                                                    event_handler=self)
+                                                    event_handler=self,
+                                                    name=rname)
+
         # capacity determines the maximum number of reply messages this link
         # can receive. As messages are received and credit is consumed, this
         # driver will 'top up' the credit back to max capacity.  This number
@@ -296,8 +299,6 @@ class Controller(pyngus.ConnectionEventHandler):
         self.broadcast_prefix = config.amqp1.broadcast_prefix
         self.group_request_prefix = config.amqp1.group_request_prefix
         self._container_name = config.amqp1.container_name
-        if not self._container_name:
-            self._container_name = "container-%s" % uuid.uuid4().hex
         self.idle_timeout = config.amqp1.idle_timeout
         self.trace_protocol = config.amqp1.trace
         self.ssl_ca_file = config.amqp1.ssl_ca_file
@@ -332,14 +333,13 @@ class Controller(pyngus.ConnectionEventHandler):
         self._tasks.put(task)
         self._schedule_task_processing()
 
-    def destroy(self):
+    def shutdown(self, wait=True, timeout=None):
         """Shutdown the messaging service."""
         if self.processor:
-            self.processor.wakeup(lambda: self._start_shutdown())
-            LOG.info("Waiting for eventloop to exit")
-            self.processor.join()
+            LOG.debug("Waiting for eventloop to exit")
+            self.processor.shutdown(wait, timeout)
             self.processor = None
-        LOG.info("Eventloop exited, driver shut down")
+        LOG.debug("Eventloop exited, driver shut down")
 
     # The remaining methods are reserved to run from the eventloop thread only!
     # They must not be invoked directly!
diff --git a/oslo/messaging/_drivers/protocols/amqp/driver.py b/oslo/messaging/_drivers/protocols/amqp/driver.py
index 2d72994..7c8e0f3 100644
--- a/oslo/messaging/_drivers/protocols/amqp/driver.py
+++ b/oslo/messaging/_drivers/protocols/amqp/driver.py
@@ -21,6 +21,7 @@ messaging protocol.  The driver sends messages and creates subscriptions via
 """
 
 import logging
+import os
 import threading
 import time
 
@@ -231,25 +232,36 @@ class ProtonDriver(base.BaseDriver):
 
         super(ProtonDriver, self).__init__(conf, url, default_exchange,
                                            allowed_remote_exmods)
+        # TODO(grs): handle authentication etc
+        self._hosts = url.hosts
+        self._conf = conf
+        self._default_exchange = default_exchange
 
-        # Create a Controller that connects to the messaging service:
-        self._ctrl = controller.Controller(url.hosts, default_exchange, conf)
-
-        # lazy connection setup - don't cause the controller to connect until
+        # lazy connection setup - don't create the controller until
         # after the first messaging request:
-        self._connect_called = False
+        self._ctrl = None
+        self._pid = None
         self._lock = threading.Lock()
 
     def _ensure_connect_called(func):
-        """Causes the controller to connect to the messaging service when it is
-        first used. It is safe to push tasks to it whether connected or not,
-        but those tasks won't be processed until connection completes.
+        """Causes a new controller to be created when the messaging service is
+        first used by the current process. It is safe to push tasks to it
+        whether connected or not, but those tasks won't be processed until
+        connection completes.
         """
         def wrap(self, *args, **kws):
             with self._lock:
-                connect_called = self._connect_called
-                self._connect_called = True
-            if not connect_called:
+                old_pid = self._pid
+                self._pid = os.getpid()
+
+            if old_pid != self._pid:
+                if self._ctrl is not None:
+                    LOG.warning("Process forked after connection established!")
+                    self._ctrl.shutdown(wait=False)
+                # Create a Controller that connects to the messaging service:
+                self._ctrl = controller.Controller(self._hosts,
+                                                   self._default_exchange,
+                                                   self._conf)
                 self._ctrl.connect()
             return func(self, *args, **kws)
         return wrap
@@ -315,6 +327,7 @@ class ProtonDriver(base.BaseDriver):
 
     def cleanup(self):
         """Release all resources."""
-        LOG.debug("Cleaning up ProtonDriver")
-        self._ctrl.destroy()
-        self._ctrl = None
+        if self._ctrl:
+            self._ctrl.shutdown()
+            self._ctrl = None
+        LOG.info("AMQP 1.0 messaging driver shutdown")
diff --git a/oslo/messaging/_drivers/protocols/amqp/eventloop.py b/oslo/messaging/_drivers/protocols/amqp/eventloop.py
index f3d235a..04ff868 100644
--- a/oslo/messaging/_drivers/protocols/amqp/eventloop.py
+++ b/oslo/messaging/_drivers/protocols/amqp/eventloop.py
@@ -235,7 +235,7 @@ class Thread(threading.Thread):
 
         # Configure a container
         if container_name is None:
-            container_name = uuid.uuid4().hex
+            container_name = "Container-" + uuid.uuid4().hex
         self._container = pyngus.Container(container_name)
 
         self.name = "Thread for Proton container: %s" % self._container.name
@@ -245,25 +245,27 @@ class Thread(threading.Thread):
 
     def wakeup(self, request=None):
         """Wake up the eventloop thread, Optionally providing a callable to run
-        when the eventloop wakes up.
+        when the eventloop wakes up.  Thread safe.
         """
         self._requests.wakeup(request)
 
+    def shutdown(self, wait=True, timeout=None):
+        """Shutdown the eventloop thread.  Thread safe.
+        """
+        LOG.debug("eventloop shutdown requested")
+        self._shutdown = True
+        self.wakeup()
+        if wait:
+            self.join(timeout=timeout)
+        LOG.debug("eventloop shutdown complete")
+
+    # the following methods are not thread safe - they must be run from the
+    # eventloop thread
+
     def schedule(self, request, delay):
         """Invoke request after delay seconds."""
         self._schedule.schedule(request, delay)
 
-    def destroy(self):
-        """Stop the processing thread, releasing all resources.
-        """
-        LOG.debug("Stopping Proton container %s", self._container.name)
-        self.wakeup(lambda: self.shutdown())
-        self.join()
-
-    def shutdown(self):
-        LOG.info("eventloop shutdown requested")
-        self._shutdown = True
-
     def connect(self, host, handler, properties=None, name=None):
         """Get a _SocketConnection to a peer represented by url."""
         key = name or "%s:%i" % (host.hostname, host.port)
@@ -311,6 +313,13 @@ class Thread(threading.Thread):
                                 str(serror))
                     continue
                 raise  # assuming fatal...
+
+            # don't process any I/O or timers if woken up by a shutdown:
+            # if we've been forked we don't want to do I/O on the parent's
+            # sockets
+            if self._shutdown:
+                break
+
             readable, writable, ignore = results
 
             for r in readable: